diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java index a57c85cbf53c49f316a37b1cd68f340130f62f6a..fa08c6e7ddf511a687881816601e41b92abccf18 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java @@ -1,72 +1,64 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; +import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; +import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer; /** - * A microservice that manages the history and, therefore, stores and aggregates incoming - * measurements. + * A microservice that aggregate incoming messages in a sliding window. */ -public class HistoryService { +public class HistoryService extends HazelcastJetService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - // Hazelcast settings (default) - private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; + /** + * Constructs the use case logic for UC3. + * Retrieves the needed values and instantiates a pipeline factory. + */ + public HistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); - // Kafka settings (default) - private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092"; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input"; - private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output"; - - // UC3 specific (default) - private static final String WINDOW_SIZE_IN_SECONDS_DEFAULT = "30"; - private static final String HOPSIZE_IN_SEC_DEFAULT = "1"; + final Properties kafkaWriteProps = + this.propsBuilder.buildWriteProperties( + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); - // Job name (default) - private static final String JOB_NAME = "uc3-hazelcastjet"; + final String kafkaOutputTopic = + config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - /** - * Entrypoint for UC3 using Gradle Run. - */ - public static void main(final String[] args) { - final HistoryService uc3HistoryService = new HistoryService(); - try { - uc3HistoryService.run(); - } catch (final Exception e) { // NOPMD - LOGGER.error("ABORT MISSION!: {}", e); - } - } + final int windowSizeInDaysNumber = Integer.parseInt( + config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()); - /** - * Start a UC3 service. - * - * @throws Exception This Exception occurs if the Uc3HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - public void run() throws Exception { // NOPMD - this.createHazelcastJetApplication(); + final int hoppingSizeInDaysNumber = Integer.parseInt( + config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()); + + this.pipelineFactory = new Uc3PipelineFactory( + kafkaProps, + kafkaInputTopic, + kafkaWriteProps, + kafkaOutputTopic, + windowSizeInDaysNumber, + hoppingSizeInDaysNumber); } - /** - * Creates a Hazelcast Jet Application for UC3 using the Uc3HazelcastJetFactory. - * - * @throws Exception This Exception occurs if the Uc3HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - private void createHazelcastJetApplication() throws Exception { // NOPMD - new Uc3HazelcastJetFactory() - .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT, JOB_NAME) - .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT) - .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) - .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) - .setWindowSizeInSecondsFromEnv(WINDOW_SIZE_IN_SECONDS_DEFAULT) - .setHoppingSizeInSecondsFromEnv(HOPSIZE_IN_SEC_DEFAULT) - .buildUc3Pipeline() - .buildUc3JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) - .runUc3Job(JOB_NAME); + @Override + protected void registerSerializer() { + this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class); } + + public static void main(final String[] args) { + new HistoryService().run(); + } } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java deleted file mode 100644 index 6b8a35c352d6b36c0490097426dcead1de851bb0..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java +++ /dev/null @@ -1,64 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.hazelcastjet; - -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer; - -/** - * A microservice that aggregate incoming messages in a sliding window. - */ -public class NewHistoryService extends HazelcastJetService { - - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - - /** - * Constructs the use case logic for UC3. - * Retrieves the needed values and instantiates a pipeline factory. - */ - public NewHistoryService() { - super(LOGGER); - final Properties kafkaProps = - this.propsBuilder.buildReadProperties( - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - final Properties kafkaWriteProps = - this.propsBuilder.buildWriteProperties( - StringSerializer.class.getCanonicalName(), - StringSerializer.class.getCanonicalName()); - - final String kafkaOutputTopic = - config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - - final int windowSizeInDaysNumber = Integer.parseInt( - config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()); - - final int hoppingSizeInDaysNumber = Integer.parseInt( - config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()); - - this.pipelineFactory = new Uc3PipelineFactory( - kafkaProps, - kafkaInputTopic, - kafkaWriteProps, - kafkaOutputTopic, - windowSizeInDaysNumber, - hoppingSizeInDaysNumber); - } - - @Override - protected void registerSerializer() { - this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class); - } - - - public static void main(final String[] args) { - new NewHistoryService().run(); - } -} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java deleted file mode 100644 index 50bdb6c57acd5789e4f05dfb658e794d2a33dace..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3HazelcastJetFactory.java +++ /dev/null @@ -1,341 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.hazelcastjet; - -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.config.JobConfig; -import com.hazelcast.jet.pipeline.Pipeline; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; -import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer; - -/** - * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC3 - * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the - * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build - * the Read and Write Properties, set the input and output topic, and set the window size in seconds - * and the hopping size in seconds. This can be done using internal functions of this factory. - * Outside data only refers to custom values or default values in case data of the environment - * cannot the fetched. - */ -public class Uc3HazelcastJetFactory { // NOPMD - - // Information per History Service - private Properties kafkaReadPropsForPipeline; - private Properties kafkaWritePropsForPipeline; - private String kafkaInputTopic; - private String kafkaOutputTopic; - private JetInstance uc3JetInstance; - private Pipeline uc3JetPipeline; - // UC3 specific - private int windowSizeInSeconds; - private int hoppingSizeInSeconds; - - ///////////////////////////////////// - // Layer 1 - Hazelcast Jet Run Job // - ///////////////////////////////////// - - /** - * Needs a JetInstance and Pipeline defined in this factors. Adds the pipeline to the existing - * JetInstance as a job. - * - * @param jobName The name of the job. - * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. - */ - public void runUc3Job(final String jobName) throws IllegalStateException { // NOPMD - - // Check if a Jet Instance for UC3 is set. - if (this.uc3JetInstance == null) { - throw new IllegalStateException("Jet Instance is not set! " - + "Cannot start a hazelcast jet job for UC3."); - } - - // Check if a Pipeline for UC3 is set. - if (this.uc3JetPipeline == null) { - throw new IllegalStateException( - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC3."); - } - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig() - .registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class) - .setName(jobName); - this.uc3JetInstance.newJobIfAbsent(this.uc3JetPipeline, jobConfig).join(); - } - - ///////////// - // Layer 2 // - ///////////// - - /** - * Build a Hazelcast JetInstance used to run a job on. - * - * @param logger The logger specified for this JetInstance. - * @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the - * environment. - * @param hzKubernetesServiceDnsKey The kubernetes service dns key. - * @return A Uc3HazelcastJetFactory containing a set JetInstance. - */ - public Uc3HazelcastJetFactory buildUc3JetInstanceFromEnv(final Logger logger, - final String bootstrapServerDefault, - final String hzKubernetesServiceDnsKey) { - this.uc3JetInstance = new JetInstanceBuilder() - .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey) - .build(); - return this; - } - - /** - * Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input - * topic and kafka properties defined in this factory beforehand. - * - * @return A Uc3HazelcastJetFactory containg a set pipeline. - * @throws Exception If the input topic or the kafka properties are not defined, the pipeline - * cannot be built. - */ - public Uc3HazelcastJetFactory buildUc3Pipeline() throws IllegalStateException { // NOPMD - - final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD - - // Check if Properties for the Kafka Input are set. - if (this.kafkaReadPropsForPipeline == null) { - throw new IllegalStateException("Kafka Read Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if Properties for the Kafka Output are set. - if (this.kafkaWritePropsForPipeline == null) { - throw new IllegalStateException("Kafka Write Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka input topic is set. - if (this.kafkaInputTopic == null) { - throw new IllegalStateException("Kafka input topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka output topic is set. - if (this.kafkaOutputTopic == null) { - throw new IllegalStateException("kafka output topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the window size for the "sliding" window is set. - if (this.windowSizeInSeconds <= 0) { - throw new IllegalStateException( - "window size in seconds for pipeline not set or not greater than 0! " - + defaultPipelineWarning); - } - - // Check if the hopping distance for the "sliding" window is set. - if (this.hoppingSizeInSeconds <= 0) { - throw new IllegalStateException( - "hopping size in seconds for pipeline not set or not greater than 0! " - + defaultPipelineWarning); - } - - // Build Pipeline Using the pipelineBuilder - final Uc3PipelineBuilder pipeBuilder = new Uc3PipelineBuilder(); - this.uc3JetPipeline = - pipeBuilder.build(this.kafkaReadPropsForPipeline, - this.kafkaWritePropsForPipeline, - this.kafkaInputTopic, this.kafkaOutputTopic, this.hoppingSizeInSeconds, - this.windowSizeInSeconds); - // Return Uc3HazelcastJetBuilder factory - return this; - } - - ///////////// - // Layer 3 // - ///////////// - - /** - * Sets kafka read properties for pipeline used in this builder. - * - * @param kafkaReadProperties A propeties object containing necessary values used for the hazelcst - * jet kafka connection to read data. - * @return The Uc3HazelcastJetBuilder factory with set kafkaReadPropsForPipeline. - */ - public Uc3HazelcastJetFactory setCustomReadProperties(// NOPMD - final Properties kafkaReadProperties) { - this.kafkaReadPropsForPipeline = kafkaReadProperties; - return this; - } - - /** - * Sets kafka write properties for pipeline used in this builder. - * - * @param kafkaWriteProperties A propeties object containing necessary values used for the - * hazelcst jet kafka connection to write data. - * @return The Uc3HazelcastJetBuilder factory with set kafkaWritePropsForPipeline. - */ - public Uc3HazelcastJetFactory setCustomWriteProperties(// NOPMD - final Properties kafkaWriteProperties) { - this.kafkaWritePropsForPipeline = kafkaWriteProperties; - return this; - } - - /** - * Sets kafka read properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry - * url can be fetched from the environment. - * @return The Uc3HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline. - */ - public Uc3HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, - final String schemaRegistryUrlDefault, - final String jobName) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - final Properties kafkaReadProps = - propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - jobName, - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - this.kafkaReadPropsForPipeline = kafkaReadProps; - return this; - } - - /** - * Sets kafka write properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @return The Uc3HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline. - */ - public Uc3HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - StringSerializer.class.getCanonicalName(), - StringSerializer.class.getCanonicalName()); - this.kafkaWritePropsForPipeline = kafkaWriteProps; - return this; - } - - /** - * Sets the kafka input topic for the pipeline used in this builder. - * - * @param inputTopic The kafka topic used as the pipeline input. - * @return A Uc3HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc3HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD - final String inputTopic) { - this.kafkaInputTopic = inputTopic; - return this; - } - - /** - * Sets the kafka input output for the pipeline used in this builder. - * - * @param outputTopic The kafka topic used as the pipeline output. - * @return A Uc3HazelcastJetBuilder factory with a set kafkaOutputTopic. - */ - public Uc3HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD - this.kafkaOutputTopic = outputTopic; - return this; - } - - - /** - * Sets the kafka input topic for the pipeline used in this builder using environment variables. - * - * @param defaultInputTopic The default kafka input topic used if no topic is specified by the - * environment. - * @return A Uc3HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc3HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD - final String defaultInputTopic) { - this.kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - defaultInputTopic); - return this; - } - - /** - * Sets the kafka output topic for the pipeline used in this builder using environment variables. - * - * @param defaultOutputTopic The default kafka output topic used if no topic is specified by the - * environment. - * @return A Uc3HazelcastJetBuilder factory with a set kafkaOutputTopic. - */ - public Uc3HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD - final String defaultOutputTopic) { - this.kafkaOutputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), - defaultOutputTopic); - return this; - } - - /** - * Sets the window size in seconds for the pipeline used in this builder. - * - * @param windowSizeInSeconds the windowSizeInSeconds to be used for this pipeline. - * @return A Uc3HazelcastJetFactory with a set windowSizeInSeconds. - */ - public Uc3HazelcastJetFactory setCustomWindowSizeInSeconds(// NOPMD - final int windowSizeInSeconds) { - this.windowSizeInSeconds = windowSizeInSeconds; - return this; - } - - /** - * Sets the window size in seconds for the pipeline used in this builder from the environment. - * - * @param defaultWindowSizeInSeconds the default window size in seconds to be used for this - * pipeline when none is set in the environment. - * @return A Uc3HazelcastJetFactory with a set windowSizeInSeconds. - */ - public Uc3HazelcastJetFactory setWindowSizeInSecondsFromEnv(// NOPMD - final String defaultWindowSizeInSeconds) { - final String windowSizeInSeconds = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.AGGREGATION_DURATION_DAYS), - defaultWindowSizeInSeconds); - final int windowSizeInSecondsNumber = Integer.parseInt(windowSizeInSeconds); - this.windowSizeInSeconds = windowSizeInSecondsNumber; - return this; - } - - /** - * Sets the hopping size in seconds for the pipeline used in this builder. - * - * @param hoppingSizeInSeconds the hoppingSizeInSeconds to be used for this pipeline. - * @return A Uc3HazelcastJetFactory with a set hoppingSizeInSeconds. - */ - public Uc3HazelcastJetFactory setCustomHoppingSizeInSeconds(// NOPMD - final int hoppingSizeInSeconds) { - this.hoppingSizeInSeconds = hoppingSizeInSeconds; - return this; - } - - /** - * Sets the hopping size in seconds for the pipeline used in this builder from the environment. - * - * @param defaultHoppingSizeInSeconds the default hopping size in seconds to be used for this - * pipeline when none is set in the environment. - * @return A Uc3HazelcastJetFactory with a set hoppingSizeInSeconds. - */ - public Uc3HazelcastJetFactory setHoppingSizeInSecondsFromEnv(// NOPMD - final String defaultHoppingSizeInSeconds) { - final String hoppingSizeInSeconds = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS), - defaultHoppingSizeInSeconds); - final int hoppingSizeInSecondsNumber = Integer.parseInt(hoppingSizeInSeconds); - this.hoppingSizeInSeconds = hoppingSizeInSecondsNumber; - return this; - } -} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineBuilder.java deleted file mode 100644 index 5df6a7bb14f89a684b893600140aa094067ea983..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineBuilder.java +++ /dev/null @@ -1,125 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.hazelcastjet; - -import com.hazelcast.jet.aggregate.AggregateOperations; -import com.hazelcast.jet.kafka.KafkaSinks; -import com.hazelcast.jet.kafka.KafkaSources; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.StreamSource; -import com.hazelcast.jet.pipeline.StreamStage; -import com.hazelcast.jet.pipeline.WindowDefinition; -import java.time.Instant; -import java.time.LocalDateTime; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.TimeZone; -import java.util.concurrent.TimeUnit; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Builder to build a HazelcastJet Pipeline for UC3 which can be used for stream processing using - * Hazelcast Jet. - */ -public class Uc3PipelineBuilder { - - /** - * Builds a pipeline which can be used for stream processing using Hazelcast Jet. - * - * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads - * attributes. - * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write - * attributes. - * @param kafkaInputTopic The name of the input topic used for the pipeline. - * @param kafkaOutputTopic The name of the output topic used for the pipeline. - * @param hoppingSizeInSeconds The hop length of the sliding window used in the aggregation of - * this pipeline. - * @param windowSizeInSeconds The window length of the sliding window used in the aggregation of - * this pipeline. - * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data - * for UC3. - */ - public Pipeline build(final Properties kafkaReadPropsForPipeline, - final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic, - final String kafkaOutputTopic, - final int hoppingSizeInSeconds, final int windowSizeInSeconds) { - - // Define a new Pipeline - final Pipeline pipe = Pipeline.create(); - - // Define the source - final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources - .<String, ActivePowerRecord>kafka( - kafkaReadPropsForPipeline, kafkaInputTopic); - - // Extend topology for UC3 - final StreamStage<Map.Entry<String, String>> uc3Product = - this.extendUc3Topology(pipe, kafkaSource, hoppingSizeInSeconds, windowSizeInSeconds); - - // Add Sink1: Logger - uc3Product.writeTo(Sinks.logger()); - // Add Sink2: Write back to kafka for the final benchmark - uc3Product.writeTo(KafkaSinks.<String, String>kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); - - return pipe; - } - - /** - * Extends to a blank Hazelcast Jet Pipeline the UC3 topology defined by theodolite. - * - * <p> - * UC3 takes {@code ActivePowerRecord} object, groups them by keys and calculates average double - * values for a sliding window and sorts them into the hour of the day. - * </p> - * - * @param pipe The blank hazelcast jet pipeline to extend the logic to. - * @param source A streaming source to fetch data from. - * @param hoppingSizeInSeconds The jump distance of the "sliding" window. - * @param windowSizeInSeconds The size of the "sliding" window. - * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key - * and value of the Entry object. It can be used to be further modified or directly be - * written into a sink. - */ - public StreamStage<Map.Entry<String, String>> extendUc3Topology(final Pipeline pipe, - final StreamSource<Entry<String, ActivePowerRecord>> source, final int hoppingSizeInSeconds, - final int windowSizeInSeconds) { - // Build the pipeline topology. - return pipe - .readFrom(source) - // use Timestamps - .withNativeTimestamps(0) - .setLocalParallelism(1) - // Map timestamp to hour of day and create new key using sensorID and - // datetime mapped to HourOfDay - .map(record -> { - final String sensorId = record.getValue().getIdentifier(); - final long timestamp = record.getValue().getTimestamp(); - final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), - TimeZone.getDefault().toZoneId()); - - final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory(); - final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime); - - return Map.entry(newKey, record.getValue()); - }) - // group by new keys - .groupingKey(Entry::getKey) - // Sliding/Hopping Window - .window(WindowDefinition.sliding(TimeUnit.DAYS.toMillis(windowSizeInSeconds), - TimeUnit.DAYS.toMillis(hoppingSizeInSeconds))) - // get average value of group (sensoreId,hourOfDay) - .aggregate( - AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) - // map to return pair (sensorID,hourOfDay) -> (averaged what value) - .map(agg -> { - final String theValue = agg.getValue().toString(); - final String theKey = agg.getKey().toString(); - return Map.entry(theKey, theValue); - }); - } - -} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java index 3df4f4642f1bc6c8637f90bcae3f352f5c298e51..e4f8d9b8c622346ace13b168e524326ed1961f0f 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Map; +import java.util.Properties; import java.util.TimeZone; import java.util.Map.Entry; import java.util.concurrent.CompletionException; @@ -24,7 +25,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; -import rocks.theodolite.benchmarks.uc3.hazelcastjet.Uc3PipelineBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer; import titan.ccp.model.records.ActivePowerRecord; @@ -35,6 +37,9 @@ import titan.ccp.model.records.ActivePowerRecord; @Category(SerialTest.class) public class Uc3PipelineTest extends JetTestSupport { + private static final Logger LOGGER = LoggerFactory.getLogger(Uc3PipelineTest.class); + + // Test Machinery private JetInstance testInstance = null; private Pipeline testPipeline = null; @@ -49,13 +54,13 @@ public class Uc3PipelineTest extends JetTestSupport { public void buildUc3Pipeline() { // Setup Configuration - int testItemsPerSecond = 1; - String testSensorName = "TEST-SENSOR"; - Double testValueInW = 10.0; - int testHopSizeInSec = 1; - int testWindowSizeInSec = 50; + final int testItemsPerSecond = 1; + final String testSensorName = "TEST-SENSOR"; + final Double testValueInW = 10.0; + final int testHopSizeInSec = 1; + final int testWindowSizeInSec = 50; // Used to check hourOfDay - long mockTimestamp = 1632741651; + final long mockTimestamp = 1632741651; // Create mock jet instance with configuration @@ -75,10 +80,13 @@ public class Uc3PipelineTest extends JetTestSupport { }); // Create pipeline to test - Uc3PipelineBuilder pipelineBuilder = new Uc3PipelineBuilder(); - this.testPipeline = Pipeline.create(); - this.uc3Topology = pipelineBuilder.extendUc3Topology(testPipeline, testSource, - testHopSizeInSec, testWindowSizeInSec); + final Properties properties = new Properties(); + final Uc3PipelineFactory factory = new Uc3PipelineFactory( + properties,"", properties,"", testWindowSizeInSec, testHopSizeInSec); + + this.uc3Topology = factory.extendUc3Topology(testSource); + + testPipeline = factory.getPipe(); } /** @@ -88,44 +96,43 @@ public class Uc3PipelineTest extends JetTestSupport { public void testOutput() { // Assertion Configuration - int timeout = 10; - String testSensorName = "TEST-SENSOR"; - Double testValueInW = 10.0; + final int timeout = 10; + final String testSensorName = "TEST-SENSOR"; + final Double testValueInW = 10.0; // Used to check hourOfDay - long mockTimestamp = 1632741651; + final long mockTimestamp = 1632741651; // Assertion this.uc3Topology.apply(Assertions.assertCollectedEventually(timeout, collection -> { // DEBUG - System.out.println("DEBUG: CHECK 1 || Entered Assertion of testOutput()"); + LOGGER.info("CHECK 1 || Entered Assertion of testOutput()"); // Check all collected Items boolean allOkay = true; if (collection != null) { - System.out.println("DEBUG: CHECK 2 || Collection Size: " + collection.size()); - for (int i = 0; i < collection.size(); i++) { + LOGGER.info("CHECK 2 || Collection Size: " + collection.size()); + for (final Entry<String, String> entry : collection) { // Build hour of day long timestamp = mockTimestamp; - int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), + final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), TimeZone.getDefault().toZoneId()).getHour(); // Compare expected output with generated output - Entry<String, String> currentEntry = collection.get(i); - String expectedKey = testSensorName + ";" + expectedHour; - String expectedValue = testValueInW.toString(); + final String expectedKey = testSensorName + ";" + expectedHour; + final String expectedValue = testValueInW.toString(); // DEBUG - System.out.println( - "DEBUG: CHECK 3 || Expected Output: '" + expectedKey + "=" + expectedValue - + "' - Actual Output: '" + currentEntry.getKey() + "=" - + currentEntry.getValue().toString() + "'"); - - if (!(currentEntry.getKey().equals(expectedKey) - && currentEntry.getValue().toString().equals(expectedValue))) { - System.out.println("DEBUG: CHECK 5 || Failed assertion!"); + LOGGER.info( + "CHECK 3 || Expected Output: '" + expectedKey + "=" + expectedValue + + "' - Actual Output: '" + entry.getKey() + "=" + + entry.getValue() + "'"); + + if (!(entry.getKey().equals(expectedKey) + && entry.getValue().equals(expectedValue))) { + LOGGER.info("CHECK 5 || Failed assertion!"); allOkay = false; } }