diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index f382978b714fdfdff6c190339c2ed23a2e037069..4c097c5789d438dccca14dbb827d23c2c227e15c 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -1,70 +1,64 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; +import com.google.common.math.StatsAccumulator; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.Properties; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; +import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; +import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer; + /** - * A microservice that manages the history and, therefore, stores and aggregates incoming - * measurements. + * A microservice that aggregate incoming messages in a tumbling window. */ -public class HistoryService { +public class HistoryService extends HazelcastJetService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - // Hazelcast settings (default) - private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; - // Kafka settings (default) - private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092"; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input"; - private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output"; + /** + * Constructs the use case logic for UC2. + * Retrieves the needed values and instantiates a pipeline factory. + */ + public HistoryService() { + super(LOGGER); + final Properties kafkaProps = + this.propsBuilder.buildReadProperties( + StringDeserializer.class.getCanonicalName(), + KafkaAvroDeserializer.class.getCanonicalName()); - // UC2 specific (default) - private static final String DOWNSAMPLE_INTERVAL_DEFAULT_MS = "60000"; + final Properties kafkaWriteProps = + this.propsBuilder.buildWriteProperties( + StringSerializer.class.getCanonicalName(), + StringSerializer.class.getCanonicalName()); - // Job name (default) - private static final String JOB_NAME = "uc2-hazelcastjet"; + final String kafkaOutputTopic = + config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - /** - * Entrypoint for UC2 using Gradle Run. - */ - public static void main(final String[] args) { - final HistoryService uc2HistoryService = new HistoryService(); - try { - uc2HistoryService.run(); - } catch (final Exception e) { // NOPMD - LOGGER.error("ABORT MISSION!: {}", e); - } - } + // Transform minutes to milliseconds + final int downsampleInterval = Integer.parseInt( + config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()); + final int downsampleIntervalMs = downsampleInterval * 60_000; - /** - * Start a UC2 service. - * - * @throws Exception This Exception occurs if the Uc2HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - public void run() throws Exception { // NOPMD - this.createHazelcastJetApplication(); + this.pipelineFactory = new Uc2PipelineFactory( + kafkaProps, + this.kafkaInputTopic, + kafkaWriteProps, + kafkaOutputTopic, + downsampleIntervalMs); } - /** - * Creates a Hazelcast Jet Application for UC2 using the Uc1HazelcastJetFactory. - * - * @throws Exception This Exception occurs if the Uc2HazelcastJetFactory is used in the wrong way. - * Detailed data is provided once an Exception occurs. - */ - private void createHazelcastJetApplication() throws Exception { // NOPMD - new Uc2HazelcastJetFactory() - .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME) - .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT,SCHEMA_REGISTRY_URL_DEFAULT) - .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) - .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) - .setDownsampleIntervalFromEnv(DOWNSAMPLE_INTERVAL_DEFAULT_MS) - .buildUc2Pipeline() - .buildUc2JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) - .runUc2Job(JOB_NAME); + @Override + protected void registerSerializer() { + this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class); } + + public static void main(final String[] args) { + new HistoryService().run(); + } } diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java deleted file mode 100644 index 572329c5d783259e4d6658ab2df42de545ef0ced..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java +++ /dev/null @@ -1,64 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet; - -import com.google.common.math.StatsAccumulator; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; -import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer; - - -/** - * A microservice that aggregate incoming messages in a tumbling window. - */ -public class NewHistoryService extends HazelcastJetService { - - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - - - /** - * Constructs the use case logic for UC2. - * Retrieves the needed values and instantiates a pipeline factory. - */ - public NewHistoryService() { - super(LOGGER); - final Properties kafkaProps = - this.propsBuilder.buildReadProperties( - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - - final Properties kafkaWriteProps = - this.propsBuilder.buildWriteProperties( - StringSerializer.class.getCanonicalName(), - StringSerializer.class.getCanonicalName()); - - final String kafkaOutputTopic = - config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); - - // Transform minutes to milliseconds - final int downsampleInterval = Integer.parseInt( - config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()); - final int downsampleIntervalMs = downsampleInterval * 60_000; - - this.pipelineFactory = new Uc2PipelineFactory( - kafkaProps, - this.kafkaInputTopic, - kafkaWriteProps, - kafkaOutputTopic, - downsampleIntervalMs); - } - - @Override - protected void registerSerializer() { - this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class); - } - - - public static void main(final String[] args) { - new NewHistoryService().run(); - } -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java deleted file mode 100644 index 143b154f3726e75d2842766b49bd2e26f57ce39b..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java +++ /dev/null @@ -1,301 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet; - -import com.google.common.math.StatsAccumulator; -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.config.JobConfig; -import com.hazelcast.jet.pipeline.Pipeline; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; -import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder; -import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder; -import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer; - -/** - * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC2 - * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the - * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build - * the Read and Write Properties, set the input and output topic, and set the downsample interval - * which can be done using internal functions of this factory. Outside data only refers to custom - * values or default values in case data of the environment cannot the fetched. - */ -public class Uc2HazelcastJetFactory { - - // Information per History Service - private Properties kafkaReadPropsForPipeline; - private Properties kafkaWritePropsForPipeline; - private String kafkaInputTopic; - private String kafkaOutputTopic; - private JetInstance uc2JetInstance; - private Pipeline uc2JetPipeline; - // UC2 specific - private int downsampleInterval; - - ///////////////////////////////////// - // Layer 1 - Hazelcast Jet Run Job // - ///////////////////////////////////// - - /** - * Needs a JetInstance and Pipeline defined in this factors. Adds the pipeline to the existing - * JetInstance as a job. - * - * @param jobName The name of the job. - */ - public void runUc2Job(final String jobName) { - - // Check if a Jet Instance for UC2 is set. - if (this.uc2JetInstance == null) { - throw new IllegalStateException("Jet Instance is not set! " - + "Cannot start a hazelcast jet job for UC2."); - } - - // Check if a Pipeline for UC2 is set. - if (this.uc2JetPipeline == null) { - throw new IllegalStateException( - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC2."); - } - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig(); - jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class); - jobConfig.setName(jobName); - this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join(); - } - - ///////////// - // Layer 2 // - ///////////// - - /** - * Build a Hazelcast JetInstance used to run a job on. - * - * @param logger The logger specified for this JetInstance. - * @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the - * environment. - * @param hzKubernetesServiceDnsKey The kubernetes service dns key. - * @return A Uc2HazelcastJetFactory containing a set JetInstance. - */ - public Uc2HazelcastJetFactory buildUc2JetInstanceFromEnv(final Logger logger, - final String bootstrapServerDefault, - final String hzKubernetesServiceDnsKey) { - this.uc2JetInstance = new JetInstanceBuilder() - .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey) - .build(); - return this; - } - - /** - * Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input - * topic and kafka properties defined in this factory beforehand. - * - * @return A Uc2HazelcastJetFactory containg a set pipeline. - * @throws Exception If the input topic or the kafka properties are not defined, the pipeline - * cannot be built. - */ - public Uc2HazelcastJetFactory buildUc2Pipeline() throws IllegalStateException { // NOPMD - - final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD - - // Check if Properties for the Kafka Input are set. - if (this.kafkaReadPropsForPipeline == null) { - throw new IllegalStateException("Kafka Read Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if Properties for the Kafka Output are set. - if (this.kafkaWritePropsForPipeline == null) { - throw new IllegalStateException("Kafka Write Properties for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka input topic is set. - if (this.kafkaInputTopic == null) { - throw new IllegalStateException("Kafka input topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the Kafka output topic is set. - if (this.kafkaOutputTopic == null) { - throw new IllegalStateException("kafka output topic for pipeline not set! " - + defaultPipelineWarning); - } - - // Check if the downsampleInterval (tumbling window time) is set. - if (this.downsampleInterval <= 0) { - throw new IllegalStateException( - "downsample interval for pipeline not set or not bigger than 0! " - + defaultPipelineWarning); - } - - // Build Pipeline Using the pipelineBuilder - final Uc2PipelineBuilder pipeBuilder = new Uc2PipelineBuilder(); - this.uc2JetPipeline = - pipeBuilder.build(this.kafkaReadPropsForPipeline, this.kafkaWritePropsForPipeline, - this.kafkaInputTopic, this.kafkaOutputTopic, this.downsampleInterval); - // Return Uc2HazelcastJetBuilder factory - return this; - } - - ///////////// - // Layer 3 // - ///////////// - - /** - * Sets kafka read properties for pipeline used in this builder. - * - * @param kafkaReadProperties A propeties object containing necessary values used for the hazelcst - * jet kafka connection to read data. - * @return The Uc2HazelcastJetBuilder factory with set kafkaReadPropsForPipeline. - */ - public Uc2HazelcastJetFactory setCustomReadProperties(// NOPMD - final Properties kafkaReadProperties) { - this.kafkaReadPropsForPipeline = kafkaReadProperties; - return this; - } - - /** - * Sets kafka write properties for pipeline used in this builder. - * - * @param kafkaWriteProperties A propeties object containing necessary values used for the - * hazelcst jet kafka connection to write data. - * @return The Uc2HazelcastJetBuilder factory with set kafkaWritePropsForPipeline. - */ - public Uc2HazelcastJetFactory setCustomWriteProperties(// NOPMD - final Properties kafkaWriteProperties) { - this.kafkaWritePropsForPipeline = kafkaWriteProperties; - return this; - } - - /** - * Sets kafka read properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry - * url can be fetched from the environment. - * @return The Uc2HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline. - */ - public Uc2HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, - final String schemaRegistryUrlDefault, - final String jobName) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - final Properties kafkaReadProps = - propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - jobName, - StringDeserializer.class.getCanonicalName(), - KafkaAvroDeserializer.class.getCanonicalName()); - this.kafkaReadPropsForPipeline = kafkaReadProps; - return this; - } - - /** - * Sets kafka write properties for pipeline used in this builder using environment variables. - * - * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server - * can be fetched from the environment. - * @return The Uc2HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline. - */ - public Uc2HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD - final String bootstrapServersDefault, final String schemaRegistryUrlDefault) { - // Use KafkaPropertiesBuilder to build a properties object used for kafka - final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder(); - final Properties kafkaWriteProps = - propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault, - schemaRegistryUrlDefault, - StringSerializer.class.getCanonicalName(), - StringSerializer.class.getCanonicalName()); - this.kafkaWritePropsForPipeline = kafkaWriteProps; - return this; - } - - /** - * Sets the kafka input topic for the pipeline used in this builder. - * - * @param inputTopic The kafka topic used as the pipeline input. - * @return A Uc2HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc2HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD - final String inputTopic) { - this.kafkaInputTopic = inputTopic; - return this; - } - - /** - * Sets the kafka input output for the pipeline used in this builder. - * - * @param outputTopic The kafka topic used as the pipeline output. - * @return A Uc2HazelcastJetBuilder factory with a set kafkaOutputTopic. - */ - public Uc2HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD - this.kafkaOutputTopic = outputTopic; - return this; - } - - - /** - * Sets the kafka input topic for the pipeline used in this builder using environment variables. - * - * @param defaultInputTopic The default kafka input topic used if no topic is specified by the - * environment. - * @return A Uc2HazelcastJetBuilder factory with a set kafkaInputTopic. - */ - public Uc2HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD - final String defaultInputTopic) { - this.kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - defaultInputTopic); - return this; - } - - /** - * Sets the kafka output topic for the pipeline used in this builder using environment variables. - * - * @param defaultOutputTopic The default kafka output topic used if no topic is specified by the - * environment. - * @return A Uc2HazelcastJetBuilder factory with a set kafkaOutputTopic. - */ - public Uc2HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD - final String defaultOutputTopic) { - this.kafkaOutputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), - defaultOutputTopic); - return this; - } - - /** - * Sets the downsample interval for the pipeline used in this builder. - * - * @param downsampleInterval the downsample interval to be used for this pipeline. - * @return A Uc2HazelcastJetFactory with a set downsampleInterval. - */ - public Uc2HazelcastJetFactory setCustomDownsampleInterval(// NOPMD - final int downsampleInterval) { - this.downsampleInterval = downsampleInterval; - return this; - } - - /** - * Sets the downsample interval for the pipeline used in this builder from the environment. - * - * @param defaultDownsampleInterval the default downsample interval to be used for this pipeline - * when none is set in the environment. - * @return A Uc2HazelcastJetFactory with a set downsampleInterval. - */ - public Uc2HazelcastJetFactory setDownsampleIntervalFromEnv(// NOPMD - final String defaultDownsampleInterval) { - final String downsampleInterval = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.DOWNSAMPLE_INTERVAL), - defaultDownsampleInterval); - final int downsampleIntervalNumber = Integer.parseInt(downsampleInterval); - this.downsampleInterval = downsampleIntervalNumber; - return this; - } - -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java deleted file mode 100644 index 73377de6122d4a723c5dbbcb8198fa814c4bed1e..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java +++ /dev/null @@ -1,135 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.hazelcastjet; - -import com.google.common.math.Stats; -import com.google.common.math.StatsAccumulator; -import com.hazelcast.jet.aggregate.AggregateOperation; -import com.hazelcast.jet.aggregate.AggregateOperation1; -import com.hazelcast.jet.kafka.KafkaSinks; -import com.hazelcast.jet.kafka.KafkaSources; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.StreamSource; -import com.hazelcast.jet.pipeline.StreamStage; -import com.hazelcast.jet.pipeline.WindowDefinition; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Builder to build a HazelcastJet Pipeline for UC2 which can be used for stream processing using - * Hazelcast Jet. - */ -public class Uc2PipelineBuilder { - - /** - * Builds a pipeline which can be used for stream processing using Hazelcast Jet. - * - * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads - * attributes. - * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write - * attributes. - * @param kafkaInputTopic The name of the input topic used for the pipeline. - * @param kafkaOutputTopic The name of the output topic used for the pipeline. - * @param downsampleIntervalInMs The window length of the tumbling window used in the aggregation - * of this pipeline. - * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data - * for UC2. - */ - public Pipeline build(final Properties kafkaReadPropsForPipeline, - final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic, - final String kafkaOutputTopic, - final int downsampleIntervalInMs) { - - // Define a new pipeline - final Pipeline pipe = Pipeline.create(); - - // Define the Kafka Source - final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = - KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); - - // Extend UC2 topology to the pipeline - final StreamStage<Map.Entry<String, String>> uc2TopologyProduct = - this.extendUc2Topology(pipe, kafkaSource, downsampleIntervalInMs); - - // Add Sink1: Logger - uc2TopologyProduct.writeTo(Sinks.logger()); - // Add Sink2: Write back to kafka for the final benchmark - uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); - - return pipe; - } - - /** - * Extends to a blank Hazelcast Jet Pipeline the UC2 topology defined by theodolite. - * - * <p> - * UC2 takes {@code ActivePowerRecord} objects, groups them by keys, windows them in a tumbling - * window and aggregates them into {@code Stats} objects. The final map returns an - * {@code Entry<String,String>} where the key is the key of the group and the String is the - * {@code .toString()} representation of the {@code Stats} object. - * </p> - * - * @param pipe The blank hazelcast jet pipeline to extend the logic to. - * @param source A streaming source to fetch data from. - * @param downsampleIntervalInMs The size of the tumbling window. - * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key - * and value of the Entry object. It can be used to be further modified or directly be - * written into a sink. - */ - public StreamStage<Map.Entry<String, String>> extendUc2Topology(final Pipeline pipe, - final StreamSource<Entry<String, ActivePowerRecord>> source, - final int downsampleIntervalInMs) { - // Build the pipeline topology. - return pipe.readFrom(source) - .withNativeTimestamps(0) - .setLocalParallelism(1) - .groupingKey(record -> record.getValue().getIdentifier()) - .window(WindowDefinition.tumbling(downsampleIntervalInMs)) - .aggregate(this.uc2AggregateOperation()) - .map(agg -> { - final String theKey = agg.key(); - final String theValue = agg.getValue().toString(); - return Map.entry(theKey, theValue); - }); - } - - /** - * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast - * Jet implementation of UC2. - * - * <p> - * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a - * {@Stats} object. - * </p> - * - * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates - * ActivePowerRecord Objects into Stats Objects. - */ - public AggregateOperation1<Entry<String, ActivePowerRecord>, - StatsAccumulator, Stats> uc2AggregateOperation() { - // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using - // the Statsaccumulator. - return AggregateOperation - // Creates the accumulator - .withCreate(new StatsAccumulatorSupplier()) - // Defines the accumulation - .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> { - accumulator.add(item.getValue().getValueInW()); - }) - // Defines the combination of spread out instances - .andCombine((left, right) -> { - final Stats rightStats = right.snapshot(); - left.addAll(rightStats); - - }) - // Finishes the aggregation - .andExportFinish( - (accumulator) -> { - return accumulator.snapshot(); - }); - } - -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java index ff72b9558f43334feb8846d50bef2c6714d9404a..4a9a2a591522a4172141f666ec23ca16a3fc7660 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java @@ -5,6 +5,7 @@ import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JetConfig; import com.hazelcast.jet.core.JetTestSupport; import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.test.AssertionCompletedException; @@ -13,6 +14,7 @@ import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.concurrent.CompletionException; import org.junit.After; import org.junit.Assert; @@ -61,11 +63,12 @@ public class Uc2PipelineTest extends JetTestSupport { }); // Create pipeline to test - Uc2PipelineBuilder pipelineBuilder = new Uc2PipelineBuilder(); - this.testPipeline = Pipeline.create(); - this.uc2Topology = - pipelineBuilder.extendUc2Topology(this.testPipeline, testSource, testWindowInMs); + final Properties properties = new Properties(); + final Uc2PipelineFactory factory = new Uc2PipelineFactory( + properties,"",properties,"", testWindowInMs); + this.uc2Topology = factory.extendUc2Topology(testSource); + this.testPipeline = factory.getPipe(); } /**