diff --git a/theodolite-benchmarks/docker-test/uc4-hazelcastjet-docker-compose/test.sh b/theodolite-benchmarks/docker-test/uc4-hazelcastjet-docker-compose/test.sh
index 731310b017a4c66f08ca2196e8429ccb7238e1c9..d9bb6ccf241935c39df63ea5e2f0fce02476e976 100755
--- a/theodolite-benchmarks/docker-test/uc4-hazelcastjet-docker-compose/test.sh
+++ b/theodolite-benchmarks/docker-test/uc4-hazelcastjet-docker-compose/test.sh
@@ -2,10 +2,10 @@
 
 until docker-compose exec -T kcat kcat -L -b kafka:9092 -t output -J | jq -r '.topics[0].partitions | length' | grep "\b3\b"; do sleep 5s; done
 
-docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 500 |
+docker-compose exec -T kcat kcat -C -b kafka:9092 -t output -s key=s -s value=avro -r http://schema-registry:8081 -f '%k:%s\n' -c 600 |
     tee /dev/stderr |
     awk -F ':' '!/^%/ {print $1}' |
     sort |
     uniq |
-    wc -l |
+    wc -l|
     grep "\b21\b"
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java
index 00f3eb748eb95a29cfa3967cc645cd0521ce8916..c15ef7dce261a243fa3a2d68d34097eadf827a23 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java
@@ -56,7 +56,7 @@ public class HistoryService {
   private void createHazelcastJetApplication() throws Exception { // NOPMD
     new Uc4HazelcastJetFactory()
         .setReadPropertiesFromEnv(KAFKA_BSERVER_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
-        .setWritePropertiesFromEnv(KAFKA_BSERVER_DEFAULT)
+        .setWritePropertiesFromEnv(KAFKA_BSERVER_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT)
         .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
         .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
         .setKafkaConfigurationTopicFromEnv(KAFKA_CONFIG_TOPIC_DEFAULT)
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java
index 5b0e07f2449f8b8294b532dce6cde343f64e736d..69466250f008b0e8eedffc64a6fc1dce1c670cc5 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java
@@ -218,11 +218,11 @@ public class Uc4HazelcastJetFactory {
    * @return The Uc4HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline.
    */
   public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
-      final String bootstrapServersDefault) {
+      final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
     // Use KafkaPropertiesBuilder to build a properties object used for kafka
     final Uc4KafkaPropertiesBuilder propsBuilder = new Uc4KafkaPropertiesBuilder();
     final Properties kafkaWriteProps =
-        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault);
+        propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault);
     this.kafkaWritePropsForPipeline = kafkaWriteProps;
     return this;
   }
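The *FromEnv setters above resolve each setting from the environment and only fall back to the supplied default, which is why setWritePropertiesFromEnv now has to carry the schema registry default as well. A minimal sketch of that resolution pattern, using the same Objects.requireNonNullElse/System.getenv approach as Uc4KafkaPropertiesBuilder (class and literal variable names here are illustrative; the benchmark reads the variables through ConfigurationKeys):

    import java.util.Objects;

    public final class EnvDefaultSketch {

      private EnvDefaultSketch() {}

      // Resolve a setting from an environment variable, falling back to the given default.
      static String envOrDefault(final String envVariable, final String defaultValue) {
        return Objects.requireNonNullElse(System.getenv(envVariable), defaultValue);
      }

      public static void main(final String[] args) {
        // Hypothetical literal variable names, shown only for illustration.
        final String bootstrapServers = envOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
        final String schemaRegistryUrl = envOrDefault("SCHEMA_REGISTRY_URL", "http://localhost:8081");
        System.out.println(bootstrapServers + " / " + schemaRegistryUrl);
      }
    }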
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java
index be77ab774e6e94065148adf460b100cdfebf3169..9f533387a65159d6bcede884a727c08d887be01e 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java
@@ -1,12 +1,11 @@
 package theodolite.uc4.application;
 
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.DoubleDeserializer;
-import org.apache.kafka.common.serialization.DoubleSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import theodolite.commons.hazelcastjet.ConfigurationKeys;
@@ -74,9 +73,10 @@
     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class.getCanonicalName());
     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-        DoubleDeserializer.class.getCanonicalName());
+        KafkaAvroDeserializer.class.getCanonicalName());
     props.put(SPECIFIC_AVRO_READER_CONFIG, true);
     props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+
     return props;
   }
 
@@ -117,18 +117,25 @@
    * @return A Kafka Properties Object containing the values needed for a Hazelcast Jet UC4
    *         Pipeline.
    */
-  public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault) {
+  public Properties buildKafkaWritePropsFromEnv(final String kafkaBootstrapServerDefault,
+      final String schemaRegistryUrlDefault) {
 
     final String kafkaBootstrapServers = Objects.requireNonNullElse(
         System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS),
         kafkaBootstrapServerDefault);
+    final String schemaRegistryUrl = Objects.requireNonNullElse(
+        System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL),
+        schemaRegistryUrlDefault);
 
     final Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         StringSerializer.class.getCanonicalName());
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-        DoubleSerializer.class.getCanonicalName());
+        KafkaAvroSerializer.class.getCanonicalName());
+    props.put("specific.avro.writer", true);
+    props.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+
     return props;
   }
 
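The builder's SCHEMA_REGISTRY_URL_CONFIG and SPECIFIC_AVRO_READER_CONFIG constants (not shown in these hunks) presumably hold the standard Confluent keys "schema.registry.url" and "specific.avro.reader". A self-contained sketch of an equivalent Avro writer configuration with the literal keys spelled out (illustrative only, not the benchmark's actual builder):

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public final class AvroWritePropsSketch {

      private AvroWritePropsSketch() {}

      // Producer properties that write String keys and Avro values via the schema registry.
      public static Properties avroWriteProps(final String bootstrapServers,
          final String schemaRegistryUrl) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getCanonicalName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaAvroSerializer.class.getCanonicalName());
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
      }
    }

As far as I know, the Confluent serializer defines no "specific.avro.writer" switch (specific records are serialized automatically), so the extra property set in the hunk above should be harmless but is not strictly required.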
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
index 7f3a87e15f3543843f059ff258457794d259e457..05ec199d1cf5e1c965063ed137d6e93428586189 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import theodolite.uc4.application.uc4specifics.ChildParentsTransformer;
 import theodolite.uc4.application.uc4specifics.SensorGroupKey;
 import theodolite.uc4.application.uc4specifics.ValueGroup;
@@ -33,8 +35,10 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
  * Builder to build a HazelcastJet Pipeline for UC4 which can be used for stream processing using
  * Hazelcast Jet.
  */
+@SuppressWarnings("PMD.ExcessiveImports")
 public class Uc4PipelineBuilder {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(Uc4PipelineBuilder.class);
   private static final String SENSOR_PARENT_MAP_NAME = "SensorParentMap";
 
   /**
@@ -66,6 +70,14 @@ public class Uc4PipelineBuilder {
       final String kafkaFeedbackTopic,
       final int windowSize) {
 
+    if (LOGGER.isInfoEnabled()) {
+      LOGGER.info("kafkaConfigProps: " + kafkaConfigPropsForPipeline);
+      LOGGER.info("kafkaFeedbackProps: " + kafkaFeedbackPropsForPipeline);
+      LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline);
+    }
+
+
+
     // The pipeline for this Use Case
     final Pipeline uc4Pipeline = Pipeline.create();
 
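The isInfoEnabled() guard added above avoids building the concatenated log strings when INFO is disabled. SLF4J's parameterized messages give the same effect without an explicit guard; a small illustrative sketch (class, logger, and method names are placeholders):

    import java.util.Properties;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class LoggingSketch {

      private static final Logger LOGGER = LoggerFactory.getLogger(LoggingSketch.class);

      private LoggingSketch() {}

      public static void logProps(final Properties readProps, final Properties writeProps) {
        // {} placeholders are only rendered when INFO is enabled, so no guard is needed.
        LOGGER.info("kafkaReadProps: {}", readProps);
        LOGGER.info("kafkaWriteProps: {}", writeProps);
      }
    }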
@@ -75,19 +87,20 @@
     final StreamSource<Entry<String, ActivePowerRecord>> inputSource =
         KafkaSources.<String, ActivePowerRecord>kafka(
             kafkaInputReadPropsForPipeline, kafkaInputTopic);
-    final StreamSource<Entry<String, Double>> aggregationSource =
-        KafkaSources.<String, Double>kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
+    final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource =
+        KafkaSources.<String, ActivePowerRecord>
+            kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic);
 
     // Extend UC4 topology to pipeline
-    final StreamStage<Entry<String, Double>> uc4Product =
+    final StreamStage<Entry<String, ActivePowerRecord>> uc4Product =
         this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource,
             windowSize);
 
     // Add Sink1: Write back to kafka output topic
-    uc4Product.writeTo(KafkaSinks.<String, Double>kafka(
+    uc4Product.writeTo(KafkaSinks.kafka(
         kafkaWritePropsForPipeline, kafkaOutputTopic));
     // Add Sink2: Write back to kafka feedback/aggregation topic
-    uc4Product.writeTo(KafkaSinks.<String, Double>kafka(
+    uc4Product.writeTo(KafkaSinks.kafka(
         kafkaWritePropsForPipeline, kafkaFeedbackTopic));
     // Add Sink3: Logger
     uc4Product.writeTo(Sinks.logger());
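The writeTo calls above drop the explicit <String, Double> type arguments and let the sink type be inferred from uc4Product. If the explicitly typed form is preferred, the equivalent for the new value type would look like the following sketch (property object and topic name are placeholders):

    import com.hazelcast.jet.kafka.KafkaSinks;
    import com.hazelcast.jet.pipeline.Sink;
    import java.util.Map.Entry;
    import java.util.Properties;
    import titan.ccp.model.records.ActivePowerRecord;

    public final class SinkSketch {

      private SinkSketch() {}

      // Explicitly typed variant of the KafkaSinks.kafka(...) calls used in the hunk above.
      public static Sink<Entry<String, ActivePowerRecord>> avroSink(final Properties writeProps,
          final String topic) {
        return KafkaSinks.<String, ActivePowerRecord>kafka(writeProps, topic);
      }
    }

Either way, the entry types written to the sink have to match the producer serializers configured in the write properties (StringSerializer for keys, KafkaAvroSerializer for values), which is why the properties change and the pipeline type change belong together.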
@@ -125,9 +138,10 @@
    *         according aggregated values. The data can be further modified or directly be linked to
    *         a Hazelcast Jet sink.
    */
-  public StreamStage<Entry<String, Double>> extendUc4Topology(final Pipeline pipe, // NOPMD
+  public StreamStage<Entry<String, ActivePowerRecord>> extendUc4Topology(// NOPMD
+      final Pipeline pipe,
       final StreamSource<Entry<String, ActivePowerRecord>> inputSource,
-      final StreamSource<Entry<String, Double>> aggregationSource,
+      final StreamSource<Entry<String, ActivePowerRecord>> aggregationSource,
       final StreamSource<Entry<Event, String>> configurationSource,
       final int windowSize) {
     //////////////////////////////////
@@ -145,26 +159,20 @@
 
     //////////////////////////////////
     // (1) Sensor Input Stream
-    final StreamStage<Entry<String, Double>> inputStream = pipe
+    final StreamStage<Entry<String, ActivePowerRecord>> inputStream = pipe
         .readFrom(inputSource)
-        .withNativeTimestamps(0)
-        .map(stream -> {
-          // Build data for next pipeline stage
-          final String sensorId = stream.getValue().getIdentifier();
-          final Double valueInW = stream.getValue().getValueInW();
-          // Return data for next pipeline stage
-          return Util.entry(sensorId, valueInW);
-        });
+        .withNativeTimestamps(0);
 
     //////////////////////////////////
     // (1) Aggregation Stream
-    final StreamStage<Entry<String, Double>> aggregations = pipe
+    final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe
         .readFrom(aggregationSource)
         .withNativeTimestamps(0);
 
     //////////////////////////////////
     // (2) UC4 Merge Input with aggregation stream
-    final StreamStageWithKey<Entry<String, Double>, String> mergedInputAndAggregations = inputStream
+    final StreamStageWithKey<Entry<String, ActivePowerRecord>, String>
+        mergedInputAndAggregations = inputStream
         .merge(aggregations)
         .groupingKey(Entry::getKey);
 
@@ -194,21 +202,21 @@
     //////////////////////////////////
     // (4) UC4 Duplicate as flatmap joined Stream
     // [(sensorKey, Group) , value]
-    final StreamStage<Entry<SensorGroupKey, Double>> dupliAsFlatmappedStage = joinedStage
+    final StreamStage<Entry<SensorGroupKey, ActivePowerRecord>> dupliAsFlatmappedStage = joinedStage
         .flatMap(entry -> {
           // Supplied data
           final String keyGroupId = entry.getKey();
-          final Double valueInW = entry.getValue().getValueInW();
+          final ActivePowerRecord record = entry.getValue().getRecord();
           final Set<String> groups = entry.getValue().getGroups();
 
           // Transformed Data
           final String[] groupList = groups.toArray(String[]::new);
           final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length];
-          final List<Entry<SensorGroupKey, Double>> newEntryList = new ArrayList<>();
+          final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>();
 
           for (int i = 0; i < groupList.length; i++) {
             newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]);
-            newEntryList.add(Util.entry(newKeyList[i], valueInW));
+            newEntryList.add(Util.entry(newKeyList[i], record));
           }
 
           // Return traversable list of new entry elements
@@ -218,7 +226,8 @@
     //////////////////////////////////
     // (5) UC4 Last Value Map
     // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
-    final StageWithWindow<Entry<SensorGroupKey, Double>> windowedLastValues = dupliAsFlatmappedStage
+    final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>>
+        windowedLastValues = dupliAsFlatmappedStage
         .window(WindowDefinition.tumbling(windowSize));
 
     //////////////////////////////////
@@ -226,15 +235,16 @@
     // Group using the group out of the sensorGroupKey keys
     return windowedLastValues
         .groupingKey(entry -> entry.getKey().getGroup())
-        .aggregate(AggregateOperations.summingDouble(Entry::getValue))
+        .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue().getValueInW()))
         .map(agg -> {
 
-          // Construct data for return pair
           final String theGroup = agg.getKey();
           final Double summedValueInW = agg.getValue();
 
           // Return aggregates group value pair
-          return Util.entry(theGroup, summedValueInW);
+          return Util.entry(
+              theGroup,
+              new ActivePowerRecord(theGroup, System.currentTimeMillis(), summedValueInW));
         });
   }
 
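Steps (5) and (6) above amount to a tumbling-window sum per group whose result is re-wrapped as an ActivePowerRecord keyed by the group name. A condensed, self-contained sketch of that aggregation step, assuming an upstream stage already keyed by group (class and method names are illustrative, not part of the benchmark):

    import com.hazelcast.jet.Util;
    import com.hazelcast.jet.aggregate.AggregateOperations;
    import com.hazelcast.jet.pipeline.StreamStage;
    import com.hazelcast.jet.pipeline.WindowDefinition;
    import java.util.Map.Entry;
    import titan.ccp.model.records.ActivePowerRecord;

    public final class GroupSumSketch {

      private GroupSumSketch() {}

      // Sums the power values per group within a tumbling window and emits one
      // ActivePowerRecord per group and window, mirroring extendUc4Topology above.
      public static StreamStage<Entry<String, ActivePowerRecord>> sumPerGroup(
          final StreamStage<Entry<String, ActivePowerRecord>> perGroupValues,
          final int windowSizeMs) {
        return perGroupValues
            .window(WindowDefinition.tumbling(windowSizeMs))
            .groupingKey(Entry::getKey)
            .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue().getValueInW()))
            .map(agg -> Util.entry(
                agg.getKey(),
                new ActivePowerRecord(agg.getKey(), System.currentTimeMillis(), agg.getValue())));
      }
    }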
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java
index 76430e41e69d23b5b3c4fe588b4c9574e9553015..1af5d0d001984eca47b69aaa901f5c8e72633c53 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java
@@ -2,22 +2,27 @@ package theodolite.uc4.application.uc4specifics;
 
 import java.util.Objects;
 import java.util.Set;
+import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * Structure: (valueInW, Set(Groups)).
  */
 public class ValueGroup {
 
-  private final Double valueInW;
+  private final ActivePowerRecord record;
   private final Set<String> groups;
 
-  public ValueGroup(final Double valueInW, final Set<String> groups) {
-    this.valueInW = valueInW;
+  public ValueGroup(final ActivePowerRecord record, final Set<String> groups) {
+    this.record = record;
     this.groups = groups;
   }
 
+  public ActivePowerRecord getRecord() {
+    return this.record;
+  }
+
   public Double getValueInW() {
-    return this.valueInW;
+    return this.record.getValueInW();
   }
 
   public Set<String> getGroups() {
@@ -30,12 +35,12 @@
     for (final String group : this.groups) {
       groupString = groupString + group + "/";// NOPMD
     }
-    return this.valueInW.toString() + ";" + groupString + "]";
+    return this.record.getValueInW() + ";" + groupString + "]";
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(this.valueInW, this.groups);
+    return Objects.hash(this.record, this.groups);
   }
 
   @Override
@@ -45,7 +50,7 @@
     }
     if (obj instanceof ValueGroup) {
       final ValueGroup other = (ValueGroup) obj;
-      return Objects.equals(this.valueInW, other.valueInW)
+      return Objects.equals(this.record.getValueInW(), other.getValueInW())
           && this.groups.containsAll(other.groups);
     }
     return false;
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java
index 72d421a6c3031c5e7cac8838bc54751da8db3e34..f32dffece8b80e010956054a9aa562546751bf39 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java
@@ -19,13 +19,13 @@ public class ValueGroupSerializer implements StreamSerializer<ValueGroup> {
 
   @Override
   public void write(final ObjectDataOutput out, final ValueGroup key) throws IOException {
-    out.writeDouble(key.getValueInW());
+    out.writeObject(key.getRecord());
     out.writeString(String.join(",", key.getGroups()));
   }
 
   @Override
   public ValueGroup read(final ObjectDataInput in)
       throws IOException {
-    return new ValueGroup(in.readDouble(),
+    return new ValueGroup(in.readObject(),
         new HashSet<>(Arrays.asList(in.readString().split(","))));
   }
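ValueGroupSerializer only takes effect once it is registered for the ValueGroup type in the Hazelcast serialization configuration. A hedged sketch of such a registration; the benchmark's actual wiring is not shown in this diff and may differ (it likely also registers serializers for the other uc4specifics types):

    import com.hazelcast.config.Config;
    import com.hazelcast.config.SerializerConfig;
    import theodolite.uc4.application.uc4specifics.ValueGroup;
    import theodolite.uc4.application.uc4specifics.ValueGroupSerializer;

    public final class SerializerRegistrationSketch {

      private SerializerRegistrationSketch() {}

      // Registers the custom StreamSerializer so Hazelcast uses it whenever a ValueGroup
      // has to be serialized, e.g. when entries cross the network between Jet members.
      public static Config withValueGroupSerializer(final Config config) {
        config.getSerializationConfig().addSerializerConfig(
            new SerializerConfig()
                .setTypeClass(ValueGroup.class)
                .setImplementation(new ValueGroupSerializer()));
        return config;
      }
    }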
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java
index 3adf122e60700600e775bd1276921ead6f618c60..263b3c1a57f591e9945e01fc78dbccecf42784ea 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java
@@ -12,10 +12,7 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
 import com.hazelcast.jet.pipeline.test.Assertions;
 import com.hazelcast.jet.pipeline.test.TestSources;
 import com.hazelcast.jet.test.SerialTest;
-import java.time.Instant;
-import java.time.LocalDateTime;
 import java.util.Map;
-import java.util.TimeZone;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletionException;
 import org.junit.After;
@@ -34,7 +31,6 @@ import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
 import titan.ccp.model.sensorregistry.MachineSensor;
 import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
 import titan.ccp.model.sensorregistry.MutableSensorRegistry;
-import titan.ccp.model.sensorregistry.SensorRegistry;
 
 @Category(SerialTest.class)
 public class Uc4PipelineTest extends JetTestSupport {
@@ -42,7 +38,7 @@
   // TEst Machinery
   JetInstance testInstance = null;
   Pipeline testPipeline = null;
-  StreamStage<Entry<String, Double>> uc4Topology = null;
+  StreamStage<Entry<String, ActivePowerRecord>> uc4Topology = null;
 
   @Before
   public void buildUc4Pipeline() {
@@ -72,10 +68,10 @@
         });
 
     // Create test source 2 : Mock aggregation Values
-    final StreamSource<Entry<String, Double>> testAggregationSource =
+    final StreamSource<Entry<String, ActivePowerRecord>> testAggregationSource =
         TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> {
-          final Double testAggValue = testValueInW;
-          final Entry<String, Double> testEntry =
+          final ActivePowerRecord testAggValue =
+              new ActivePowerRecord(testSensorName, System.currentTimeMillis(), testValueInW);
+          final Entry<String, ActivePowerRecord> testEntry =
               Map.entry(testLevel1GroupName, testAggValue);
           return testEntry;
         });
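With the mock aggregation source now emitting full ActivePowerRecord values, the rest of the test can keep using Jet's assertion sinks. For reference, a generic sketch of how a TestSources stream of such entries can be verified (this illustrates the Jet test API only and is not the project's actual test body):

    import com.hazelcast.jet.pipeline.Pipeline;
    import com.hazelcast.jet.pipeline.StreamSource;
    import com.hazelcast.jet.pipeline.test.Assertions;
    import com.hazelcast.jet.pipeline.test.TestSources;
    import java.util.Map;
    import java.util.Map.Entry;
    import org.junit.Assert;
    import titan.ccp.model.records.ActivePowerRecord;

    public final class TestSourceSketch {

      private TestSourceSketch() {}

      public static Pipeline buildAssertingPipeline() {
        final StreamSource<Entry<String, ActivePowerRecord>> source =
            TestSources.itemStream(1, (timestamp, sequence) -> Map.entry(
                "group1", new ActivePowerRecord("sensor1", timestamp, 21.0)));

        final Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(source)
            .withNativeTimestamps(0)
            // Completes by throwing AssertionCompletedException once the assertion holds,
            // which the surrounding test treats as success.
            .apply(Assertions.assertCollectedEventually(60,
                items -> Assert.assertFalse("no items collected", items.isEmpty())));
        return pipeline;
      }
    }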