diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index 419c25fec3eeffbd9eabef4897c44b7c6e773cee..e805849e98e65a0d9c4db9920835ba03cfe5715f 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -1,74 +1,89 @@ package rocks.theodolite.benchmarks.uc4.hazelcastjet; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup; +import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer; +import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; + /** * A microservice that manages the history and, therefore, stores and aggregates incoming * measurements. */ -public class HistoryService { +public class HistoryService extends HazelcastJetService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - // Hazelcast settings (default) - private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; + /** + * Constructs the use case logic for UC4. + * Retrieves the needed values and instantiates a pipeline factory. + */ + public HistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); - // Kafka settings (default) - private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092"; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input"; - private static final String KAFKA_CONFIG_TOPIC_DEFAULT = "configuration"; - private static final String KAFKA_FEEDBACK_TOPIC_DEFAULT = "aggregation-feedback"; - private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output"; + final Properties kafkaConfigReadProps = + propsBuilder.buildReadProperties( + EventDeserializer.class.getCanonicalName(), + StringDeserializer.class.getCanonicalName()); - // UC4 specific (default) - private static final String WINDOW_SIZE_DEFAULT_MS = "5000"; + final Properties kafkaAggregationReadProps = + propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); - // Job name (default) - private static final String JOB_NAME = "uc4-hazelcastjet"; + final Properties kafkaWriteProps = + this.propsBuilder.buildWriteProperties( + StringSerializer.class.getCanonicalName(), + KafkaAvroSerializer.class.getCanonicalName()); - /** - * Entrypoint for UC4 using Gradle Run. - */ - public static void main(final String[] args) { - final HistoryService uc4HistoryService = new HistoryService(); - try { - uc4HistoryService.run(); - } catch (final Exception e) { // NOPMD - LOGGER.error("ABORT MISSION!: {}", e); - } - } + final String kafkaOutputTopic = + config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - /** - * Start a UC4 service. - * - * @throws Exception This Exception occurs if the Uc4HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - public void run() throws Exception { // NOPMD - this.createHazelcastJetApplication(); + final String kafkaConfigurationTopic = + config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString(); + + final String kafkaFeedbackTopic = + config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString(); + + final int windowSize = Integer.parseInt( + config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString()); + + this.pipelineFactory = new Uc4PipelineFactory( + kafkaProps, + kafkaConfigReadProps, + kafkaAggregationReadProps, + kafkaWriteProps, + kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic, + windowSize); } - /** - * Creates a Hazelcast Jet Application for UC4 using the Uc1HazelcastJetFactory. - * - * @throws Exception This Exception occurs if the Uc4HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - private void createHazelcastJetApplication() throws Exception { // NOPMD - new Uc4HazelcastJetFactory() - .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME) - .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) - .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) - .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) - .setKafkaConfigurationTopicFromEnv(KAFKA_CONFIG_TOPIC_DEFAULT) - .setKafkaFeedbackTopicFromEnv(KAFKA_FEEDBACK_TOPIC_DEFAULT) - .setWindowSizeFromEnv(WINDOW_SIZE_DEFAULT_MS) - .buildUc4JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) - .buildUc4Pipeline() - .runUc4Job(JOB_NAME); + + @Override + protected void registerSerializer() { + this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class) + .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class) + .registerSerializer(ImmutableSensorRegistry.class, + ImmutableSensorRegistryUc4Serializer.class); } + + public static void main(final String[] args) { + new HistoryService().run(); + } } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java deleted file mode 100644 index 345f8be8bdbacd38fdaabf8f9c6868d4ffc47225..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java +++ /dev/null @@ -1,89 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.hazelcastjet; - -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer; -import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; - - -/** - * A microservice that manages the history and, therefore, stores and aggregates incoming - * measurements. - */ -public class NewHistoryService extends HazelcastJetService { - - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - - /** - * Constructs the use case logic for UC4. - * Retrieves the needed values and instantiates a pipeline factory. - */ - public NewHistoryService() { - super(LOGGER); - final Properties kafkaProps = - this.propsBuilder.buildReadProperties( - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - final Properties kafkaConfigReadProps = - propsBuilder.buildReadProperties( - EventDeserializer.class.getCanonicalName(), - StringDeserializer.class.getCanonicalName()); - - final Properties kafkaAggregationReadProps = - propsBuilder.buildReadProperties( - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - final Properties kafkaWriteProps = - this.propsBuilder.buildWriteProperties( - StringSerializer.class.getCanonicalName(), - KafkaAvroSerializer.class.getCanonicalName()); - - final String kafkaOutputTopic = - config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - - final String kafkaConfigurationTopic = - config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString(); - - final String kafkaFeedbackTopic = - config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString(); - - final int windowSize = Integer.parseInt( - config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString()); - - this.pipelineFactory = new Uc4PipelineFactory( - kafkaProps, - kafkaConfigReadProps, - kafkaAggregationReadProps, - kafkaWriteProps, - kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic, - windowSize); - } - - - @Override - protected void registerSerializer() { - this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class) - .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class) - .registerSerializer(ImmutableSensorRegistry.class, - ImmutableSensorRegistryUc4Serializer.class); - } - - - public static void main(final String[] args) { - new NewHistoryService().run(); - } -} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java deleted file mode 100644 index 9b6aa71267150296d8b65268b1922925b7ada796..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4HazelcastJetFactory.java +++ /dev/null @@ -1,389 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.hazelcastjet; - -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.config.JobConfig; -import com.hazelcast.jet.pipeline.Pipeline; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; -import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer; -import titan.ccp.model.sensorregistry.ImmutableSensorRegistry; - -/** - * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC4 - * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the - * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build - * the Read and Write Properties and set the input, output, and configuration topic. This can be - * done using internal functions of this factory. Outside data only refers to custom values or - * default values in case data of the environment cannot the fetched. - */ -public class Uc4HazelcastJetFactory { - - // Information per History Service - private Properties kafkaInputReadPropsForPipeline; - private Properties kafkaConfigPropsForPipeline; - private Properties kafkaFeedbackPropsForPipeline; - private Properties kafkaWritePropsForPipeline; - private String kafkaInputTopic; - private String kafkaOutputTopic; - private JetInstance uc4JetInstance; - private Pipeline uc4JetPipeline; - // UC4 specific - private String kafkaConfigurationTopic; - private String kafkaFeedbackTopic; - private int windowSize; - - ///////////////////////////////////// - // Layer 1 - Hazelcast Jet Run Job // - ///////////////////////////////////// - - /** - * Needs a JetInstance and Pipeline defined in this factors. Adds the pipeline to the existing - * JetInstance as a job. - * - * @param jobName The name of the job. - * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. - */ - public void runUc4Job(final String jobName) throws IllegalStateException { // NOPMD - - // Check if a Jet Instance for UC4 is set. - if (this.uc4JetInstance == null) { - throw new IllegalStateException("Jet Instance is not set! " - + "Cannot start a hazelcast jet job for UC4."); - } - - // Check if a Pipeline for UC3 is set. - if (this.uc4JetPipeline == null) { - throw new IllegalStateException( - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC4."); - } - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig() - .registerSerializer(ValueGroup.class, ValueGroupSerializer.class) - .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class) - .registerSerializer(ImmutableSensorRegistry.class, - ImmutableSensorRegistryUc4Serializer.class) - .setName(jobName); - this.uc4JetInstance.newJobIfAbsent(this.uc4JetPipeline, jobConfig).join(); - } - - ///////////// - // Layer 2 // - ///////////// - - /** - * Build a Hazelcast JetInstance used to run a job on. - * - * @param logger The logger specified for this JetInstance. - * @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the - * environment. - * @param hzKubernetesServiceDnsKey The kubernetes service dns key. - * @return A Uc4HazelcastJetFactory containing a set JetInstance. - */ - public Uc4HazelcastJetFactory buildUc4JetInstanceFromEnv(final Logger logger, - final String bootstrapServerDefault, - final String hzKubernetesServiceDnsKey) { - this.uc4JetInstance = new JetInstanceBuilder() - .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey) - .build(); - return this; - } - - /** - * Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input - * topic and kafka properties defined in this factory beforehand. - * - * @return A Uc4HazelcastJetFactory containg a set pipeline. - * @throws Exception If the input topic or the kafka properties are not defined, the pipeline - * cannot be built. - */ - public Uc4HazelcastJetFactory buildUc4Pipeline() throws IllegalStateException { // NOPMD - - final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD - - // Check if Properties for the Kafka Input are set. - if (this.kafkaInputReadPropsForPipeline == null) { - throw new IllegalStateException("Kafka Input Read Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if Properties for the Kafka Output are set. - if (this.kafkaWritePropsForPipeline == null) { - throw new IllegalStateException("Kafka Write Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if Properties for the Kafka Config Read are set. - if (this.kafkaConfigPropsForPipeline == null) { - throw new IllegalStateException("Kafka Config Read Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if Properties for the Kafka Feedback Read are set. - if (this.kafkaFeedbackPropsForPipeline == null) { - throw new IllegalStateException("Kafka Feedback Read Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka input topic is set. - if (this.kafkaInputTopic == null) { - throw new IllegalStateException("Kafka input topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka output topic is set. - if (this.kafkaOutputTopic == null) { - throw new IllegalStateException("kafka output topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka config topic is set. - if (this.kafkaConfigurationTopic == null) { - throw new IllegalStateException("configuratin topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka feedback topic is set. - if (this.kafkaFeedbackTopic == null) { - throw new IllegalStateException("Feedback topic not set! " - + defaultPipelineWarning); - } - - // Check if window size for tumbling window is set. - if (this.windowSize <= 0) { - throw new IllegalStateException("window size for pipeline not set or not greater than 0! " - + defaultPipelineWarning); - } - - // Build Pipeline Using the pipelineBuilder - final Uc4PipelineBuilder pipeBuilder = new Uc4PipelineBuilder(); - this.uc4JetPipeline = - pipeBuilder.build(this.kafkaInputReadPropsForPipeline, - this.kafkaConfigPropsForPipeline, - this.kafkaFeedbackPropsForPipeline, - this.kafkaWritePropsForPipeline, - this.kafkaInputTopic, this.kafkaOutputTopic, - this.kafkaConfigurationTopic, - this.kafkaFeedbackTopic, - this.windowSize); - // Return Uc4HazelcastJetBuilder factory - return this; - } - - ///////////// - // Layer 3 // - ///////////// - - /** - * Sets kafka read properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry - * url can be fetched from the environment. - * @return The Uc4HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline. - */ - public Uc4HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, - final String schemaRegistryUrlDefault, - final String jobName) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - - final Properties kafkaInputReadProps = - propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, jobName, - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - final Properties kafkaConfigReadProps = - propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - jobName, - EventDeserializer.class.getCanonicalName(), - StringDeserializer.class.getCanonicalName()); - - final Properties kafkaAggregationReadProps = - propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - jobName, - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - this.kafkaInputReadPropsForPipeline = kafkaInputReadProps; - this.kafkaConfigPropsForPipeline = kafkaConfigReadProps; - this.kafkaFeedbackPropsForPipeline = kafkaAggregationReadProps; - return this; - } - - /** - * Sets kafka write properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @return The Uc4HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline. - */ - public Uc4HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - StringSerializer.class.getCanonicalName(), - KafkaAvroSerializer.class.getCanonicalName()); - this.kafkaWritePropsForPipeline = kafkaWriteProps; - return this; - } - - /** - * Sets the kafka input topic for the pipeline used in this builder. - * - * @param inputTopic The kafka topic used as the pipeline input. - * @return A Uc4HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc4HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD - final String inputTopic) { - this.kafkaInputTopic = inputTopic; - return this; - } - - /** - * Sets the kafka input output for the pipeline used in this builder. - * - * @param outputTopic The kafka topic used as the pipeline output. - * @return A Uc4HazelcastJetBuilder factory with a set kafkaOutputTopic. - */ - public Uc4HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD - this.kafkaOutputTopic = outputTopic; - return this; - } - - - /** - * Sets the kafka input topic for the pipeline used in this builder using environment variables. - * - * @param defaultInputTopic The default kafka input topic used if no topic is specified by the - * environment. - * @return A Uc4HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc4HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD - final String defaultInputTopic) { - this.kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - defaultInputTopic); - return this; - } - - /** - * Sets the kafka output topic for the pipeline used in this builder using environment variables. - * - * @param defaultOutputTopic The default kafka output topic used if no topic is specified by the - * environment. - * @return A Uc4HazelcastJetBuilder factory with a set kafkaOutputTopic. - */ - public Uc4HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD - final String defaultOutputTopic) { - this.kafkaOutputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), - defaultOutputTopic); - return this; - } - - /** - * Sets the window size for the pipeline used in this builder. - * - * @param windowSize the window size to be used for this pipeline. - * @return A Uc4HazelcastJetFactory with a set windowSize. - */ - public Uc4HazelcastJetFactory setCustomWindowSize(// NOPMD - final int windowSize) { - this.windowSize = windowSize; - return this; - } - - /** - * Sets the window size for the pipeline used in this builder from the environment. - * - * @param defaultWindowSize the default window size to be used for this pipeline when none is set - * in the environment. - * @return A Uc4HazelcastJetFactory with a set windowSize. - */ - public Uc4HazelcastJetFactory setWindowSizeFromEnv(// NOPMD - final String defaultWindowSize) { - final String windowSize = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.WINDOW_SIZE_UC4), - defaultWindowSize); - final int windowSizeNumber = Integer.parseInt(windowSize); - this.windowSize = windowSizeNumber; - return this; - } - - /** - * Sets the configuration topic for the pipeline used in this builder. - * - * @param kafkaConfigurationTopic the configuration topic to be used for this pipeline. - * @return A Uc4HazelcastJetFactory with a set configuration topic. - */ - public Uc4HazelcastJetFactory setCustomKafkaConfigurationTopic(// NOPMD - final String kafkaConfigurationTopic) { - this.kafkaConfigurationTopic = kafkaConfigurationTopic; - return this; - } - - /** - * Sets the configuration topic for the pipeline used in this builder from the environment. - * - * @param defaultKafkaConfigurationTopic the default configuration topic to be used for this - * pipeline when none is set in the environment. - * @return A Uc4HazelcastJetFactory with a set kafkaConfigurationTopic. - */ - public Uc4HazelcastJetFactory setKafkaConfigurationTopicFromEnv(// NOPMD - final String defaultKafkaConfigurationTopic) { - this.kafkaConfigurationTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC), - defaultKafkaConfigurationTopic); - return this; - } - - /** - * Sets the Feedback topic for the pipeline used in this builder. - * - * @param kafkaFeedbackTopic the Feedback topic to be used for this pipeline. - * @return A Uc4HazelcastJetFactory with a set Feedback topic. - */ - public Uc4HazelcastJetFactory setCustomKafkaFeedbackTopic(// NOPMD - final String kafkaFeedbackTopic) { - this.kafkaFeedbackTopic = kafkaFeedbackTopic; - return this; - } - - /** - * Sets the Feedback topic for the pipeline used in this builder from the environment. - * - * @param defaultKafkaFeedbackTopic the default Feedback topic to be used for this pipeline when - * none is set in the environment. - * @return A Uc4HazelcastJetFactory with a set kafkaFeedbackTopic. - */ - public Uc4HazelcastJetFactory setKafkaFeedbackTopicFromEnv(// NOPMD - final String defaultKafkaFeedbackTopic) { - this.kafkaFeedbackTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC), - defaultKafkaFeedbackTopic); - return this; - } - -} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineBuilder.java deleted file mode 100644 index 2efb8250c0e1136b34412e4553b2d216c5e24b43..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineBuilder.java +++ /dev/null @@ -1,309 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.hazelcastjet; - -import com.hazelcast.function.BiFunctionEx; -import com.hazelcast.jet.Traverser; -import com.hazelcast.jet.Traversers; -import com.hazelcast.jet.Util; -import com.hazelcast.jet.aggregate.AggregateOperation; -import com.hazelcast.jet.aggregate.AggregateOperation1; -import com.hazelcast.jet.kafka.KafkaSinks; -import com.hazelcast.jet.kafka.KafkaSources; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.StageWithWindow; -import com.hazelcast.jet.pipeline.StreamSource; -import com.hazelcast.jet.pipeline.StreamStage; -import com.hazelcast.jet.pipeline.StreamStageWithKey; -import com.hazelcast.jet.pipeline.WindowDefinition; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.AggregatedActivePowerRecordAccumulator; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ChildParentsTransformer; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey; -import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup; -import titan.ccp.configuration.events.Event; -import titan.ccp.model.records.ActivePowerRecord; -import titan.ccp.model.records.AggregatedActivePowerRecord; -import titan.ccp.model.sensorregistry.SensorRegistry; - -/** - * Builder to build a HazelcastJet Pipeline for UC4 which can be used for stream processing using - * Hazelcast Jet. - */ -@SuppressWarnings("PMD.ExcessiveImports") -public class Uc4PipelineBuilder { - - private static final Logger LOGGER = LoggerFactory.getLogger(Uc4PipelineBuilder.class); - private static final String SENSOR_PARENT_MAP_NAME = "SensorParentMap"; - - /** - * Builds a pipeline which can be used for stream processing using Hazelcast Jet. - * - * @param kafkaInputReadPropsForPipeline Properties Object containing the necessary kafka input - * read attributes. - * @param kafkaConfigPropsForPipeline Properties Object containing the necessary kafka config read - * attributes. - * @param kafkaFeedbackPropsForPipeline Properties Object containing the necessary kafka - * aggregation read attributes. - * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write - * attributes. - * @param kafkaInputTopic The name of the input topic used for the pipeline. - * @param kafkaOutputTopic The name of the output topic used for the pipeline. - * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline. - * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline. - * @param windowSize The window size of the tumbling window used in this pipeline. - * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data - * for UC3. - */ - public Pipeline build(final Properties kafkaInputReadPropsForPipeline, // NOPMD - final Properties kafkaConfigPropsForPipeline, - final Properties kafkaFeedbackPropsForPipeline, - final Properties kafkaWritePropsForPipeline, - final String kafkaInputTopic, - final String kafkaOutputTopic, - final String kafkaConfigurationTopic, - final String kafkaFeedbackTopic, - final int windowSize) { - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("kafkaConfigProps: " + kafkaConfigPropsForPipeline); - LOGGER.info("kafkaFeedbackProps: " + kafkaFeedbackPropsForPipeline); - LOGGER.info("kafkaWriteProps: " + kafkaWritePropsForPipeline); - } - - // The pipeline for this Use Case - final Pipeline uc4Pipeline = Pipeline.create(); - - // Sources for this use case - final StreamSource<Entry<Event, String>> configSource = - KafkaSources.kafka(kafkaConfigPropsForPipeline, kafkaConfigurationTopic); - - final StreamSource<Entry<String, ActivePowerRecord>> inputSource = - KafkaSources.kafka(kafkaInputReadPropsForPipeline, kafkaInputTopic); - - final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource = - KafkaSources.kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic); - - // Extend UC4 topology to pipeline - final StreamStage<Entry<String, AggregatedActivePowerRecord>> uc4Aggregation = - this.extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource, - windowSize); - - // Add Sink2: Write back to kafka feedback/aggregation topic - uc4Aggregation.writeTo(KafkaSinks.kafka( - kafkaWritePropsForPipeline, kafkaFeedbackTopic)); - - // Log aggregation product - uc4Aggregation.writeTo(Sinks.logger()); - - // Add Sink2: Write back to kafka output topic - uc4Aggregation.writeTo(KafkaSinks.kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); - - // Return the pipeline - return uc4Pipeline; - } - - /** - * Extends to a blank Hazelcast Jet Pipeline the UC4 topology defines by theodolite. - * - * <p> - * UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} with maps - * from keys to groups to map values to their according groups. A feedback stream allows for - * group keys to be mapped to values and eventually to be mapped to other top level groups defines - * by the {@code SensorRegistry}. - * </p> - * - * <p> - * 6 Step topology: <br> - * (1) Inputs (Config, Values, Aggregations) <br> - * (2) Merge Input Values and Aggregations <br> - * (3) Join Configuration with Merged Input Stream <br> - * (4) Duplicate as flatmap per value and group <br> - * (5) Window (preparation for possible last values) <br> - * (6) Aggregate data over the window - * </p> - * - * @param pipe The blank pipeline to extend the logic to. - * @param inputSource A streaming source with {@code ActivePowerRecord} data. - * @param aggregationSource A streaming source with aggregated data. - * @param configurationSource A streaming source delivering a {@code SensorRegistry}. - * @param windowSize The window size used to aggregate over. - * @return A {@code StreamSource<String,Double>} with sensorKeys or groupKeys mapped to their - * according aggregated values. The data can be further modified or directly be linked to - * a Hazelcast Jet sink. - */ - public StreamStage<Entry<String, AggregatedActivePowerRecord>> extendUc4Topology(// NOPMD - final Pipeline pipe, - final StreamSource<Entry<String, ActivePowerRecord>> inputSource, - final StreamSource<Entry<String, AggregatedActivePowerRecord>> aggregationSource, - final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) { - - ////////////////////////////////// - // (1) Configuration Stream - pipe.readFrom(configurationSource) - .withNativeTimestamps(0) - .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED - || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) - .map(data -> Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue()))) - .flatMapStateful(HashMap::new, new ConfigFlatMap()) - .writeTo(Sinks.mapWithUpdating( - SENSOR_PARENT_MAP_NAME, // The addressed IMAP - Entry::getKey, // The key to look for - (oldValue, newEntry) -> newEntry.getValue())); - - ////////////////////////////////// - // (1) Sensor Input Stream - final StreamStage<Entry<String, ActivePowerRecord>> inputStream = pipe - .readFrom(inputSource) - .withNativeTimestamps(0); - - ////////////////////////////////// - // (1) Aggregation Stream - final StreamStage<Entry<String, ActivePowerRecord>> aggregations = pipe - .readFrom(aggregationSource) - .withNativeTimestamps(0) - .map(entry -> { // Map Aggregated to ActivePowerRecord - final AggregatedActivePowerRecord agg = entry.getValue(); - final ActivePowerRecord record = new ActivePowerRecord( - agg.getIdentifier(), agg.getTimestamp(), agg.getSumInW()); - return Util.entry(entry.getKey(), record); - }); - - ////////////////////////////////// - // (2) UC4 Merge Input with aggregation stream - final StreamStageWithKey<Entry<String, ActivePowerRecord>, String> - mergedInputAndAggregations = inputStream - .merge(aggregations) - .groupingKey(Entry::getKey); - - ////////////////////////////////// - // (3) UC4 Join Configuration and Merges Input/Aggregation Stream - // [sensorKey , (value,Set<Groups>)] - final StreamStage<Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations - .<Set<String>, Entry<String, ValueGroup>>mapUsingIMap( - SENSOR_PARENT_MAP_NAME, - (sensorEvent, sensorParentsSet) -> { - // Check whether a groupset exists for a key or not - if (sensorParentsSet == null) { - // No group set exists for this key: return valuegroup with default null group set. - final Set<String> nullSet = new HashSet<>(); - nullSet.add("NULL-GROUPSET"); - return Util.entry(sensorEvent.getKey(), - new ValueGroup(sensorEvent.getValue(), nullSet)); - } else { - // Group set exists for this key: return valuegroup with the groupset. - final ValueGroup valueParentsPair = - new ValueGroup(sensorEvent.getValue(), sensorParentsSet); - // Return solution - return Util.entry(sensorEvent.getKey(), valueParentsPair); - } - }); - - ////////////////////////////////// - // (4) UC4 Duplicate as flatmap joined Stream - // [(sensorKey, Group) , value] - final StreamStage<Entry<SensorGroupKey, ActivePowerRecord>> dupliAsFlatmappedStage = joinedStage - .flatMap(entry -> { - - // Supplied data - final String keyGroupId = entry.getKey(); - final ActivePowerRecord record = entry.getValue().getRecord(); - final Set<String> groups = entry.getValue().getGroups(); - - // Transformed Data - final String[] groupList = groups.toArray(String[]::new); - final SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length]; - final List<Entry<SensorGroupKey, ActivePowerRecord>> newEntryList = new ArrayList<>(); - for (int i = 0; i < groupList.length; i++) { - newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); - newEntryList.add(Util.entry(newKeyList[i], record)); - } - - // Return traversable list of new entry elements - return Traversers.traverseIterable(newEntryList); - }); - - ////////////////////////////////// - // (5) UC4 Last Value Map - // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time - final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> - windowedLastValues = dupliAsFlatmappedStage - .window(WindowDefinition.tumbling(windowSize)); - - final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, - AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = - AggregateOperation - .withCreate(AggregatedActivePowerRecordAccumulator::new) - .<Entry<SensorGroupKey, ActivePowerRecord>>andAccumulate((acc, rec) -> { - acc.setId(rec.getKey().getGroup()); - acc.addInputs(rec.getValue()); - }) - .andCombine((acc, acc2) -> - acc.addInputs(acc2.getId(), acc2.getSumInW(), acc2.getCount(), acc.getTimestamp())) - .andDeduct((acc, acc2) -> acc.removeInputs(acc2.getSumInW(), acc2.getCount())) - .andExportFinish(acc -> - new AggregatedActivePowerRecord(acc.getId(), - acc.getTimestamp(), - acc.getCount(), - acc.getSumInW(), - acc.getAverageInW()) - ); - - // write aggregation back to kafka - - return windowedLastValues - .groupingKey(entry -> entry.getKey().getGroup()) - .aggregate(aggrOp).map(agg -> Util.entry(agg.getKey(), agg.getValue())); - } - - - - /** - * FlatMap function used to process the configuration input for UC4. - */ - private static class ConfigFlatMap implements - BiFunctionEx<Map<String, Set<String>>, Entry<Event, SensorRegistry>, Traverser<Entry<String, Set<String>>>> { // NOCS - - private static final long serialVersionUID = -6769931374907428699L; - - @Override - public Traverser<Entry<String, Set<String>>> applyEx( - final Map<String, Set<String>> flatMapStage, - final Entry<Event, SensorRegistry> eventItem) { - // Transform new Input - final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); - final Map<String, Set<String>> mapFromRegistry = - transformer.constructChildParentsPairs(eventItem.getValue()); - - // Compare both tables - final Map<String, Set<String>> updates = new HashMap<>(); - for (final String key : mapFromRegistry.keySet()) { - if (flatMapStage.containsKey(key)) { - if (!mapFromRegistry.get(key).equals(flatMapStage.get(key))) { - updates.put(key, mapFromRegistry.get(key)); - } - } else { - updates.put(key, mapFromRegistry.get(key)); - } - } - - // Create a updates list to pass onto the next pipeline stage- - final List<Entry<String, Set<String>>> updatesList = new ArrayList<>(updates.entrySet()); - - // Return traverser with updates list. - return Traversers.traverseIterable(updatesList) - .map(e -> Util.entry(e.getKey(), e.getValue())); - } - - } - -}