From ad60dd62ad82f97ddc9c14dca23b1ec6258df134 Mon Sep 17 00:00:00 2001
From: lorenz <stu203404@mail.uni-kiel.de>
Date: Sun, 5 Jun 2022 23:56:38 +0200
Subject: [PATCH] Refactor uc2 hzl to use new implementation
---
.../uc2/hazelcastjet/HistoryService.java | 94 +++---
.../uc2/hazelcastjet/NewHistoryService.java | 64 ----
.../hazelcastjet/Uc2HazelcastJetFactory.java | 301 ------------------
.../uc2/hazelcastjet/Uc2PipelineBuilder.java | 135 --------
.../uc2/hazelcastjet/Uc2PipelineTest.java | 11 +-
5 files changed, 51 insertions(+), 554 deletions(-)
delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java
delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java
delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
index f382978b7..4c097c578 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java
@@ -1,70 +1,64 @@
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
+import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
+
/**
- * A microservice that manages the history and, therefore, stores and aggregates incoming
- * measurements.
+ * A microservice that aggregate incoming messages in a tumbling window.
*/
-public class HistoryService {
+public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
- // Hazelcast settings (default)
- private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns";
- private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701";
- // Kafka settings (default)
- private static final String KAFKA_BOOTSTRAP_DEFAULT = "localhost:9092";
- private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081";
- private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input";
- private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output";
+ /**
+ * Constructs the use case logic for UC2.
+ * Retrieves the needed values and instantiates a pipeline factory.
+ */
+ public HistoryService() {
+ super(LOGGER);
+ final Properties kafkaProps =
+ this.propsBuilder.buildReadProperties(
+ StringDeserializer.class.getCanonicalName(),
+ KafkaAvroDeserializer.class.getCanonicalName());
- // UC2 specific (default)
- private static final String DOWNSAMPLE_INTERVAL_DEFAULT_MS = "60000";
+ final Properties kafkaWriteProps =
+ this.propsBuilder.buildWriteProperties(
+ StringSerializer.class.getCanonicalName(),
+ StringSerializer.class.getCanonicalName());
- // Job name (default)
- private static final String JOB_NAME = "uc2-hazelcastjet";
+ final String kafkaOutputTopic =
+ config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
- /**
- * Entrypoint for UC2 using Gradle Run.
- */
- public static void main(final String[] args) {
- final HistoryService uc2HistoryService = new HistoryService();
- try {
- uc2HistoryService.run();
- } catch (final Exception e) { // NOPMD
- LOGGER.error("ABORT MISSION!: {}", e);
- }
- }
+ // Transform minutes to milliseconds
+ final int downsampleInterval = Integer.parseInt(
+ config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString());
+ final int downsampleIntervalMs = downsampleInterval * 60_000;
- /**
- * Start a UC2 service.
- *
- * @throws Exception This Exception occurs if the Uc2HazelcastJetFactory is used in the wrong way.
- * Detailed data is provided once an Exception occurs.
- */
- public void run() throws Exception { // NOPMD
- this.createHazelcastJetApplication();
+ this.pipelineFactory = new Uc2PipelineFactory(
+ kafkaProps,
+ this.kafkaInputTopic,
+ kafkaWriteProps,
+ kafkaOutputTopic,
+ downsampleIntervalMs);
}
- /**
- * Creates a Hazelcast Jet Application for UC2 using the Uc1HazelcastJetFactory.
- *
- * @throws Exception This Exception occurs if the Uc2HazelcastJetFactory is used in the wrong way.
- * Detailed data is provided once an Exception occurs.
- */
- private void createHazelcastJetApplication() throws Exception { // NOPMD
- new Uc2HazelcastJetFactory()
- .setReadPropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT, SCHEMA_REGISTRY_URL_DEFAULT,JOB_NAME)
- .setWritePropertiesFromEnv(KAFKA_BOOTSTRAP_DEFAULT,SCHEMA_REGISTRY_URL_DEFAULT)
- .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT)
- .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT)
- .setDownsampleIntervalFromEnv(DOWNSAMPLE_INTERVAL_DEFAULT_MS)
- .buildUc2Pipeline()
- .buildUc2JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY)
- .runUc2Job(JOB_NAME);
+ @Override
+ protected void registerSerializer() {
+ this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
}
+
+ public static void main(final String[] args) {
+ new HistoryService().run();
+ }
}
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java
deleted file mode 100644
index 572329c5d..000000000
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package rocks.theodolite.benchmarks.uc2.hazelcastjet;
-
-import com.google.common.math.StatsAccumulator;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Properties;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
-import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
-import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
-
-
-/**
- * A microservice that aggregate incoming messages in a tumbling window.
- */
-public class NewHistoryService extends HazelcastJetService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
-
-
- /**
- * Constructs the use case logic for UC2.
- * Retrieves the needed values and instantiates a pipeline factory.
- */
- public NewHistoryService() {
- super(LOGGER);
- final Properties kafkaProps =
- this.propsBuilder.buildReadProperties(
- StringDeserializer.class.getCanonicalName(),
- KafkaAvroDeserializer.class.getCanonicalName());
-
- final Properties kafkaWriteProps =
- this.propsBuilder.buildWriteProperties(
- StringSerializer.class.getCanonicalName(),
- StringSerializer.class.getCanonicalName());
-
- final String kafkaOutputTopic =
- config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
-
- // Transform minutes to milliseconds
- final int downsampleInterval = Integer.parseInt(
- config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString());
- final int downsampleIntervalMs = downsampleInterval * 60_000;
-
- this.pipelineFactory = new Uc2PipelineFactory(
- kafkaProps,
- this.kafkaInputTopic,
- kafkaWriteProps,
- kafkaOutputTopic,
- downsampleIntervalMs);
- }
-
- @Override
- protected void registerSerializer() {
- this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
- }
-
-
- public static void main(final String[] args) {
- new NewHistoryService().run();
- }
-}
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java
deleted file mode 100644
index 143b154f3..000000000
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2HazelcastJetFactory.java
+++ /dev/null
@@ -1,301 +0,0 @@
-package rocks.theodolite.benchmarks.uc2.hazelcastjet;
-
-import com.google.common.math.StatsAccumulator;
-import com.hazelcast.jet.JetInstance;
-import com.hazelcast.jet.config.JobConfig;
-import com.hazelcast.jet.pipeline.Pipeline;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.Objects;
-import java.util.Properties;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
-import rocks.theodolite.benchmarks.commons.hazelcastjet.JetInstanceBuilder;
-import rocks.theodolite.benchmarks.commons.hazelcastjet.KafkaPropertiesBuilder;
-import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
-
-/**
- * A Hazelcast Jet factory which can build a Hazelcast Jet Instance and Pipeline for the UC2
- * benchmark and lets you start the Hazelcast Jet job. The JetInstance can be built directly as the
- * Hazelcast Config is managed internally. In order to build the Pipeline, you first have to build
- * the Read and Write Properties, set the input and output topic, and set the downsample interval
- * which can be done using internal functions of this factory. Outside data only refers to custom
- * values or default values in case data of the environment cannot the fetched.
- */
-public class Uc2HazelcastJetFactory {
-
- // Information per History Service
- private Properties kafkaReadPropsForPipeline;
- private Properties kafkaWritePropsForPipeline;
- private String kafkaInputTopic;
- private String kafkaOutputTopic;
- private JetInstance uc2JetInstance;
- private Pipeline uc2JetPipeline;
- // UC2 specific
- private int downsampleInterval;
-
- /////////////////////////////////////
- // Layer 1 - Hazelcast Jet Run Job //
- /////////////////////////////////////
-
- /**
- * Needs a JetInstance and Pipeline defined in this factors. Adds the pipeline to the existing
- * JetInstance as a job.
- *
- * @param jobName The name of the job.
- */
- public void runUc2Job(final String jobName) {
-
- // Check if a Jet Instance for UC2 is set.
- if (this.uc2JetInstance == null) {
- throw new IllegalStateException("Jet Instance is not set! "
- + "Cannot start a hazelcast jet job for UC2.");
- }
-
- // Check if a Pipeline for UC2 is set.
- if (this.uc2JetPipeline == null) {
- throw new IllegalStateException(
- "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC2.");
- }
-
- // Adds the job name and joins a job to the JetInstance defined in this factory
- final JobConfig jobConfig = new JobConfig();
- jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
- jobConfig.setName(jobName);
- this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join();
- }
-
- /////////////
- // Layer 2 //
- /////////////
-
- /**
- * Build a Hazelcast JetInstance used to run a job on.
- *
- * @param logger The logger specified for this JetInstance.
- * @param bootstrapServerDefault Default bootstrap server in case no value can be derived from the
- * environment.
- * @param hzKubernetesServiceDnsKey The kubernetes service dns key.
- * @return A Uc2HazelcastJetFactory containing a set JetInstance.
- */
- public Uc2HazelcastJetFactory buildUc2JetInstanceFromEnv(final Logger logger,
- final String bootstrapServerDefault,
- final String hzKubernetesServiceDnsKey) {
- this.uc2JetInstance = new JetInstanceBuilder()
- .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey)
- .build();
- return this;
- }
-
- /**
- * Builds a Hazelcast Jet pipeline used for a JetInstance to run it as a job on. Needs the input
- * topic and kafka properties defined in this factory beforehand.
- *
- * @return A Uc2HazelcastJetFactory containg a set pipeline.
- * @throws Exception If the input topic or the kafka properties are not defined, the pipeline
- * cannot be built.
- */
- public Uc2HazelcastJetFactory buildUc2Pipeline() throws IllegalStateException { // NOPMD
-
- final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD
-
- // Check if Properties for the Kafka Input are set.
- if (this.kafkaReadPropsForPipeline == null) {
- throw new IllegalStateException("Kafka Read Properties for pipeline not set! "
- + defaultPipelineWarning);
- }
-
- // Check if Properties for the Kafka Output are set.
- if (this.kafkaWritePropsForPipeline == null) {
- throw new IllegalStateException("Kafka Write Properties for pipeline not set! "
- + defaultPipelineWarning);
- }
-
- // Check if the Kafka input topic is set.
- if (this.kafkaInputTopic == null) {
- throw new IllegalStateException("Kafka input topic for pipeline not set! "
- + defaultPipelineWarning);
- }
-
- // Check if the Kafka output topic is set.
- if (this.kafkaOutputTopic == null) {
- throw new IllegalStateException("kafka output topic for pipeline not set! "
- + defaultPipelineWarning);
- }
-
- // Check if the downsampleInterval (tumbling window time) is set.
- if (this.downsampleInterval <= 0) {
- throw new IllegalStateException(
- "downsample interval for pipeline not set or not bigger than 0! "
- + defaultPipelineWarning);
- }
-
- // Build Pipeline Using the pipelineBuilder
- final Uc2PipelineBuilder pipeBuilder = new Uc2PipelineBuilder();
- this.uc2JetPipeline =
- pipeBuilder.build(this.kafkaReadPropsForPipeline, this.kafkaWritePropsForPipeline,
- this.kafkaInputTopic, this.kafkaOutputTopic, this.downsampleInterval);
- // Return Uc2HazelcastJetBuilder factory
- return this;
- }
-
- /////////////
- // Layer 3 //
- /////////////
-
- /**
- * Sets kafka read properties for pipeline used in this builder.
- *
- * @param kafkaReadProperties A propeties object containing necessary values used for the hazelcst
- * jet kafka connection to read data.
- * @return The Uc2HazelcastJetBuilder factory with set kafkaReadPropsForPipeline.
- */
- public Uc2HazelcastJetFactory setCustomReadProperties(// NOPMD
- final Properties kafkaReadProperties) {
- this.kafkaReadPropsForPipeline = kafkaReadProperties;
- return this;
- }
-
- /**
- * Sets kafka write properties for pipeline used in this builder.
- *
- * @param kafkaWriteProperties A propeties object containing necessary values used for the
- * hazelcst jet kafka connection to write data.
- * @return The Uc2HazelcastJetBuilder factory with set kafkaWritePropsForPipeline.
- */
- public Uc2HazelcastJetFactory setCustomWriteProperties(// NOPMD
- final Properties kafkaWriteProperties) {
- this.kafkaWritePropsForPipeline = kafkaWriteProperties;
- return this;
- }
-
- /**
- * Sets kafka read properties for pipeline used in this builder using environment variables.
- *
- * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
- * can be fetched from the environment.
- * @param schemaRegistryUrlDefault Default schema registry url in the case that no schema registry
- * url can be fetched from the environment.
- * @return The Uc2HazelcastJetBuilder factory with set kafkaReadPropertiesForPipeline.
- */
- public Uc2HazelcastJetFactory setReadPropertiesFromEnv(// NOPMD
- final String bootstrapServersDefault,
- final String schemaRegistryUrlDefault,
- final String jobName) {
- // Use KafkaPropertiesBuilder to build a properties object used for kafka
- final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
- final Properties kafkaReadProps =
- propsBuilder.buildKafkaInputReadPropsFromEnv(bootstrapServersDefault,
- schemaRegistryUrlDefault,
- jobName,
- StringDeserializer.class.getCanonicalName(),
- KafkaAvroDeserializer.class.getCanonicalName());
- this.kafkaReadPropsForPipeline = kafkaReadProps;
- return this;
- }
-
- /**
- * Sets kafka write properties for pipeline used in this builder using environment variables.
- *
- * @param bootstrapServersDefault Default Bootstrap server in the case that no bootstrap server
- * can be fetched from the environment.
- * @return The Uc2HazelcastJetBuilder factory with set kafkaWritePropertiesForPipeline.
- */
- public Uc2HazelcastJetFactory setWritePropertiesFromEnv(// NOPMD
- final String bootstrapServersDefault, final String schemaRegistryUrlDefault) {
- // Use KafkaPropertiesBuilder to build a properties object used for kafka
- final KafkaPropertiesBuilder propsBuilder = new KafkaPropertiesBuilder();
- final Properties kafkaWriteProps =
- propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault,
- schemaRegistryUrlDefault,
- StringSerializer.class.getCanonicalName(),
- StringSerializer.class.getCanonicalName());
- this.kafkaWritePropsForPipeline = kafkaWriteProps;
- return this;
- }
-
- /**
- * Sets the kafka input topic for the pipeline used in this builder.
- *
- * @param inputTopic The kafka topic used as the pipeline input.
- * @return A Uc2HazelcastJetBuilder factory with a set kafkaInputTopic.
- */
- public Uc2HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD
- final String inputTopic) {
- this.kafkaInputTopic = inputTopic;
- return this;
- }
-
- /**
- * Sets the kafka input output for the pipeline used in this builder.
- *
- * @param outputTopic The kafka topic used as the pipeline output.
- * @return A Uc2HazelcastJetBuilder factory with a set kafkaOutputTopic.
- */
- public Uc2HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD
- this.kafkaOutputTopic = outputTopic;
- return this;
- }
-
-
- /**
- * Sets the kafka input topic for the pipeline used in this builder using environment variables.
- *
- * @param defaultInputTopic The default kafka input topic used if no topic is specified by the
- * environment.
- * @return A Uc2HazelcastJetBuilder factory with a set kafkaInputTopic.
- */
- public Uc2HazelcastJetFactory setKafkaInputTopicFromEnv(// NOPMD
- final String defaultInputTopic) {
- this.kafkaInputTopic = Objects.requireNonNullElse(
- System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC),
- defaultInputTopic);
- return this;
- }
-
- /**
- * Sets the kafka output topic for the pipeline used in this builder using environment variables.
- *
- * @param defaultOutputTopic The default kafka output topic used if no topic is specified by the
- * environment.
- * @return A Uc2HazelcastJetBuilder factory with a set kafkaOutputTopic.
- */
- public Uc2HazelcastJetFactory setKafkaOutputTopicFromEnv(// NOPMD
- final String defaultOutputTopic) {
- this.kafkaOutputTopic = Objects.requireNonNullElse(
- System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC),
- defaultOutputTopic);
- return this;
- }
-
- /**
- * Sets the downsample interval for the pipeline used in this builder.
- *
- * @param downsampleInterval the downsample interval to be used for this pipeline.
- * @return A Uc2HazelcastJetFactory with a set downsampleInterval.
- */
- public Uc2HazelcastJetFactory setCustomDownsampleInterval(// NOPMD
- final int downsampleInterval) {
- this.downsampleInterval = downsampleInterval;
- return this;
- }
-
- /**
- * Sets the downsample interval for the pipeline used in this builder from the environment.
- *
- * @param defaultDownsampleInterval the default downsample interval to be used for this pipeline
- * when none is set in the environment.
- * @return A Uc2HazelcastJetFactory with a set downsampleInterval.
- */
- public Uc2HazelcastJetFactory setDownsampleIntervalFromEnv(// NOPMD
- final String defaultDownsampleInterval) {
- final String downsampleInterval = Objects.requireNonNullElse(
- System.getenv(ConfigurationKeys.DOWNSAMPLE_INTERVAL),
- defaultDownsampleInterval);
- final int downsampleIntervalNumber = Integer.parseInt(downsampleInterval);
- this.downsampleInterval = downsampleIntervalNumber;
- return this;
- }
-
-}
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java
deleted file mode 100644
index 73377de61..000000000
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineBuilder.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package rocks.theodolite.benchmarks.uc2.hazelcastjet;
-
-import com.google.common.math.Stats;
-import com.google.common.math.StatsAccumulator;
-import com.hazelcast.jet.aggregate.AggregateOperation;
-import com.hazelcast.jet.aggregate.AggregateOperation1;
-import com.hazelcast.jet.kafka.KafkaSinks;
-import com.hazelcast.jet.kafka.KafkaSources;
-import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
-import com.hazelcast.jet.pipeline.StreamSource;
-import com.hazelcast.jet.pipeline.StreamStage;
-import com.hazelcast.jet.pipeline.WindowDefinition;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSupplier;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * Builder to build a HazelcastJet Pipeline for UC2 which can be used for stream processing using
- * Hazelcast Jet.
- */
-public class Uc2PipelineBuilder {
-
- /**
- * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
- *
- * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
- * attributes.
- * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
- * attributes.
- * @param kafkaInputTopic The name of the input topic used for the pipeline.
- * @param kafkaOutputTopic The name of the output topic used for the pipeline.
- * @param downsampleIntervalInMs The window length of the tumbling window used in the aggregation
- * of this pipeline.
- * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data
- * for UC2.
- */
- public Pipeline build(final Properties kafkaReadPropsForPipeline,
- final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic,
- final String kafkaOutputTopic,
- final int downsampleIntervalInMs) {
-
- // Define a new pipeline
- final Pipeline pipe = Pipeline.create();
-
- // Define the Kafka Source
- final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource =
- KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic);
-
- // Extend UC2 topology to the pipeline
- final StreamStage<Map.Entry<String, String>> uc2TopologyProduct =
- this.extendUc2Topology(pipe, kafkaSource, downsampleIntervalInMs);
-
- // Add Sink1: Logger
- uc2TopologyProduct.writeTo(Sinks.logger());
- // Add Sink2: Write back to kafka for the final benchmark
- uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
- kafkaWritePropsForPipeline, kafkaOutputTopic));
-
- return pipe;
- }
-
- /**
- * Extends to a blank Hazelcast Jet Pipeline the UC2 topology defined by theodolite.
- *
- * <p>
- * UC2 takes {@code ActivePowerRecord} objects, groups them by keys, windows them in a tumbling
- * window and aggregates them into {@code Stats} objects. The final map returns an
- * {@code Entry<String,String>} where the key is the key of the group and the String is the
- * {@code .toString()} representation of the {@code Stats} object.
- * </p>
- *
- * @param pipe The blank hazelcast jet pipeline to extend the logic to.
- * @param source A streaming source to fetch data from.
- * @param downsampleIntervalInMs The size of the tumbling window.
- * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key
- * and value of the Entry object. It can be used to be further modified or directly be
- * written into a sink.
- */
- public StreamStage<Map.Entry<String, String>> extendUc2Topology(final Pipeline pipe,
- final StreamSource<Entry<String, ActivePowerRecord>> source,
- final int downsampleIntervalInMs) {
- // Build the pipeline topology.
- return pipe.readFrom(source)
- .withNativeTimestamps(0)
- .setLocalParallelism(1)
- .groupingKey(record -> record.getValue().getIdentifier())
- .window(WindowDefinition.tumbling(downsampleIntervalInMs))
- .aggregate(this.uc2AggregateOperation())
- .map(agg -> {
- final String theKey = agg.key();
- final String theValue = agg.getValue().toString();
- return Map.entry(theKey, theValue);
- });
- }
-
- /**
- * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast
- * Jet implementation of UC2.
- *
- * <p>
- * Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a
- * {@Stats} object.
- * </p>
- *
- * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates
- * ActivePowerRecord Objects into Stats Objects.
- */
- public AggregateOperation1<Entry<String, ActivePowerRecord>,
- StatsAccumulator, Stats> uc2AggregateOperation() {
- // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using
- // the Statsaccumulator.
- return AggregateOperation
- // Creates the accumulator
- .withCreate(new StatsAccumulatorSupplier())
- // Defines the accumulation
- .<Entry<String, ActivePowerRecord>>andAccumulate((accumulator, item) -> {
- accumulator.add(item.getValue().getValueInW());
- })
- // Defines the combination of spread out instances
- .andCombine((left, right) -> {
- final Stats rightStats = right.snapshot();
- left.addAll(rightStats);
-
- })
- // Finishes the aggregation
- .andExportFinish(
- (accumulator) -> {
- return accumulator.snapshot();
- });
- }
-
-}
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
index ff72b9558..4a9a2a591 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineTest.java
@@ -5,6 +5,7 @@ import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
@@ -13,6 +14,7 @@ import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.test.SerialTest;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.concurrent.CompletionException;
import org.junit.After;
import org.junit.Assert;
@@ -61,11 +63,12 @@ public class Uc2PipelineTest extends JetTestSupport {
});
// Create pipeline to test
- Uc2PipelineBuilder pipelineBuilder = new Uc2PipelineBuilder();
- this.testPipeline = Pipeline.create();
- this.uc2Topology =
- pipelineBuilder.extendUc2Topology(this.testPipeline, testSource, testWindowInMs);
+ final Properties properties = new Properties();
+ final Uc2PipelineFactory factory = new Uc2PipelineFactory(
+ properties,"",properties,"", testWindowInMs);
+ this.uc2Topology = factory.extendUc2Topology(testSource);
+ this.testPipeline = factory.getPipe();
}
/**
--
GitLab