Commit ab067c35 authored by Lorenz Boguhn

Adapt uc1 hzj to use DatabaseWriter

parent aff43058
Merge request !208: Add benchmark implementations for Hazelcast Jet
Pipeline #7210 passed
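
For context, this commit replaces the hard-coded logger sink of the UC1 Hazelcast Jet benchmark with the generic DatabaseWriter abstraction from the uc1-commons module. The following minimal sketch shows how that abstraction is typically used on its own, based on the interfaces referenced in the imports of this diff (DatabaseAdapter, DatabaseWriter, LogWriterFactory); the class name and the ActivePowerRecord constructor arguments are assumptions for illustration.

import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;

public class DatabaseWriterSketch {

  public static void main(final String[] args) {
    // The JSON logging adapter converts records to JSON strings and "writes" them to the log.
    final DatabaseAdapter<String> adapter = LogWriterFactory.forJson();

    // Convert a record and hand it to the writer, mirroring what the Jet pipeline does
    // once per stream element (constructor arguments assumed: identifier, timestamp, value in W).
    final ActivePowerRecord record = new ActivePowerRecord("TEST_SENSOR", 0L, 42.0);
    final String converted = adapter.getRecordConverter().convert(record);

    final DatabaseWriter<String> writer = adapter.getDatabaseWriter();
    writer.write(converted);
  }
}

Keeping record conversion (a map stage in the topology) separate from writing (a Jet sink) lets the Hazelcast Jet implementation reuse the shared uc1-commons code, which is what this commit adapts it to do.
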
@@ -20,7 +20,10 @@ dependencies {
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-api:1.7.25'
implementation 'org.slf4j:slf4j-api:1.7.30'
implementation 'org.slf4j:slf4j-jdk14:1.7.30'
implementation 'io.confluent:kafka-avro-serializer:5.3.0'
implementation 'com.hazelcast.jet:hazelcast-jet:4.5'
@@ -4,6 +4,8 @@ plugins {
dependencies {
implementation project(':uc1-commons')
implementation 'org.slf4j:slf4j-jdk14:1.7.30'
}
mainClassName = "rocks.theodolite.benchmarks.uc1.hazelcastjet.HistoryService"
package rocks.theodolite.benchmarks.uc1.hazelcastjet;
import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import java.util.Map.Entry;
import java.util.Properties;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import titan.ccp.model.records.ActivePowerRecord;
@@ -39,7 +42,15 @@ public class Uc1PipelineBuilder {
final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource);
// Add Sink: Logger
uc1TopologyProduct.writeTo(Sinks.logger());
// Do not refactor this to obtain the writer directly inside the call below
// (Hazelcast Jet has problems with static calls inside pipeline functions).
final DatabaseWriter<String> writer = this.databaseAdapter.getDatabaseWriter();
final Sink<String> sink = sinkBuilder(
"Sink into database", x -> writer)
.<String>receiveFn(DatabaseWriter::write)
.build();
uc1TopologyProduct.writeTo(sink);
return pipe;
}
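
The sink above is built with Hazelcast Jet's SinkBuilder API: the first lambda (createFn) supplies the per-processor context object, and receiveFn is invoked once per item. As a sketch, the same construction could be factored into a small reusable helper; the class and method names below are hypothetical, and the remark about closure serialization is an assumed explanation for the "do not refactor" comment, not something the commit states.

import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;

import com.hazelcast.jet.pipeline.Sink;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;

public final class DatabaseSinks {

  private DatabaseSinks() {}

  // Builds a Jet sink that forwards every String item to the adapter's DatabaseWriter.
  public static Sink<String> fromAdapter(final DatabaseAdapter<String> adapter) {
    // Resolve the writer outside the lambda so the lambda captures only the writer
    // itself, not the adapter or an enclosing instance that Jet would have to serialize.
    final DatabaseWriter<String> writer = adapter.getDatabaseWriter();
    return sinkBuilder("database-sink", context -> writer)
        .<String>receiveFn(DatabaseWriter::write)
        .build();
  }
}
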
@@ -64,12 +75,8 @@ public class Uc1PipelineBuilder {
return pipe.readFrom(source)
.withNativeTimestamps(0)
.setLocalParallelism(1)
.setName("Log content")
.setName("Convert content")
.map(Entry::getValue)
.map(this.databaseAdapter.getRecordConverter()::convert);
}
}
@@ -5,6 +5,7 @@ import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
@@ -14,14 +15,22 @@ import com.hazelcast.jet.test.SerialTest;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
import com.hazelcast.logging.ILogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseWriter;
import rocks.theodolite.benchmarks.uc1.commons.logger.LogWriterFactory;
import rocks.theodolite.benchmarks.uc1.hazelcastjet.Uc1PipelineBuilder;
import titan.ccp.model.records.ActivePowerRecord;
import static com.hazelcast.jet.pipeline.SinkBuilder.sinkBuilder;
import static com.hazelcast.logging.Logger.getLogger;
/**
* Test methods for the Hazelcast Jet Implementation of UC1.
*/
@@ -32,13 +41,24 @@ public class Uc1PipelineTest extends JetTestSupport {
private Pipeline testPipeline = null;
private StreamStage<String> uc1Topology = null;
// Standard SLF4J logger
private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(Uc1PipelineTest.class);
// Hazelcast Jet logger
private static final ILogger logger = getLogger(Uc1PipelineTest.class);
private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
/**
* Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC2 topology.
* Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC1 topology.
* Allows for quick extension of tests.
*/
@Before
public void buildUc1Pipeline() {
logger.info("Hazelcast Logger");
LOGGER.info("Standard Logger");
// Setup Configuration
final int testItemsPerSecond = 1;
final String testSensorName = "TEST_SENSOR";
@@ -47,8 +67,10 @@ public class Uc1PipelineTest extends JetTestSupport {
// Create mock jet instance with configuration
final String testClusterName = randomName();
final JetConfig testJetConfig = new JetConfig();
// testJetConfig.setProperty( "hazelcast.logging.type", "slf4j" );
testJetConfig.getHazelcastConfig().setClusterName(testClusterName);
this.testInstance = this.createJetMember(testJetConfig);
this.testInstance = createJetMember(testJetConfig);
// Create a test source
final StreamSource<Entry<String, ActivePowerRecord>> testSource =
@@ -66,6 +88,22 @@ public class Uc1PipelineTest extends JetTestSupport {
this.uc1Topology =
pipelineBuilder.extendUc1Topology(this.testPipeline, testSource);
// Create DatabaseWriter sink
final DatabaseWriter<String> adapter = this.databaseAdapter.getDatabaseWriter();
final Sink<String> sink = sinkBuilder(
"database-sink", x -> adapter)
.<String>receiveFn(DatabaseWriter::write)
.build();
// Map Stage, can be used instead of sink
// StreamStage<String> log = uc1Topology.map(s -> {
// LOGGER.info(s);
// return s;
// });
// log.writeTo(sink);
// Apply sink
this.uc1Topology.writeTo(sink);
}
/**
@@ -78,10 +116,18 @@ public class Uc1PipelineTest extends JetTestSupport {
final int assertTimeoutSeconds = 6;
final int assertCollectedItems = 5;
LOGGER.info("Pipeline build successfully, starting test");
// Assertion
this.uc1Topology.apply(Assertions.assertCollectedEventually(assertTimeoutSeconds,
collection -> Assert.assertTrue("Not enough data arrived in the end",
collection.size() >= assertCollectedItems)));
collection -> {
// Print the newest record
// LOGGER.info(collection.get(collection.size() - 1));
// Run pipeline until 5th item
Assert.assertTrue("Not enough data arrived in the end",
collection.size() >= assertCollectedItems);
}));
// Test the UC1 Pipeline Recreation
try {
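
The last hunk is cut off right after the opening try block. For completeness, the usual Hazelcast Jet test pattern for assertCollectedEventually is sketched below: the assertion sink ends the job by throwing an AssertionCompletedException, which then appears as the cause of the CompletionException caught when joining the job. The exact code of this commit is not visible here, so the variable names and the JobConfig usage (com.hazelcast.jet.config.JobConfig, not among the imports shown above) are assumptions.

// Sketch of the typical job-execution block following the assertion above.
try {
  final JobConfig jobConfig = new JobConfig();
  this.testInstance.newJob(this.testPipeline, jobConfig).join();
  Assert.fail("Job should have completed with an AssertionCompletedException, "
      + "but completed normally.");
} catch (final CompletionException e) {
  // assertCollectedEventually signals success by completing the job exceptionally.
  final String errorMsg = e.getCause().getMessage();
  Assert.assertTrue(
      "Job was expected to complete with an AssertionCompletedException, but completed with: "
          + e.getCause(),
      errorMsg.contains(AssertionCompletedException.class.getName()));
}
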