From 75a3a3d3e5f59d3308bb165f53b2cb2bcc00adb3 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Wed, 25 May 2022 16:58:00 +0200 Subject: [PATCH] Add NewHistoryServices --- .../uc2/hazelcastjet/NewHistoryService.java | 64 +++++++++++++ .../uc3/hazelcastjet/NewHistoryService.java | 64 +++++++++++++ .../uc4/hazelcastjet/NewHistoryService.java | 89 +++++++++++++++++++ 3 files changed, 217 insertions(+) create mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java create mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java new file mode 100644 index 000000000..572329c5d --- /dev/null +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java @@ -0,0 +1,64 @@ +package rocks.theodolite.benchmarks.uc2.hazelcastjet; + +import com.google.common.math.StatsAccumulator; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; +import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer; + + +/** + * A microservice that aggregate incoming messages in a tumbling window. + */ +public class NewHistoryService extends HazelcastJetService { + + private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); + + + /** + * Constructs the use case logic for UC2. + * Retrieves the needed values and instantiates a pipeline factory. + */ + public NewHistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); + + final Properties kafkaWriteProps = + this.propsBuilder.buildWriteProperties( + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); + + final String kafkaOutputTopic = + config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + + // Transform minutes to milliseconds + final int downsampleInterval = Integer.parseInt( + config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()); + final int downsampleIntervalMs = downsampleInterval * 60_000; + + this.pipelineFactory = new Uc2PipelineFactory( + kafkaProps, + this.kafkaInputTopic, + kafkaWriteProps, + kafkaOutputTopic, + downsampleIntervalMs); + } + + @Override + protected void registerSerializer() { + this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class); + } + + + public static void main(final String[] args) { + new NewHistoryService().run(); + } +} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java new file mode 100644 index 000000000..0da94bafe --- /dev/null +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java @@ -0,0 +1,64 @@ +package rocks.theodolite.benchmarks.uc3.hazelcastjet; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; +import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; +import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer; + +/** + * A microservice that aggregate incoming messages in a sliding window. + */ +public class NewHistoryService extends HazelcastJetService { + + private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); + + /** + * Constructs the use case logic for UC3. + * Retrieves the needed values and instantiates a pipeline factory. + */ + public NewHistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); + + final Properties kafkaWriteProps = + this.propsBuilder.buildWriteProperties( + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); + + final String kafkaOutputTopic = + config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + + final int windowSizeInSecondsNumber = Integer.parseInt( + config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()); + + final int hoppingSizeInSecondsNumber = Integer.parseInt( + config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()); + + this.pipelineFactory = new Uc3PipelineFactory( + kafkaProps, + kafkaInputTopic, + kafkaWriteProps, + kafkaOutputTopic, + windowSizeInSecondsNumber, + hoppingSizeInSecondsNumber); + } + + @Override + protected void registerSerializer() { + this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class); + } + + + public static void main(final String[] args) { + new NewHistoryService().run(); + } +} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java new file mode 100644 index 000000000..345f8be8b --- /dev/null +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java @@ -0,0 +1,89 @@ +package rocks.theodolite.benchmarks.uc4.hazelcastjet; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer; +import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; + + +/** + * A microservice that manages the history and, therefore, stores and aggregates incoming + * measurements. + */ +public class NewHistoryService extends HazelcastJetService { + + private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); + + /** + * Constructs the use case logic for UC4. + * Retrieves the needed values and instantiates a pipeline factory. + */ + public NewHistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); + + final Properties kafkaConfigReadProps = + propsBuilder.buildReadProperties( + EventDeserializer.class.getCanonicalName(), + StringDeserializer.class.getCanonicalName()); + + final Properties kafkaAggregationReadProps = + propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); + + final Properties kafkaWriteProps = + this.propsBuilder.buildWriteProperties( + StringSerializer.class.getCanonicalName(), + KafkaAvroSerializer.class.getCanonicalName()); + + final String kafkaOutputTopic = + config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + + final String kafkaConfigurationTopic = + config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString(); + + final String kafkaFeedbackTopic = + config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString(); + + final int windowSize = Integer.parseInt( + config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString()); + + this.pipelineFactory = new Uc4PipelineFactory( + kafkaProps, + kafkaConfigReadProps, + kafkaAggregationReadProps, + kafkaWriteProps, + kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic, + windowSize); + } + + + @Override + protected void registerSerializer() { + this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class) + .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class) + .registerSerializer(ImmutableSensorRegistry.class, + ImmutableSensorRegistryUc4Serializer.class); + } + + + public static void main(final String[] args) { + new NewHistoryService().run(); + } +} -- GitLab