From 8b6f32b17ea45285b18fcb02772d8318738fc739 Mon Sep 17 00:00:00 2001 From: MaxEmerold <wiedenhoeft.max@gmail.com> Date: Tue, 7 Dec 2021 10:55:01 +0100 Subject: [PATCH] Implement feedback - Reworked flags UC1,UC2,UC3,UC4 - Enhanced Properties implementation UC1,UC2,UC3,UC4 - Renamed several variables - Changed Exceptions to IllegalStateExceptions UC1,UC2,UC3,UC4 - Changed implementation of the flag check UC1,UC2,UC3,UC4 - Removed old implementation UC2 Hazelcast Jet - Fixed HashMapSupplier UC4 Hazelcast Jet - Changed implementation of EventDeserializer UC4 Hazelcast Jet - Added logger use in History Service main UC1,UC2,UC3,UC4 Fixed Checkstyle and PMD for the Hazelcast Jet commons, UC1, UC2, UC3, and UC4 projects Still open feedback: - Adjust UC2 implementation Still open tasks: - Implement JUnit tests for UC2,UC3,UC4 --- .../groovy/theodolite.hazelcastjet.gradle | 5 +- .../hazelcastjet/BenchmarkConfigBuilder.java | 9 +- .../hazelcastjet/ConfigurationKeys.java | 3 + .../hazelcastjet/JetInstanceBuilder.java | 18 +- .../uc1/application/HistoryService.java | 3 +- .../application/Uc1HazelcastJetFactory.java | 68 ++-- .../Uc1KafkaPropertiesBuilder.java | 18 +- .../uc1/application/Uc1PipelineBuilder.java | 5 +- .../uc1/application/Uc1PipelineTest.java | 34 +- .../uc2/application/HistoryService.java | 7 +- .../application/Uc2HazelcastJetFactory.java | 137 +++---- .../Uc2KafkaPropertiesBuilder.java | 18 +- .../uc2/applicationold/ClusterConfig.java | 76 ---- .../uc2/applicationold/ConfigurationKeys.java | 44 --- .../uc2/applicationold/HistoryService.java | 264 ------------- .../uc3/application/HistoryService.java | 9 +- .../application/Uc3HazelcastJetFactory.java | 162 ++++---- .../Uc3KafkaPropertiesBuilder.java | 22 +- .../uc4/application/HistoryService.java | 13 +- .../application/Uc4HazelcastJetFactory.java | 207 +++++----- .../Uc4KafkaPropertiesBuilder.java | 57 +-- .../uc4/application/Uc4PipelineBuilder.java | 353 ++++++++++++------ .../application/Uc4PipelineBuilderNew.java | 278 -------------- .../uc4specifics/ChildParentsTransformer.java | 5 + .../uc4specifics/EventDeserializer.java | 22 +- .../uc4specifics/HashMapSupplier.java | 9 +- .../uc4specifics/SensorGroupKey.java | 4 +- .../SensorGroupKeySerializer.java | 17 +- .../application/uc4specifics/ValueGroup.java | 8 +- .../uc4specifics/ValueGroupSerializer.java | 9 +- 30 files changed, 665 insertions(+), 1219 deletions(-) delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ClusterConfig.java delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ConfigurationKeys.java delete mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/HistoryService.java delete mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilderNew.java diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle index fc916f03d..4d965ec84 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.hazelcastjet.gradle @@ -30,5 +30,8 @@ dependencies { implementation project(':hazelcastjet-commons') compile 'com.hazelcast.jet:hazelcast-jet-core:4.5:tests' compile 'com.hazelcast:hazelcast:4.2:tests' - + + // Use JUnit test framework + 
testImplementation("junit:junit:4.13.2") + } \ No newline at end of file diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/BenchmarkConfigBuilder.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/BenchmarkConfigBuilder.java index c271e6013..7062f5e1d 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/BenchmarkConfigBuilder.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/BenchmarkConfigBuilder.java @@ -3,10 +3,17 @@ package theodolite.commons.hazelcastjet; import com.hazelcast.config.Config; import com.hazelcast.config.JoinConfig; import org.slf4j.Logger; -import theodolite.commons.hazelcastjet.ConfigurationKeys; +/** + * Build a Config Object for Benchmarks implemented in Hazelcast Jet. + * + */ public class BenchmarkConfigBuilder { + /** + * Builds a Config Object for Benchmarks implemented in Hazelcast Jet using data from the + * environment. + */ public Config buildFromEnv(final Logger logger, final String bootstrapServerDefault, final String hzKubernetesServiceDnsKey) { diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/ConfigurationKeys.java index 85a1d355d..e4575a5de 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/ConfigurationKeys.java @@ -1,5 +1,8 @@ package theodolite.commons.hazelcastjet; +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ public class ConfigurationKeys { // Common Keys diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/JetInstanceBuilder.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/JetInstanceBuilder.java index 37c11bf4c..9006c7a34 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/JetInstanceBuilder.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/theodolite/commons/hazelcastjet/JetInstanceBuilder.java @@ -4,8 +4,10 @@ import com.hazelcast.config.Config; import com.hazelcast.jet.Jet; import com.hazelcast.jet.JetInstance; import org.slf4j.Logger; -import theodolite.commons.hazelcastjet.BenchmarkConfigBuilder; +/** + * Builds JetInstances for Benchmark Implementations in Hazelcast Jet. + */ public class JetInstanceBuilder { private Config config; @@ -16,7 +18,7 @@ public class JetInstanceBuilder { * @param hazelcastConfig Config for this JetInstance to be built. * @return A Uc1JetInstanceBuilder with a set Config. */ - public JetInstanceBuilder setCustomConfig(final Config hazelcastConfig) { + public JetInstanceBuilder setCustomConfig(final Config hazelcastConfig) { // NOPMD this.config = hazelcastConfig; return this; } @@ -26,16 +28,16 @@ public class JetInstanceBuilder { * variables. * * @param logger A specified logger to log procedures - * @param BootstrapServerDefault The default bootstrap server used in case no definition by the + * @param bootstrapServerDefault The default bootstrap server used in case no definition by the * environment is provided. * @return The Uc1HazelcastJetBuilder factory with a set ClusterConfig. 
*/ - public JetInstanceBuilder setConfigFromEnv(final Logger logger, - final String BootstrapServerDefault, final String hzKubernetesServiceDnsKey) { + public JetInstanceBuilder setConfigFromEnv(final Logger logger, // NOPMD + final String bootstrapServerDefault, final String hzKubernetesServiceDnsKey) { // Use ClusterConfigBuilder to build a cluster config for this microservice final BenchmarkConfigBuilder configBuilder = new BenchmarkConfigBuilder(); final Config config = - configBuilder.buildFromEnv(logger, BootstrapServerDefault, hzKubernetesServiceDnsKey); + configBuilder.buildFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey); this.config = config; return this; } @@ -48,10 +50,10 @@ public class JetInstanceBuilder { */ public JetInstance build() { final JetInstance jet = Jet.newJetInstance(); - if (this.config != null) { - jet.getConfig().setHazelcastConfig(this.config); + if (this.config == null) { return jet; } else { + jet.getConfig().setHazelcastConfig(this.config); return jet; } diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/HistoryService.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/HistoryService.java index 4058d405b..3371c6140 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/HistoryService.java @@ -32,8 +32,9 @@ public class HistoryService { uc1HistoryService.run(); } catch (final Exception e) { // NOPMD e.printStackTrace(); // NOPMD - System.out.println("An Exception occured. "// NOPMD + LOGGER.error("An Exception occurred. "// NOPMD + "No history service is deployed! ABORT MISSION!"); + LOGGER.error(e.toString()); } } diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java index 091faa371..13576ad29 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1HazelcastJetFactory.java @@ -36,23 +36,24 @@ public class Uc1HazelcastJetFactory { * @param jobName The name of the job. * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. */ - public void runUc1Job(final String jobName) throws Exception { // NOPMD - if (this.uc1JetInstance != null) { - if (this.uc1JetPipeline != null) { - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig(); - jobConfig.setName(jobName); - this.uc1JetInstance.newJobIfAbsent(this.uc1JetPipeline, jobConfig).join(); - - } else { - throw new Exception(// NOPMD - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC1."); - } - } else { - throw new Exception("Jet Instance is not set! " // NOPMD + public void runUc1Job(final String jobName) throws IllegalStateException { + + // Check if a Jet Instance for UC1 is set. + if (this.uc1JetInstance == null) { + throw new IllegalStateException("Jet Instance is not set! " + "Cannot start a hazelcast jet job for UC1."); } + + // Check if a Pipeline for UC1 is set. + if (this.uc1JetPipeline == null) { + throw new IllegalStateException( + "Hazelcast Pipeline is not set! 
Cannot start a hazelcast jet job for UC1."); + } + + // Adds the job name and joins a job to the JetInstance defined in this factory + final JobConfig jobConfig = new JobConfig(); + jobConfig.setName(jobName); + this.uc1JetInstance.newJobIfAbsent(this.uc1JetPipeline, jobConfig).join(); } ///////////// @@ -85,25 +86,26 @@ public class Uc1HazelcastJetFactory { * @throws Exception If the input topic or the kafka properties are not defined, the pipeline * cannot be built. */ - public Uc1HazelcastJetFactory buildUc1Pipeline() throws Exception { // NOPMD - // Check for set properties and set input topic - if (this.kafkaPropertiesForPipeline != null) { - if (this.kafkaInputTopic != null) { - - // Build Pipeline Using the pipelineBuilder - final Uc1PipelineBuilder pipeBuilder = new Uc1PipelineBuilder(); - this.uc1JetPipeline = - pipeBuilder.build(this.kafkaPropertiesForPipeline, this.kafkaInputTopic); - // Return Uc1HazelcastJetBuilder factory - return this; - - } else { - throw new Exception("Kafka input topic for pipeline not set! " // NOPMD - + "Cannot build pipeline."); - } - } else { - throw new Exception("Kafka Properties for pipeline not set! Cannot build pipeline."); // NOPMD + public Uc1HazelcastJetFactory buildUc1Pipeline() throws IllegalStateException { + + // Check if Properties for the Kafka Input are set. + if (this.kafkaPropertiesForPipeline == null) { + throw new IllegalStateException( + "Kafka Properties for pipeline not set! Cannot build pipeline."); } + + // Check if the Kafka input topic is set. + if (this.kafkaInputTopic == null) { + throw new IllegalStateException("Kafka input topic for pipeline not set! " + + "Cannot build pipeline."); + } + + // Build Pipeline Using the pipelineBuilder + final Uc1PipelineBuilder pipeBuilder = new Uc1PipelineBuilder(); + this.uc1JetPipeline = + pipeBuilder.build(this.kafkaPropertiesForPipeline, this.kafkaInputTopic); + // Return Uc1HazelcastJetBuilder factory + return this; } ///////////// diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1KafkaPropertiesBuilder.java index 5df508cc7..3cf7aafc6 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1KafkaPropertiesBuilder.java @@ -3,6 +3,7 @@ package theodolite.uc1.application; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import theodolite.commons.hazelcastjet.ConfigurationKeys; @@ -30,13 +31,20 @@ public class Uc1KafkaPropertiesBuilder { System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), schemaRegistryUrlDefault); + // Note: + // > constant fields are not available for all properties + // > setProperty is not applicable for non-String values final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); - props.put("key.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("value.deserializer", KafkaAvroDeserializer.class); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); + props.setProperty("schema.registry.url", schemaRegistryUrl); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + 
StringDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + KafkaAvroDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); - props.setProperty("auto.offset.reset", "earliest"); + + return props; } diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java index 8090899f8..b90f822ac 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java @@ -21,8 +21,9 @@ public class Uc1PipelineBuilder { /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. * - * @param inputSource A hazelcast jet stream-source for Entry<String,ActivePowerRecord> input - * values. + * @param inputSource A hazelcast jet stream-source for {@code Entry<String,ActivePowerRecord>} + * input values. + * @param outputSink A hazelcast jet sink for String output values. * @return A hazelcast jet pipeline which processes data for Uc1. */ public Pipeline build(final StreamSource<Entry<String, ActivePowerRecord>> inputSource, diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java index aadeb10bd..4279cbb95 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java @@ -1,7 +1,6 @@ package theodolite.uc1.application; import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import com.hazelcast.jet.Jet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JetConfig; @@ -12,7 +11,6 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException; import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -20,23 +18,31 @@ import java.util.Map.Entry; import java.util.concurrent.CompletionException; import org.junit.After; import org.junit.Assert; +import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.jupiter.api.Test; import titan.ccp.model.records.ActivePowerRecord; +/** + * Test methods for the Hazelcast Jet Implementation of UC1. + */ @Category(SerialTest.class) public class Uc1PipelineTest extends JetTestSupport { private static final Gson GSON = new Gson(); - final Type entryStringRecordType = new TypeToken<Entry<String, ActivePowerRecord>>() {}.getType(); + /** + * UC1 Pipeline test to check if items are passed through at an acceptable rate. 
+ */ @Test public void test1Uc1PipelineElements() { // Test Configuration - final int TEST_ITEMS_PER_SECOND = 1; - final String TEST_SENSOR_NAME = "id_test1"; - final Double TEST_VALUE_IN_W = 10.0; + final int testItemsPerSecond = 1; + final String testSensorName = "id_test1"; + final Double testValueInW = 10.0; + // Assertion Configuration + final int assertTimeoutSeconds = 6; + final int assertCollectedItems = 5; // Create mock jet instance with configuration final String testClusterName = randomName(); @@ -48,11 +54,11 @@ public class Uc1PipelineTest extends JetTestSupport { final List<String> sourceRecord = new ArrayList<>(); final StreamSource<Entry<String, ActivePowerRecord>> testSource = - TestSources.itemStream(TEST_ITEMS_PER_SECOND, (timestamp, item) -> { + TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { final ActivePowerRecord testRecord = - new ActivePowerRecord(TEST_SENSOR_NAME, timestamp, TEST_VALUE_IN_W); + new ActivePowerRecord(testSensorName, timestamp, testValueInW); final Entry<String, ActivePowerRecord> testEntry = - Map.entry(TEST_SENSOR_NAME, testRecord); + Map.entry(testSensorName, testRecord); sourceRecord.add(GSON.toJson(testEntry)); return testEntry; }); @@ -67,15 +73,15 @@ public class Uc1PipelineTest extends JetTestSupport { .map(data -> { return new Gson().toJson(data); }) - .apply(Assertions.assertCollectedEventually(6, + .apply(Assertions.assertCollectedEventually(assertTimeoutSeconds, collection -> Assert.assertTrue("Not enough data arrived in the end", - collection.size() >= 5))); + collection.size() >= assertCollectedItems))); // Test the UC1 Pipeline Recreation try { testInstance.newJob(testPipeline).join(); - Assert.fail("Job should have completed with an AssertionCompletedException, " + - "but completed normally"); + Assert.fail("Job should have completed with an AssertionCompletedException, " + + "but completed normally"); } catch (final CompletionException e) { final String errorMsg = e.getCause().getMessage(); Assert.assertTrue( diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/HistoryService.java index 7fc9c5913..fad3cca6c 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/HistoryService.java @@ -18,7 +18,7 @@ public class HistoryService { private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output"; private static final String KAFKA_BSERVER_DEFAULT = "localhost:19092"; // UC2 specific (default) - private static final String DOWNSAMPLE_INTERVAL_DEFAULT = "5000"; + private static final String DOWNSAMPLE_INTERVAL_DEFAULT_MS = "5000"; // -- (default) job name for this history serivce private static final String JOB_NAME = "uc2-hazelcastjet"; @@ -31,8 +31,9 @@ public class HistoryService { uc2HistoryService.run(); } catch (final Exception e) { // NOPMD e.printStackTrace(); // NOPMD - System.out.println("An Exception occured. "// NOPMD + LOGGER.error("An Exception occurred. "// NOPMD + "No history service is deployed! 
ABORT MISSION!"); + LOGGER.error(e.toString()); } } @@ -58,7 +59,7 @@ public class HistoryService { .setWritePropertiesFromEnv(KAFKA_BSERVER_DEFAULT) .setKafkaInputTopicFromEnv(KAFKA_INPUT_TOPIC_DEFAULT) .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) - .setDownsampleIntervalFromEnv(DOWNSAMPLE_INTERVAL_DEFAULT) + .setDownsampleIntervalFromEnv(DOWNSAMPLE_INTERVAL_DEFAULT_MS) .buildUc2Pipeline() .buildUc2JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) .runUc2Job(JOB_NAME); diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java index 9d96d59a3..e4955376c 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2HazelcastJetFactory.java @@ -29,28 +29,6 @@ public class Uc2HazelcastJetFactory { // UC2 specific private int downsampleInterval; - // Checkflags - private boolean readPropertiesSet; - private boolean writePropertiesSet; - private boolean inputTopicSet; - private boolean outputTopicSet; - private boolean pipelineSet; - private boolean jetInstanceSet; - private boolean downsampleIntervalSet; - - /** - * Create a new Hazelcast Jet Factory for UC2. - */ - public Uc2HazelcastJetFactory() { - this.readPropertiesSet = false; - this.writePropertiesSet = false; - this.inputTopicSet = false; - this.outputTopicSet = false; - this.pipelineSet = false; - this.jetInstanceSet = false; - this.downsampleIntervalSet = false; - } - ///////////////////////////////////// // Layer 1 - Hazelcast Jet Run Job // ///////////////////////////////////// @@ -62,23 +40,24 @@ public class Uc2HazelcastJetFactory { * @param jobName The name of the job. * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. */ - public void runUc2Job(final String jobName) throws Exception { // NOPMD - if (this.jetInstanceSet) { - if (this.pipelineSet) { - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig(); - jobConfig.setName(jobName); - this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join(); + public void runUc2Job(final String jobName) throws IllegalStateException { - } else { - throw new Exception(// NOPMD - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC2."); - } - } else { - throw new Exception("Jet Instance is not set! " // NOPMD + // Check if a Jet Instance for UC2 is set. + if (this.uc2JetInstance == null) { + throw new IllegalStateException("Jet Instance is not set! " + "Cannot start a hazelcast jet job for UC2."); } + + // Check if a Pipeline for UC2 is set. + if (this.uc2JetPipeline == null) { + throw new IllegalStateException( + "Hazelcast Pipeline is not set! 
Cannot start a hazelcast jet job for UC2."); + } + + // Adds the job name and joins a job to the JetInstance defined in this factory + final JobConfig jobConfig = new JobConfig(); + jobConfig.setName(jobName); + this.uc2JetInstance.newJobIfAbsent(this.uc2JetPipeline, jobConfig).join(); } ///////////// @@ -100,7 +79,6 @@ public class Uc2HazelcastJetFactory { this.uc2JetInstance = new JetInstanceBuilder() .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey) .build(); - this.jetInstanceSet = true; return this; } @@ -112,41 +90,48 @@ public class Uc2HazelcastJetFactory { * @throws Exception If the input topic or the kafka properties are not defined, the pipeline * cannot be built. */ - public Uc2HazelcastJetFactory buildUc2Pipeline() throws Exception { // NOPMD - // Check for set properties and set input topic - if (this.readPropertiesSet) { - if (this.writePropertiesSet) { - if (this.inputTopicSet) { - if (this.outputTopicSet) { - if (this.downsampleIntervalSet) { - // Build Pipeline Using the pipelineBuilder - final Uc2PipelineBuilder pipeBuilder = new Uc2PipelineBuilder(); - this.uc2JetPipeline = - pipeBuilder.build(this.kafkaReadPropsForPipeline, this.kafkaWritePropsForPipeline, - this.kafkaInputTopic, this.kafkaOutputTopic, this.downsampleInterval); - this.pipelineSet = true; - // Return Uc2HazelcastJetBuilder factory - return this; - } else { - throw new Exception("downsample interval for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("kafka output topic for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("Kafka input topic for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("Kafka Write Properties for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("Kafka Read Properties for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD + public Uc2HazelcastJetFactory buildUc2Pipeline() throws IllegalStateException { //NOPMD + + final String defaultPipelineWarning = "Cannot build pipeline."; //NOPMD + + // Check if Properties for the Kafka Input are set. + if (this.kafkaReadPropsForPipeline == null) { + throw new IllegalStateException("Kafka Read Properties for pipeline not set! " + + defaultPipelineWarning); } + + // Check if Properties for the Kafka Output are set. + if (this.kafkaWritePropsForPipeline == null) { + throw new IllegalStateException("Kafka Write Properties for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka input topic is set. + if (this.kafkaInputTopic == null) { + throw new IllegalStateException("Kafka input topic for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka output topic is set. + if (this.kafkaOutputTopic == null) { + throw new IllegalStateException("Kafka output topic for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the downsampleInterval (tumbling window time) is set. + if (this.downsampleInterval <= 0) { + throw new IllegalStateException( + "downsample interval for pipeline not set or not greater than 0! 
" + + defaultPipelineWarning); + } + + // Build Pipeline Using the pipelineBuilder + final Uc2PipelineBuilder pipeBuilder = new Uc2PipelineBuilder(); + this.uc2JetPipeline = + pipeBuilder.build(this.kafkaReadPropsForPipeline, this.kafkaWritePropsForPipeline, + this.kafkaInputTopic, this.kafkaOutputTopic, this.downsampleInterval); + // Return Uc2HazelcastJetBuilder factory + return this; } ///////////// @@ -163,7 +148,6 @@ public class Uc2HazelcastJetFactory { public Uc2HazelcastJetFactory setCustomReadProperties(// NOPMD final Properties kafkaReadProperties) { this.kafkaReadPropsForPipeline = kafkaReadProperties; - this.readPropertiesSet = true; return this; } @@ -177,7 +161,6 @@ public class Uc2HazelcastJetFactory { public Uc2HazelcastJetFactory setCustomWriteProperties(// NOPMD final Properties kafkaWriteProperties) { this.kafkaWritePropsForPipeline = kafkaWriteProperties; - this.writePropertiesSet = true; return this; } @@ -199,7 +182,6 @@ public class Uc2HazelcastJetFactory { propsBuilder.buildKafkaReadPropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault); this.kafkaReadPropsForPipeline = kafkaReadProps; - this.readPropertiesSet = true; return this; } @@ -217,7 +199,6 @@ public class Uc2HazelcastJetFactory { final Properties kafkaWriteProps = propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault); this.kafkaWritePropsForPipeline = kafkaWriteProps; - this.writePropertiesSet = true; return this; } @@ -230,7 +211,6 @@ public class Uc2HazelcastJetFactory { public Uc2HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD final String inputTopic) { this.kafkaInputTopic = inputTopic; - this.inputTopicSet = true; return this; } @@ -242,7 +222,6 @@ public class Uc2HazelcastJetFactory { */ public Uc2HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD this.kafkaOutputTopic = outputTopic; - this.outputTopicSet = true; return this; } @@ -259,7 +238,6 @@ public class Uc2HazelcastJetFactory { this.kafkaInputTopic = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), defaultInputTopic); - this.inputTopicSet = true; return this; } @@ -275,7 +253,6 @@ public class Uc2HazelcastJetFactory { this.kafkaOutputTopic = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), defaultOutputTopic); - this.outputTopicSet = true; return this; } @@ -288,7 +265,6 @@ public class Uc2HazelcastJetFactory { public Uc2HazelcastJetFactory setCustomDownsampleInterval(// NOPMD final int downsampleInterval) { this.downsampleInterval = downsampleInterval; - this.downsampleIntervalSet = true; return this; } @@ -306,7 +282,6 @@ public class Uc2HazelcastJetFactory { defaultDownsampleInterval); final int downsampleIntervalNumber = Integer.parseInt(downsampleInterval); this.downsampleInterval = downsampleIntervalNumber; - this.downsampleIntervalSet = true; return this; } diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2KafkaPropertiesBuilder.java index beee6e1f3..ff34a61e0 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2KafkaPropertiesBuilder.java @@ -3,6 +3,7 @@ package theodolite.uc2.application; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; 
+import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import theodolite.commons.hazelcastjet.ConfigurationKeys; @@ -31,13 +32,18 @@ public class Uc2KafkaPropertiesBuilder { System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), schemaRegistryUrlDefault); + // Note: + // > constant fields are not available for all properties + // > setProperty is not applicable for non-String values final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); //NOCS - props.put("key.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("value.deserializer", KafkaAvroDeserializer.class); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + KafkaAvroDeserializer.class.getCanonicalName()); props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); - props.setProperty("auto.offset.reset", "latest"); + props.setProperty("schema.registry.url", schemaRegistryUrl); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return props; } @@ -55,7 +61,7 @@ public class Uc2KafkaPropertiesBuilder { kafkaBootstrapServerDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); //NOCS + props.put("bootstrap.servers", kafkaBootstrapServers); // NOCS props.put("key.serializer", StringSerializer.class.getCanonicalName()); props.put("value.serializer", StringSerializer.class.getCanonicalName()); return props; diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ClusterConfig.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ClusterConfig.java deleted file mode 100644 index 1df097a09..000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ClusterConfig.java +++ /dev/null @@ -1,76 +0,0 @@ -package theodolite.uc2.applicationold; - -/** - * Configuration of a load generator cluster. - */ -public final class ClusterConfig { - - private static final int PORT_DEFAULT = 5701; - private static final String CLUSTER_NAME_PREFIX_DEFAULT = "theodolite-load-generation"; - - private final String bootstrapServer; - private final String kubernetesDnsName; - private int port = PORT_DEFAULT; - private boolean portAutoIncrement = true; - private String clusterNamePrefix = CLUSTER_NAME_PREFIX_DEFAULT; - - /** - * Create a new {@link ClusterConfig} with the given parameter values. 
- */ - private ClusterConfig(final String bootstrapServer, final String kubernetesDnsName) { - this.bootstrapServer = bootstrapServer; - this.kubernetesDnsName = kubernetesDnsName; - } - - public boolean hasBootstrapServer() { - return this.bootstrapServer != null; - } - - public String getBootstrapServer() { - return this.bootstrapServer; - } - - public boolean hasKubernetesDnsName() { - return this.kubernetesDnsName != null; - } - - public String getKubernetesDnsName() { - return this.kubernetesDnsName; - } - - public int getPort() { - return this.port; - } - - public boolean isPortAutoIncrement() { - return this.portAutoIncrement; - } - - public ClusterConfig setPortAutoIncrement(final boolean portAutoIncrement) { // NOPMD - this.portAutoIncrement = portAutoIncrement; - return this; - } - - public ClusterConfig setPort(final int port) { // NOPMD - this.port = port; - return this; - } - - public String getClusterNamePrefix() { - return this.clusterNamePrefix; - } - - public ClusterConfig setClusterNamePrefix(final String clusterNamePrefix) { // NOPMD - this.clusterNamePrefix = clusterNamePrefix; - return this; - } - - public static ClusterConfig fromBootstrapServer(final String bootstrapServer) { - return new ClusterConfig(bootstrapServer, null); - } - - public static ClusterConfig fromKubernetesDnsName(final String kubernetesDnsName) { - return new ClusterConfig(null, kubernetesDnsName); - } - -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ConfigurationKeys.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ConfigurationKeys.java deleted file mode 100644 index 812922016..000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/ConfigurationKeys.java +++ /dev/null @@ -1,44 +0,0 @@ -package theodolite.uc2.applicationold; - -/** - * Keys to access configuration parameters. 
- */ -public final class ConfigurationKeys { - - public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER"; - - public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME"; - - public static final String PORT = "PORT"; - - public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT"; - - public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX"; - - public static final String NUM_SENSORS = "NUM_SENSORS"; - - public static final String PERIOD_MS = "PERIOD_MS"; - - public static final String DOWNSAMPLE_INTERVAL = "DOWNSAMPLE_INTERVAL"; - - public static final String VALUE = "VALUE"; - - public static final String THREADS = "THREADS"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; - - public static final String SCHEMA_REGISTRY_URL = "SCHEMA_REGISTRY_URL"; - - public static final String KAFKA_INPUT_TOPIC = "KAFKA_INPUT_TOPIC"; - - public static final String KAFKA_OUTPUT_TOPIC = "KAFKA_OUTPUT_TOPIC"; - - public static final String KAFKA_BATCH_SIZE = "KAFKA_BATCH_SIZE"; - - public static final String KAFKA_LINGER_MS = "KAFKA_LINGER_MS"; - - public static final String KAFKA_BUFFER_MEMORY = "KAFKA_BUFFER_MEMORY"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/HistoryService.java deleted file mode 100644 index 3f43ff191..000000000 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/applicationold/HistoryService.java +++ /dev/null @@ -1,264 +0,0 @@ -package theodolite.uc2.applicationold; - -import com.hazelcast.config.Config; -import com.hazelcast.config.JoinConfig; -import com.hazelcast.jet.Jet; -import com.hazelcast.jet.JetInstance; -import com.hazelcast.jet.aggregate.AggregateOperations; -import com.hazelcast.jet.config.JobConfig; -import com.hazelcast.jet.kafka.KafkaSinks; -import com.hazelcast.jet.kafka.KafkaSources; -import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.StreamStage; -import com.hazelcast.jet.pipeline.WindowDefinition; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -//import java.time.format.DateTimeFormatter; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * A microservice that manages the history and, therefore, stores and aggregates incoming - * measurements. 
- */ -public class HistoryService { - - // Don't ask we why constant, because checkstyle - private static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; - - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); - //static final DateTimeFormatter TIME_FORMATTER_DEFAULT = - // DateTimeFormatter.ofPattern("HH:mm:ss:SSS"); - - // General Information - private static final String HZ_KUBERNETES_SERVICE_DNS_KEY = "service-dns"; - private static final String BOOTSTRAP_SERVER_DEFAULT = "localhost:5701"; - private static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; - private static final String KAFKA_INPUT_TOPIC_DEFAULT = "input"; - private static final String KAFKA_OUTPUT_TOPIC_DEFAULT = "output"; - private static final String KAFKA_BSERVER_DEFAULT = "localhost:19092"; - // UC2 specific - private static final String DOWNSAMPLE_INTERVAL_DEFAULT = "5000"; - - // Information per History Service - private ClusterConfig clusterConfig; - private Properties kafkaReadPropsForPipeline; - private Properties kafkaWritePropsForPipeline; - private String kafkaInputTopic; - private String kafkaOutputTopic; - // UC2 specific - private int downsampleInterval; - - - /** - * Entrypoint for UC2 using Gradle Run. - */ - public static void main(final String[] args) { - HistoryService.loadHistoryService().run(); - } - - /** Build a history service object to run. */ - public static HistoryService loadHistoryService() { - final String bootstrapServer = System.getenv(ConfigurationKeys.BOOTSTRAP_SERVER); - final String kubernetesDnsName = System.getenv(ConfigurationKeys.KUBERNETES_DNS_NAME); - - ClusterConfig clusterConfig; - if (bootstrapServer != null) { // NOPMD - clusterConfig = ClusterConfig.fromBootstrapServer(bootstrapServer); - LOGGER.info("Use bootstrap server '{}'.", bootstrapServer); - } else if (kubernetesDnsName != null) { // NOPMD - clusterConfig = ClusterConfig.fromKubernetesDnsName(kubernetesDnsName); - LOGGER.info("Use Kubernetes DNS name '{}'", kubernetesDnsName); - } else { - clusterConfig = ClusterConfig.fromBootstrapServer(BOOTSTRAP_SERVER_DEFAULT); - LOGGER.info(// NOPMD - "Neitehr a bootstrap server nor a Kubernetes DNS name was provided." 
- + "Use default bootstrap server '{}'", - BOOTSTRAP_SERVER_DEFAULT); - } - - final String port = System.getenv(ConfigurationKeys.PORT); - if (port != null) { - clusterConfig.setPort(Integer.parseInt(port)); - } - - final String portAutoIncrement = System.getenv(ConfigurationKeys.PORT_AUTO_INCREMENT); - if (portAutoIncrement != null) { - clusterConfig.setPortAutoIncrement(Boolean.parseBoolean(portAutoIncrement)); - } - - final String clusterNamePrefix = System.getenv(ConfigurationKeys.CLUSTER_NAME_PREFIX); - if (clusterNamePrefix != null) { - clusterConfig.setClusterNamePrefix(clusterNamePrefix); - } - - final String kafkaBootstrapServers = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS), - KAFKA_BSERVER_DEFAULT); - final String schemaRegistryUrl = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.SCHEMA_REGISTRY_URL), - SCHEMA_REGISTRY_URL_DEFAULT); - final Properties kafkaReadPropsForPipeline = - buildKafkaReadProps(kafkaBootstrapServers, schemaRegistryUrl); - final Properties kafkaWritePropsForPipeline = - buildKafkaWriteProps(kafkaBootstrapServers); - - final String kafkaInputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), - KAFKA_INPUT_TOPIC_DEFAULT); - - final String kafkaOutputTopic = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), - KAFKA_OUTPUT_TOPIC_DEFAULT); - - final String downsampleInterval = Objects.requireNonNullElse( - System.getenv(ConfigurationKeys.DOWNSAMPLE_INTERVAL), - DOWNSAMPLE_INTERVAL_DEFAULT); - final int downsampleIntervalNumber = Integer.parseInt(downsampleInterval); - - return new HistoryService() - .setClusterConfig(clusterConfig) - .setKafkaReadPropertiesForPipeline(kafkaReadPropsForPipeline) - .setKafkaWritePropertiesForPipeline(kafkaWritePropsForPipeline) - .setKafkaInputTopic(kafkaInputTopic) - .setKafkaOutputTopic(kafkaOutputTopic) - .setDownsampleInterval(downsampleIntervalNumber); - } - - /** Set Cluster Config when creating History Service. */ - private HistoryService setClusterConfig(final ClusterConfig clusterConfig) { // NOPMD - this.clusterConfig = clusterConfig; - return this; - } - - /** Set Pipeline Kafka Read Properties. */ - private HistoryService setKafkaReadPropertiesForPipeline(// NOPMD - final Properties kafkaReadPropertiesForPipeline) { - this.kafkaReadPropsForPipeline = kafkaReadPropertiesForPipeline; - return this; - } - - /** Set Pipeline Kafka Write Properties. */ - private HistoryService setKafkaWritePropertiesForPipeline(// NOPMD - final Properties kafkaWritePropsForPipeline) { - this.kafkaWritePropsForPipeline = kafkaWritePropsForPipeline; - return this; - } - - /** Set Kafka Input topic used to build the pipeline. */ - private HistoryService setKafkaInputTopic(final String kafkaInputTopic) { //NOPMD - this.kafkaInputTopic = kafkaInputTopic; - return this; - } - - /** Set Kafka Output topic used to build the pipeline. */ - private HistoryService setKafkaOutputTopic(final String kafkaOutputTopic) { //NOPMD - this.kafkaOutputTopic = kafkaOutputTopic; - return this; - } - - /** Set the downsample Interval/Window used in this History Service. */ - private HistoryService setDownsampleInterval(final int downsampleInterval) { //NOPMD - this.downsampleInterval = downsampleInterval; - return this; - } - - /** - * Defines kafka properties used to fetch data from kafka using a Hazelcast Jet pipeline. - * - * @return properties used to fetch data from kafka using a Hazelcast Jet pipeline. 
- */ - private static Properties buildKafkaReadProps(final String kafkaBootstrapServer, - final String schemaRegistryUrl) { - final Properties props = new Properties(); - props.put(BOOTSTRAP_SERVERS, kafkaBootstrapServer); - props.put("key.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("value.deserializer", KafkaAvroDeserializer.class); - props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); - props.setProperty("auto.offset.reset", "latest"); - return props; - } - - /** - * Defines kafka properties used to write data to kafka using a Hazelcast Jet pipeline. - * - * @return properties used to fetch data from kafka using a Hazelcast Jet pipeline. - */ - private static Properties buildKafkaWriteProps(final String kafkaBootstrapServer) { - final Properties props = new Properties(); - props.put(BOOTSTRAP_SERVERS, kafkaBootstrapServer); - props.put("key.serializer", StringSerializer.class.getCanonicalName()); - props.put("value.serializer", StringSerializer.class.getCanonicalName()); - return props; - } - - /** - * Start the UC2 service. - */ - public void run() { - Objects.requireNonNull(this.clusterConfig, "No cluster config set."); - this.createHazelcastJetApplication(); - } - - /** - * Build a pipeline and start a Hazelcast Jet Instance and add a job that uses the built pipeline. - */ - private void createHazelcastJetApplication() { - - // Build Pipeline - final Pipeline pipeline = Pipeline.create(); - final StreamStage<Map.Entry<String, String>> mapProduct = - pipeline.readFrom(KafkaSources.<String, ActivePowerRecord>kafka( - this.kafkaReadPropsForPipeline, this.kafkaInputTopic)) - .withNativeTimestamps(0) - .setLocalParallelism(1) - .groupingKey(record -> record.getValue().getIdentifier()) - .window(WindowDefinition.tumbling(this.downsampleInterval)) - .aggregate( - AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) - .map(agg -> { - String theValue = agg.getValue().toString(); - String theKey = agg.getKey().toString(); - return Map.entry(theKey, theValue); - }); - // Add Sink1: Logger - mapProduct.writeTo(Sinks.logger()); - // Add Sink2: Write back to kafka for the final benchmark - mapProduct.writeTo(KafkaSinks.<String, String>kafka( - this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); - - // Set network config for this hazelcast jet instance - // Create Hazelcast Config - final Config config = new Config().setClusterName(this.clusterConfig.getClusterNamePrefix()); - final JoinConfig joinConfig = config.getNetworkConfig() - .setPort(this.clusterConfig.getPort()) - .setPortAutoIncrement(this.clusterConfig.isPortAutoIncrement()) - .getJoin(); - // Set either Bootstrap Server Member or establish Kubernetes Connection - joinConfig.getMulticastConfig().setEnabled(false); - if (this.clusterConfig.hasBootstrapServer()) { - joinConfig.getTcpIpConfig().addMember(this.clusterConfig.getBootstrapServer()); - } else if (this.clusterConfig.hasKubernetesDnsName()) { - joinConfig.getKubernetesConfig() - .setEnabled(true) - .setProperty(HZ_KUBERNETES_SERVICE_DNS_KEY, this.clusterConfig.getKubernetesDnsName()); - } - - // Create Hazelcast Jet Instance - // Add config and add pipeline as the job - final JetInstance jet = Jet.newJetInstance(); - jet.getConfig().setHazelcastConfig(config); - final JobConfig jobConfig = new JobConfig(); - jobConfig.setName("uc2-hazelcastjet"); - jet.newJobIfAbsent(pipeline, jobConfig).join(); - } - - -} diff --git 
a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/HistoryService.java index 3da1eccd7..222088b2b 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/HistoryService.java @@ -34,11 +34,12 @@ public class HistoryService { uc3HistoryService.run(); } catch (final Exception e) { // NOPMD e.printStackTrace(); // NOPMD - System.out.println("An Exception occured. "// NOPMD + LOGGER.error("An Exception occurred. "// NOPMD + "No history service is deployed! ABORT MISSION!"); + LOGGER.error(e.toString()); } } - + /** * Start a UC3 service. * @@ -48,7 +49,7 @@ public class HistoryService { public void run() throws Exception { // NOPMD this.createHazelcastJetApplication(); } - + /** * Creates a Hazelcast Jet Application for UC3 using the Uc3HazelcastJetFactory. * @@ -67,5 +68,5 @@ public class HistoryService { .buildUc3JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) .runUc3Job(JOB_NAME); } - + } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3HazelcastJetFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3HazelcastJetFactory.java index 51612e622..8c6e12bbe 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3HazelcastJetFactory.java @@ -20,7 +20,7 @@ import theodolite.uc3.application.uc3specifics.HourOfDayKeySerializer; * Outside data only refers to custom values or default values in case data of the environment * cannot the fetched. */ -public class Uc3HazelcastJetFactory { //NOPMD +public class Uc3HazelcastJetFactory { // NOPMD // Information per History Service private Properties kafkaReadPropsForPipeline; @@ -33,31 +33,6 @@ public class Uc3HazelcastJetFactory { //NOPMD private int windowSizeInSeconds; private int hoppingSizeInSeconds; - // Checkflags - private boolean readPropertiesSet; - private boolean writePropertiesSet; - private boolean inputTopicSet; - private boolean outputTopicSet; - private boolean pipelineSet; - private boolean jetInstanceSet; - private boolean windowSizeInSecondsSet; - private boolean hoppingSizeInSecondsSet; - - - /** - * Create a new Hazelcast Jet Factory for UC3. - */ - public Uc3HazelcastJetFactory() { - this.readPropertiesSet = false; - this.writePropertiesSet = false; - this.inputTopicSet = false; - this.outputTopicSet = false; - this.pipelineSet = false; - this.jetInstanceSet = false; - this.windowSizeInSecondsSet = false; - this.hoppingSizeInSecondsSet = false; - } - ///////////////////////////////////// // Layer 1 - Hazelcast Jet Run Job // ///////////////////////////////////// @@ -69,24 +44,25 @@ public class Uc3HazelcastJetFactory { //NOPMD * @param jobName The name of the job. * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. 
*/ - public void runUc3Job(final String jobName) throws Exception { // NOPMD - if (this.jetInstanceSet) { - if (this.pipelineSet) { + public void runUc3Job(final String jobName) throws IllegalStateException { // NOPMD - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig() - .registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class) - .setName(jobName); - this.uc3JetInstance.newJobIfAbsent(this.uc3JetPipeline, jobConfig).join(); - - } else { - throw new Exception(// NOPMD - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC3."); - } - } else { - throw new Exception("Jet Instance is not set! " // NOPMD + // Check if a Jet Instance for UC3 is set. + if (this.uc3JetInstance == null) { + throw new IllegalStateException("Jet Instance is not set! " + "Cannot start a hazelcast jet job for UC3."); } + + // Check if a Pipeline for UC3 is set. + if (this.uc3JetPipeline == null) { + throw new IllegalStateException( + "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC3."); + } + + // Adds the job name and joins a job to the JetInstance defined in this factory + final JobConfig jobConfig = new JobConfig() + .registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class) + .setName(jobName); + this.uc3JetInstance.newJobIfAbsent(this.uc3JetPipeline, jobConfig).join(); } ///////////// @@ -108,7 +84,6 @@ public class Uc3HazelcastJetFactory { //NOPMD this.uc3JetInstance = new JetInstanceBuilder() .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey) .build(); - this.jetInstanceSet = true; return this; } @@ -120,48 +95,57 @@ public class Uc3HazelcastJetFactory { //NOPMD * @throws Exception If the input topic or the kafka properties are not defined, the pipeline * cannot be built. */ - public Uc3HazelcastJetFactory buildUc3Pipeline() throws Exception { // NOPMD - // Check for set properties and set input topic - if (this.readPropertiesSet) { - if (this.writePropertiesSet) { - if (this.inputTopicSet) { - if (this.outputTopicSet) { - if (this.windowSizeInSecondsSet) { - if (this.hoppingSizeInSecondsSet) { - // Build Pipeline Using the pipelineBuilder - final Uc3PipelineBuilder pipeBuilder = new Uc3PipelineBuilder(); - this.uc3JetPipeline = - pipeBuilder.build(this.kafkaReadPropsForPipeline, - this.kafkaWritePropsForPipeline, - this.kafkaInputTopic, this.kafkaOutputTopic, this.hoppingSizeInSeconds, - this.windowSizeInSeconds); - this.pipelineSet = true; - // Return Uc3HazelcastJetBuilder factory - return this; - } else { - throw new Exception("hopping size in seconds for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS //NOPMD - } - } else { - throw new Exception("window size in seconds for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("kafka output topic for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("Kafka input topic for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("Kafka Write Properties for pipeline not set! " // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD - } - } else { - throw new Exception("Kafka Read Properties for pipeline not set! 
" // NOPMD - + "Cannot build pipeline."); // NOCS // NOPMD + public Uc3HazelcastJetFactory buildUc3Pipeline() throws IllegalStateException { // NOPMD + + final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD + + // Check if Properties for the Kafka Input are set. + if (this.kafkaReadPropsForPipeline == null) { + throw new IllegalStateException("Kafka Read Properties for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if Properties for the Kafka Output are set. + if (this.kafkaWritePropsForPipeline == null) { + throw new IllegalStateException("Kafka Write Properties for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka input topic is set. + if (this.kafkaInputTopic == null) { + throw new IllegalStateException("Kafka input topic for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka output topic is set. + if (this.kafkaOutputTopic == null) { + throw new IllegalStateException("kafka output topic for pipeline not set! " + + defaultPipelineWarning); } + + // Check if the window size for the "sliding" window is set. + if (this.windowSizeInSeconds <= 0) { + throw new IllegalStateException( + "window size in seconds for pipeline not set or not greater than 0! " + + defaultPipelineWarning); + } + + // Check if the hopping distance for the "sliding" window is set. + if (this.hoppingSizeInSeconds <= 0) { + throw new IllegalStateException( + "hopping size in seconds for pipeline not set or not greater than 0! " + + defaultPipelineWarning); + } + + // Build Pipeline Using the pipelineBuilder + final Uc3PipelineBuilder pipeBuilder = new Uc3PipelineBuilder(); + this.uc3JetPipeline = + pipeBuilder.build(this.kafkaReadPropsForPipeline, + this.kafkaWritePropsForPipeline, + this.kafkaInputTopic, this.kafkaOutputTopic, this.hoppingSizeInSeconds, + this.windowSizeInSeconds); + // Return Uc3HazelcastJetBuilder factory + return this; } ///////////// @@ -178,7 +162,6 @@ public class Uc3HazelcastJetFactory { //NOPMD public Uc3HazelcastJetFactory setCustomReadProperties(// NOPMD final Properties kafkaReadProperties) { this.kafkaReadPropsForPipeline = kafkaReadProperties; - this.readPropertiesSet = true; return this; } @@ -192,7 +175,6 @@ public class Uc3HazelcastJetFactory { //NOPMD public Uc3HazelcastJetFactory setCustomWriteProperties(// NOPMD final Properties kafkaWriteProperties) { this.kafkaWritePropsForPipeline = kafkaWriteProperties; - this.writePropertiesSet = true; return this; } @@ -214,7 +196,6 @@ public class Uc3HazelcastJetFactory { //NOPMD propsBuilder.buildKafkaReadPropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault); this.kafkaReadPropsForPipeline = kafkaReadProps; - this.readPropertiesSet = true; return this; } @@ -232,7 +213,6 @@ public class Uc3HazelcastJetFactory { //NOPMD final Properties kafkaWriteProps = propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault); this.kafkaWritePropsForPipeline = kafkaWriteProps; - this.writePropertiesSet = true; return this; } @@ -245,7 +225,6 @@ public class Uc3HazelcastJetFactory { //NOPMD public Uc3HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD final String inputTopic) { this.kafkaInputTopic = inputTopic; - this.inputTopicSet = true; return this; } @@ -257,7 +236,6 @@ public class Uc3HazelcastJetFactory { //NOPMD */ public Uc3HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD this.kafkaOutputTopic = outputTopic; - this.outputTopicSet = true; return this; } @@ -274,7 +252,6 @@ public class 
Uc3HazelcastJetFactory { //NOPMD this.kafkaInputTopic = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), defaultInputTopic); - this.inputTopicSet = true; return this; } @@ -290,7 +267,6 @@ public class Uc3HazelcastJetFactory { //NOPMD this.kafkaOutputTopic = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), defaultOutputTopic); - this.outputTopicSet = true; return this; } @@ -303,7 +279,6 @@ public class Uc3HazelcastJetFactory { //NOPMD public Uc3HazelcastJetFactory setCustomWindowSizeInSeconds(// NOPMD final int windowSizeInSeconds) { this.windowSizeInSeconds = windowSizeInSeconds; - this.windowSizeInSecondsSet = true; return this; } @@ -321,7 +296,6 @@ public class Uc3HazelcastJetFactory { //NOPMD defaultWindowSizeInSeconds); final int windowSizeInSecondsNumber = Integer.parseInt(windowSizeInSeconds); this.windowSizeInSeconds = windowSizeInSecondsNumber; - this.windowSizeInSecondsSet = true; return this; } @@ -334,7 +308,6 @@ public class Uc3HazelcastJetFactory { //NOPMD public Uc3HazelcastJetFactory setCustomHoppingSizeInSeconds(// NOPMD final int hoppingSizeInSeconds) { this.hoppingSizeInSeconds = hoppingSizeInSeconds; - this.hoppingSizeInSecondsSet = true; return this; } @@ -352,7 +325,6 @@ public class Uc3HazelcastJetFactory { //NOPMD defaultHoppingSizeInSeconds); final int hoppingSizeInSecondsNumber = Integer.parseInt(hoppingSizeInSeconds); this.hoppingSizeInSeconds = hoppingSizeInSecondsNumber; - this.hoppingSizeInSecondsSet = true; return this; } } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3KafkaPropertiesBuilder.java index b3600257d..3d81c3e76 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3KafkaPropertiesBuilder.java @@ -3,6 +3,8 @@ package theodolite.uc3.application; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import theodolite.commons.hazelcastjet.ConfigurationKeys; @@ -32,12 +34,14 @@ public class Uc3KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); // NOCS - props.put("key.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("value.deserializer", KafkaAvroDeserializer.class); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + KafkaAvroDeserializer.class.getCanonicalName()); props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); - props.setProperty("auto.offset.reset", "earliest"); + props.setProperty("schema.registry.url", schemaRegistryUrl); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @@ -55,9 +59,11 @@ public class Uc3KafkaPropertiesBuilder { 
kafkaBootstrapServerDefault);
 
     final Properties props = new Properties();
-    props.put("bootstrap.servers", kafkaBootstrapServers); // NOCS
-    props.put("key.serializer", StringSerializer.class.getCanonicalName());
-    props.put("value.serializer", StringSerializer.class.getCanonicalName());
+    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS
+    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        StringSerializer.class.getCanonicalName());
+    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        StringSerializer.class.getCanonicalName());
 
     return props;
   }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java
index 7463fa794..f315f49cf 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/HistoryService.java
@@ -3,6 +3,10 @@ package theodolite.uc4.application;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A microservice that manages the history and, therefore, stores and aggregates incoming
+ * measurements.
+ */
 public class HistoryService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
@@ -16,7 +20,7 @@ public class HistoryService {
   // UC4 specific (default)
   private static final String KAFKA_CONFIG_TOPIC_DEFAULT = "configuration";
   private static final String KAFKA_FEEDBACK_TOPIC_DEFAULT = "aggregation-feedback";
-  private static final String WINDOW_SIZE_DEFAULT = "5000";
+  private static final String WINDOW_SIZE_DEFAULT_MS = "5000";
 
   // -- (default) job name for this history serivce
   private static final String JOB_NAME = "uc4-hazelcastjet";
@@ -29,9 +33,10 @@ public class HistoryService {
     try {
       uc4HistoryService.run();
     } catch (final Exception e) { // NOPMD
-      e.printStackTrace(); // NOPMD
-      System.out.println("An Exception occured. "// NOPMD
+      e.printStackTrace(); // NOPMD
+      LOGGER.error("An Exception occurred. "// NOPMD
          + "No history service is deployed!
ABORT MISSION!"); + LOGGER.error(e.toString()); } } @@ -59,7 +64,7 @@ public class HistoryService { .setKafkaOutputTopicFromEnv(KAFKA_OUTPUT_TOPIC_DEFAULT) .setKafkaConfigurationTopicFromEnv(KAFKA_CONFIG_TOPIC_DEFAULT) .setKafkaFeedbackTopicFromEnv(KAFKA_FEEDBACK_TOPIC_DEFAULT) - .setWindowSizeFromEnv(WINDOW_SIZE_DEFAULT) + .setWindowSizeFromEnv(WINDOW_SIZE_DEFAULT_MS) .buildUc4JetInstanceFromEnv(LOGGER, BOOTSTRAP_SERVER_DEFAULT, HZ_KUBERNETES_SERVICE_DNS_KEY) .buildUc4Pipeline() .runUc4Job(JOB_NAME); diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java index 598e8c2c3..03fc362ef 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4HazelcastJetFactory.java @@ -25,8 +25,8 @@ public class Uc4HazelcastJetFactory { // Information per History Service private Properties kafkaInputReadPropsForPipeline; - private Properties kafkaConfigReadPropsForPipeline; - private Properties kafkaAggregationReadPropsForPipeline; + private Properties kafkaConfigPropsForPipeline; + private Properties kafkaFeedbackPropsForPipeline; private Properties kafkaWritePropsForPipeline; private String kafkaInputTopic; private String kafkaOutputTopic; @@ -37,32 +37,6 @@ public class Uc4HazelcastJetFactory { private String kafkaFeedbackTopic; private int windowSize; - // Checkflags - private boolean readPropertiesSet; - private boolean writePropertiesSet; - private boolean inputTopicSet; - private boolean outputTopicSet; - private boolean pipelineSet; - private boolean jetInstanceSet; - private boolean kafkaConfigurationTopicSet; - private boolean kafkaFeedbackTopicSet; - private boolean windowSizeSet; - - /** - * Create a new Hazelcast Jet Factory for UC4. - */ - public Uc4HazelcastJetFactory() { - this.readPropertiesSet = false; - this.writePropertiesSet = false; - this.inputTopicSet = false; - this.outputTopicSet = false; - this.pipelineSet = false; - this.jetInstanceSet = false; - this.kafkaConfigurationTopicSet = false; - this.kafkaFeedbackTopicSet = false; - this.windowSizeSet = false; - } - ///////////////////////////////////// // Layer 1 - Hazelcast Jet Run Job // ///////////////////////////////////// @@ -74,25 +48,26 @@ public class Uc4HazelcastJetFactory { * @param jobName The name of the job. * @throws Exception If either no JetInstance or Pipeline is set, a job cannot be startet. */ - public void runUc4Job(final String jobName) throws Exception { // NOPMD - if (this.jetInstanceSet) { - if (this.pipelineSet) { - - // Adds the job name and joins a job to the JetInstance defined in this factory - final JobConfig jobConfig = new JobConfig() - .registerSerializer(ValueGroup.class, ValueGroupSerializer.class) - .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class) - .setName(jobName); - this.uc4JetInstance.newJobIfAbsent(this.uc4JetPipeline, jobConfig).join(); - - } else { - throw new Exception(// NOPMD - "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC4."); - } - } else { - throw new Exception("Jet Instance is not set! " // NOPMD + public void runUc4Job(final String jobName) throws IllegalStateException { // NOPMD + + // Check if a Jet Instance for UC4 is set. 
+    if (this.uc4JetInstance == null) {
+      throw new IllegalStateException("Jet Instance is not set! "
           + "Cannot start a hazelcast jet job for UC4.");
     }
+
+    // Check if a Pipeline for UC4 is set.
+    if (this.uc4JetPipeline == null) {
+      throw new IllegalStateException(
+          "Hazelcast Pipeline is not set! Cannot start a hazelcast jet job for UC4.");
+    }
+
+    // Adds the job name and joins a job to the JetInstance defined in this factory
+    final JobConfig jobConfig = new JobConfig()
+        .registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
+        .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
+        .setName(jobName);
+    this.uc4JetInstance.newJobIfAbsent(this.uc4JetPipeline, jobConfig).join();
   }
 
   /////////////
@@ -114,7 +89,6 @@ public class Uc4HazelcastJetFactory {
     this.uc4JetInstance = new JetInstanceBuilder()
         .setConfigFromEnv(logger, bootstrapServerDefault, hzKubernetesServiceDnsKey)
         .build();
-    this.jetInstanceSet = true;
     return this;
   }
 
@@ -126,62 +100,77 @@ public class Uc4HazelcastJetFactory {
    * @throws Exception If the input topic or the kafka properties are not defined, the pipeline
    *         cannot be built.
    */
-  public Uc4HazelcastJetFactory buildUc4Pipeline() throws Exception { // NOPMD
-    // Check for set properties and set input topic
-    if (this.readPropertiesSet) {
-      if (this.writePropertiesSet) {
-        if (this.inputTopicSet) {
-          if (this.outputTopicSet) {
-            if (this.kafkaConfigurationTopicSet) {
-              if (this.windowSizeSet) {
-                if (this.jetInstanceSet) {
-                  if (this.kafkaFeedbackTopicSet) {
-                    // Build Pipeline Using the pipelineBuilder
-                    final Uc4PipelineBuilderNew pipeBuilder = new Uc4PipelineBuilderNew();
-                    this.uc4JetPipeline =
-                        pipeBuilder.build(this.kafkaInputReadPropsForPipeline,
-                            this.kafkaConfigReadPropsForPipeline,
-                            this.kafkaAggregationReadPropsForPipeline,
-                            this.kafkaWritePropsForPipeline,
-                            this.kafkaInputTopic, this.kafkaOutputTopic,
-                            this.kafkaConfigurationTopic,
-                            this.kafkaFeedbackTopic,
-                            this.windowSize);
-                    this.pipelineSet = true;
-                    // Return Uc4HazelcastJetBuilder factory
-                    return this;
-                  } else {
-                    throw new Exception("Feedback topic not set! " // NOPMD
-                        + "Cannot build pipeline."); // NOCS // NOPMD
-                  }
-                } else {
-                  throw new Exception("Jet Instance not set! " // NOPMD
-                      + "Cannot build pipeline."); // NOCS // NOPMD
-                }
-              } else {
-                throw new Exception("window size for pipeline not set! " // NOPMD
-                    + "Cannot build pipeline."); // NOCS // NOPMD
-              }
-            } else {
-              throw new Exception("configuratin topic for pipeline not set! " // NOPMD
-                  + "Cannot build pipeline."); // NOCS // NOPMD
-            }
-          } else {
-            throw new Exception("kafka output topic for pipeline not set! " // NOPMD
-                + "Cannot build pipeline."); // NOCS // NOPMD
-          }
-        } else {
-          throw new Exception("Kafka input topic for pipeline not set! " // NOPMD
-              + "Cannot build pipeline."); // NOCS // NOPMD
-        }
-      } else {
-        throw new Exception("Kafka Write Properties for pipeline not set! " // NOPMD
-            + "Cannot build pipeline."); // NOCS // NOPMD
-      }
-    } else {
-      throw new Exception("Kafka Read Properties for pipeline not set! " // NOPMD
-          + "Cannot build pipeline."); // NOCS // NOPMD
+  public Uc4HazelcastJetFactory buildUc4Pipeline() throws IllegalStateException { // NOPMD
+
+    final String defaultPipelineWarning = "Cannot build pipeline."; // NOPMD
+
+    // Check if Properties for the Kafka Input are set.
+    if (this.kafkaInputReadPropsForPipeline == null) {
+      throw new IllegalStateException("Kafka Input Read Properties for pipeline not set!
" + + defaultPipelineWarning); + } + + // Check if Properties for the Kafka Output are set. + if (this.kafkaWritePropsForPipeline == null) { + throw new IllegalStateException("Kafka Write Properties for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if Properties for the Kafka Config Read are set. + if (this.kafkaConfigPropsForPipeline == null) { + throw new IllegalStateException("Kafka Config Read Properties for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if Properties for the Kafka Feedback Read are set. + if (this.kafkaFeedbackPropsForPipeline == null) { + throw new IllegalStateException("Kafka Feedback Read Properties for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka input topic is set. + if (this.kafkaInputTopic == null) { + throw new IllegalStateException("Kafka input topic for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka output topic is set. + if (this.kafkaOutputTopic == null) { + throw new IllegalStateException("kafka output topic for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka config topic is set. + if (this.kafkaConfigurationTopic == null) { + throw new IllegalStateException("configuratin topic for pipeline not set! " + + defaultPipelineWarning); + } + + // Check if the Kafka feedback topic is set. + if (this.kafkaFeedbackTopic == null) { + throw new IllegalStateException("Feedback topic not set! " + + defaultPipelineWarning); } + + // Check if window size for tumbling window is set. + if (this.windowSize <= 0) { + throw new IllegalStateException("window size for pipeline not set or not greater than 0! " + + defaultPipelineWarning); + } + + // Build Pipeline Using the pipelineBuilder + final Uc4PipelineBuilder pipeBuilder = new Uc4PipelineBuilder(); + this.uc4JetPipeline = + pipeBuilder.build(this.kafkaInputReadPropsForPipeline, + this.kafkaConfigPropsForPipeline, + this.kafkaFeedbackPropsForPipeline, + this.kafkaWritePropsForPipeline, + this.kafkaInputTopic, this.kafkaOutputTopic, + this.kafkaConfigurationTopic, + this.kafkaFeedbackTopic, + this.windowSize); + // Return Uc4HazelcastJetBuilder factory + return this; } ///////////// @@ -212,9 +201,8 @@ public class Uc4HazelcastJetFactory { propsBuilder.buildKafkaAggregationReadPropsFromEnv(bootstrapServersDefault, schemaRegistryUrlDefault); this.kafkaInputReadPropsForPipeline = kafkaInputReadProps; - this.kafkaConfigReadPropsForPipeline = kafkaConfigReadProps; - this.kafkaAggregationReadPropsForPipeline = kafkaAggregationReadProps; - this.readPropertiesSet = true; + this.kafkaConfigPropsForPipeline = kafkaConfigReadProps; + this.kafkaFeedbackPropsForPipeline = kafkaAggregationReadProps; return this; } @@ -232,7 +220,6 @@ public class Uc4HazelcastJetFactory { final Properties kafkaWriteProps = propsBuilder.buildKafkaWritePropsFromEnv(bootstrapServersDefault); this.kafkaWritePropsForPipeline = kafkaWriteProps; - this.writePropertiesSet = true; return this; } @@ -245,7 +232,6 @@ public class Uc4HazelcastJetFactory { public Uc4HazelcastJetFactory setCustomKafkaInputTopic(// NOPMD final String inputTopic) { this.kafkaInputTopic = inputTopic; - this.inputTopicSet = true; return this; } @@ -257,7 +243,6 @@ public class Uc4HazelcastJetFactory { */ public Uc4HazelcastJetFactory setCustomKafkaOutputTopic(final String outputTopic) { // NOPMD this.kafkaOutputTopic = outputTopic; - this.outputTopicSet = true; return this; } @@ -274,7 +259,6 @@ public class Uc4HazelcastJetFactory { this.kafkaInputTopic 
= Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_INPUT_TOPIC), defaultInputTopic); - this.inputTopicSet = true; return this; } @@ -290,7 +274,6 @@ public class Uc4HazelcastJetFactory { this.kafkaOutputTopic = Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_OUTPUT_TOPIC), defaultOutputTopic); - this.outputTopicSet = true; return this; } @@ -303,7 +286,6 @@ public class Uc4HazelcastJetFactory { public Uc4HazelcastJetFactory setCustomWindowSize(// NOPMD final int windowSize) { this.windowSize = windowSize; - this.windowSizeSet = true; return this; } @@ -321,7 +303,6 @@ public class Uc4HazelcastJetFactory { defaultWindowSize); final int windowSizeNumber = Integer.parseInt(windowSize); this.windowSize = windowSizeNumber; - this.windowSizeSet = true; return this; } @@ -334,7 +315,6 @@ public class Uc4HazelcastJetFactory { public Uc4HazelcastJetFactory setCustomKafkaConfigurationTopic(// NOPMD final String kafkaConfigurationTopic) { this.kafkaConfigurationTopic = kafkaConfigurationTopic; - this.kafkaConfigurationTopicSet = true; return this; } @@ -350,7 +330,6 @@ public class Uc4HazelcastJetFactory { this.kafkaConfigurationTopic = (String) Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC), defaultKafkaConfigurationTopic); - this.kafkaConfigurationTopicSet = true; return this; } @@ -363,7 +342,6 @@ public class Uc4HazelcastJetFactory { public Uc4HazelcastJetFactory setCustomKafkaFeedbackTopic(// NOPMD final String kafkaFeedbackTopic) { this.kafkaFeedbackTopic = kafkaFeedbackTopic; - this.kafkaFeedbackTopicSet = true; return this; } @@ -379,7 +357,6 @@ public class Uc4HazelcastJetFactory { this.kafkaFeedbackTopic = (String) Objects.requireNonNullElse( System.getenv(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC), defaultKafkaFeedbackTopic); - this.kafkaFeedbackTopicSet = true; return this; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java index 0cca80b58..de1513b86 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4KafkaPropertiesBuilder.java @@ -4,13 +4,13 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer; import java.util.Objects; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.DoubleDeserializer; import org.apache.kafka.common.serialization.DoubleSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import theodolite.commons.hazelcastjet.ConfigurationKeys; import theodolite.uc4.application.uc4specifics.EventDeserializer; -import titan.ccp.configuration.events.EventSerde; /** * Builds a read and write Properties objects containing the needed kafka properties used for the @@ -18,6 +18,9 @@ import titan.ccp.configuration.events.EventSerde; */ public class Uc4KafkaPropertiesBuilder { + private static final String SPECIFIC_AVRO_READER_CONFIG = "specific.avro.reader"; + private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; + /** * Builds Kafka Properties used for the UC4 Benchmark pipeline. 
* @@ -37,15 +40,17 @@ public class Uc4KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); //NOCS - props.put("key.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("value.deserializer", KafkaAvroDeserializer.class); - props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); - props.put("auto.offset.reset", "latest"); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + KafkaAvroDeserializer.class.getCanonicalName()); + props.put(SPECIFIC_AVRO_READER_CONFIG, true); + props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); return props; } - + /** * Builds Kafka Properties used for the UC4 Benchmark pipeline. * @@ -65,14 +70,16 @@ public class Uc4KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); //NOCS - props.put("key.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("value.deserializer", DoubleDeserializer.class.getCanonicalName()); - props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + DoubleDeserializer.class.getCanonicalName()); + props.put(SPECIFIC_AVRO_READER_CONFIG, true); + props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); return props; } - + /** * Builds Kafka Properties used for the UC4 Benchmark pipeline. 
* @@ -92,12 +99,14 @@ public class Uc4KafkaPropertiesBuilder { schemaRegistryUrlDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); //NOCS - props.put("key.deserializer", EventDeserializer.class); - props.put("value.deserializer", StringDeserializer.class.getCanonicalName()); - props.put("specific.avro.reader", true); - props.put("schema.registry.url", schemaRegistryUrl); - props.put("auto.offset.reset", "earliest"); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + EventDeserializer.class.getCanonicalName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getCanonicalName()); + props.put(SPECIFIC_AVRO_READER_CONFIG, true); + props.setProperty(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } @@ -115,9 +124,11 @@ public class Uc4KafkaPropertiesBuilder { kafkaBootstrapServerDefault); final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaBootstrapServers); //NOCS - props.put("key.serializer", StringSerializer.class.getCanonicalName()); - props.put("value.serializer", DoubleSerializer.class.getCanonicalName()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); // NOCS + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getCanonicalName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + DoubleSerializer.class.getCanonicalName()); return props; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java index c4a9d77f1..383300041 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java @@ -1,180 +1,293 @@ package theodolite.uc4.application; -import com.hazelcast.function.FunctionEx; -import com.hazelcast.jet.JetInstance; +import com.hazelcast.function.BiFunctionEx; +import com.hazelcast.function.SupplierEx; import com.hazelcast.jet.Traverser; import com.hazelcast.jet.Traversers; +import com.hazelcast.jet.Util; import com.hazelcast.jet.aggregate.AggregateOperations; -import com.hazelcast.jet.datamodel.WindowResult; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; -import com.hazelcast.jet.pipeline.JoinClause; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; -import com.hazelcast.jet.pipeline.StreamSourceStage; +import com.hazelcast.jet.pipeline.StageWithWindow; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStageWithKey; import com.hazelcast.jet.pipeline.WindowDefinition; -import com.hazelcast.map.IMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Properties; import java.util.Set; -import org.apache.kafka.streams.kstream.KTable; +import theodolite.uc4.application.uc4specifics.ChildParentsTransformer; +import theodolite.uc4.application.uc4specifics.SensorGroupKey; import theodolite.uc4.application.uc4specifics.ValueGroup; import 
titan.ccp.configuration.events.Event;
 import titan.ccp.model.records.ActivePowerRecord;
-import titan.ccp.model.records.AggregatedActivePowerRecord;
 import titan.ccp.model.sensorregistry.SensorRegistry;
 
+/**
+ * Builder to build a HazelcastJet Pipeline for UC4 which can be used for stream processing using
+ * Hazelcast Jet.
+ */
 public class Uc4PipelineBuilder {
 
-  private Pipeline pipe = Pipeline.create();
-
-  // Data
-  private String kafkaInputTopic;
-  private String kafkaConfigurationTopic;
-  private Properties kafkaReadPropsForPipeline;
-  private JetInstance uc4JetInstance;
-
-
+  private static final String SENSOR_PARENT_MAP_NAME = "SensorParentMap";
 
   /**
    * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
    *
-   * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
-   *        attributes.
+   * @param kafkaInputReadPropsForPipeline Properties Object containing the necessary kafka input
+   *        read attributes.
+   * @param kafkaConfigPropsForPipeline Properties Object containing the necessary kafka config
+   *        read attributes.
+   * @param kafkaFeedbackPropsForPipeline Properties Object containing the necessary kafka
+   *        aggregation read attributes.
    * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
    *        attributes.
    * @param kafkaInputTopic The name of the input topic used for the pipeline.
    * @param kafkaOutputTopic The name of the output topic used for the pipeline.
-   * @param windowSize The window size in milliseconds of the tumbling window used in the "last
-   *        values" aggregation of this pipeline.
+   * @param kafkaConfigurationTopic The name of the configuration topic used for the pipeline.
+   * @param kafkaFeedbackTopic The name of the feedback topic used for the pipeline.
+   * @param windowSize The window size of the tumbling window used in this pipeline.
    * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data
-   *         for UC4.
+   *         for UC4.
*/ - public Pipeline build(final Properties kafkaReadPropsForPipeline, - final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic, + @SuppressWarnings("unchecked") + public Pipeline build(final Properties kafkaInputReadPropsForPipeline, // NOPMD + final Properties kafkaConfigPropsForPipeline, + final Properties kafkaFeedbackPropsForPipeline, + final Properties kafkaWritePropsForPipeline, + final String kafkaInputTopic, final String kafkaOutputTopic, final String kafkaConfigurationTopic, - final int windowSize, - JetInstance jet) { + final String kafkaFeedbackTopic, + final int windowSize) { + + ////////////////////////////////// + // The pipeline for this Use Case + final Pipeline uc4Pipeline = Pipeline.create(); + + // System.out.println("DEBUG: Window Size: " + windowSize); + + ////////////////////////////////// + // (1) Configuration Stream + final StreamStage<Entry<Event, SensorRegistry>> configurationStream = uc4Pipeline + .readFrom(KafkaSources.<Event, String>kafka( + kafkaConfigPropsForPipeline, kafkaConfigurationTopic)) + .withNativeTimestamps(0) + .map(data -> { + // DEBUG + // System.out.println("D E B U G: Got a configuration Stream Element!"); + // System.out.println("Event: " + data.getKey().toString() + "; Sensor Registry: " + + // data.getValue().toString()); - this.uc4JetInstance = jet; + return data; + + }) + .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED + || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) + .map(data -> { + + // DEBUG + // System.out.println("D E B U G: It passed through the filter"); + + return Map.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())); + }); + + // Builds a new HashMap // + final SupplierEx<? extends HashMap<String, Set<String>>> hashMapSupplier = + () -> new HashMap<String, Set<String>>(); + + // FlatMapFunction // + final BiFunctionEx<? super HashMap<String, Set<String>>, ? super Entry<Event, SensorRegistry>, + ? extends Traverser<Entry<String, Set<String>>>> flatMapFn = + (flatMapStage, eventItem) -> { + // Get Data + HashMap<String, Set<String>> oldParents = + (HashMap<String, Set<String>>) flatMapStage.clone(); + SensorRegistry newSensorRegistry = (SensorRegistry) eventItem.getValue(); - /////////////////////// - // 1. Configuration Map - // this.kafkaConfigurationTopic = kafkaConfigurationTopic; - // this.kafkaReadPropsForPipeline = kafkaReadPropsForPipeline; - // final IMap<String, Set<String>> parentSensorTable = this.buildParentSensorMap(); - /////////////////////// - StreamStage<Entry<String, Set<String>>> configurationStream = null; + // Transform new Input + ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); + Map<String, Set<String>> mapFromRegistry = + transformer.constructChildParentsPairs(newSensorRegistry); - //////////////// - // 2. 
Input Map - // this.kafkaInputTopic = kafkaInputTopic; - // final IMap<String, ActivePowerRecord> inputTable = this.buildInputTable(); - //////////////// - StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe - .readFrom(KafkaSources.<String, ActivePowerRecord>kafka( - kafkaReadPropsForPipeline, kafkaInputTopic)) - .withNativeTimestamps(0); - - StreamStage<Entry<String, Double>> reducedInputStream = inputStream - .map(inputEntry -> { - return Map.entry(inputEntry.getValue().getIdentifier(), - inputEntry.getValue().getValueInW()); - }); + // Compare both tables + HashMap<String, Set<String>> updates = new HashMap<String, Set<String>>(); + for (String key : mapFromRegistry.keySet()) { + if (oldParents.containsKey(key)) { + if (!mapFromRegistry.get(key).equals(oldParents.get(key))) { + updates.put(key, mapFromRegistry.get(key)); + } + } else { + updates.put(key, mapFromRegistry.get(key)); + } + } - ////////////////////////////////////////////////////////// - // 3. Last Value Table from Input and Parent Sensor Table - // final IMap<WindowResult<SensorParentKey>, ActivePowerRecord> lastValueTable = - // this.buildLastValueTable(parentSensorTable, inputTable); - //////////////////////////////////////////////////////////// - //StreamStage<Entry<String,ValueGroup>> jointStream = - // inputStream.hashJoin(configurationStream, - // JoinClause.joinMapEntries(leftKeyFn), - // mapToOutputFn); // TODO hmm, how to join? + ArrayList<Entry<String, Set<String>>> updatesList = + new ArrayList<Entry<String, Set<String>>>(updates.entrySet()); - // 4. Aggregation Stream - //final IMap<WindowResult<String>, AggregatedActivePowerRecord> aggregations = - // this.buildAggregationStream(lastValueTable); + // Return traverser with differences + return Traversers.traverseIterable(updatesList) + .map(e -> Util.entry(e.getKey(), e.getValue())); + + }; - return pipe; - } + // Write into table sink + configurationStream + .flatMapStateful(hashMapSupplier, flatMapFn) + .writeTo(Sinks.mapWithUpdating( + SENSOR_PARENT_MAP_NAME, // The addressed IMAP + event -> event.getKey(), // The key to look for + (oldValue, newEntry) -> { // the new entry returned (null automatically results in + // deletion of entry) //NOCS - /** - * Uses a given configuration topic of kafka to get data which represents a table of sensor or - * group identifiers which are mapped to a set of groups and returns an IMap containing these - * entries. - * - * TODO WORK IN PROGRESS - QUESTIONS REGARDING THE LAST STEPS - * - * @return Returns a IMap<String, Set<String>> Object containing sensor/group identifiers and - * their corresponsing groups/parents. 
- */ - private IMap<String, Set<String>> buildParentSensorMap() { - // Read the raw configuration stream - StreamStage<Entry<Event, String>> configurationStream = this.pipe - .readFrom(KafkaSources.<Event, String>kafka( - kafkaReadPropsForPipeline, kafkaConfigurationTopic)) - .withNativeTimestamps(0); + // DEBUG + /* + * String debugFlatmapString = "["; for (String group : newEntry.getValue()) { + * debugFlatmapString = debugFlatmapString + group + ","; } debugFlatmapString = + * debugFlatmapString + "]"; System.out.println( "Flatmap Writes for key '" + + * newEntry.getKey() + "': " + debugFlatmapString); + */ - // Filter certain values out - StreamStage<Entry<Event, String>> filteredConfigurationStream = configurationStream - .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED - || entry.getKey() == Event.SENSOR_REGISTRY_STATUS); + // Write new set of groups + return newEntry.getValue(); + })); + + ////////////////////////////////// + // (1) Sensor Input Stream + final StreamStage<Entry<String, Double>> inputStream = uc4Pipeline + .readFrom(KafkaSources.<String, ActivePowerRecord>kafka( + kafkaInputReadPropsForPipeline, kafkaInputTopic)) + .withNativeTimestamps(0) + .map(stream -> { - // Map configuration String to Sensor Registry - StreamStage<Entry<Event, SensorRegistry>> mapped = filteredConfigurationStream - .map(inputEntry -> Map.entry(inputEntry.getKey(), - SensorRegistry.fromJson(inputEntry.getValue()))); + String sensorId = stream.getValue().getIdentifier(); + Double valueInW = stream.getValue().getValueInW(); + // DEBUG + // System.out.println("INPUT D E B U G: Got an input Stream Element!"); + // System.out.println("[SensorId=" + sensorId + "//valueinW=" + valueInW.toString()); - // Flat Transform TODO Needs Traversers thingy? - StreamStage<Entry<String, Optional<Set<String>>>> flatMapped = mapped.flatMap(null); + return Map.entry(sensorId, valueInW); + }); - // Group by Key TODO - StreamStageWithKey<Entry<String, Optional<Set<String>>>, Object> grouped = - flatMapped.groupingKey(entry -> entry.getKey()); + // (1) Aggregation Stream + final StreamStage<Entry<String, Double>> aggregations = uc4Pipeline + .readFrom(KafkaSources.<String, Double>kafka( + kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic)) + .withNativeTimestamps(0) + .map(stream -> { - // Aggregate TODO - IMap<String, Set<String>> aggregated = - (IMap<String, Set<String>>) grouped.rollingAggregate(null); + // DEBUG + // System.out.println("AGGREGATION D E B U G: Got an aggregation Stream Element!"); + // System.out.println( + // "[SensorId=" + stream.getKey() + "//valueinW=" + stream.getValue().toString()); + return stream; - // Return - return aggregated; - } + }); - /** - * Receives an input stream with sensor ID's and values and returns a filled IMap with such - * values. 
- * - * TODO WORK IN PROGRESS - QUESTIONS - * - * @return An IMap<String,ActivePowerRecord> Object with entries - */ - private IMap<String, ActivePowerRecord> buildInputTable() { + // (2) UC4 Merge Input with aggregation stream + final StreamStageWithKey<Entry<String, Double>, String> mergedInputAndAggregations = inputStream + .merge(aggregations) + .groupingKey(event -> event.getKey()); - final IMap<String, ActivePowerRecord> inputTable = uc4JetInstance.getMap("inputTable"); + // (3) UC4 Join Configuration and Merges Input/Aggregation Stream + // [sensorKey , (value,Set<Groups>)] + final StreamStage<Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations + .mapUsingIMap( + SENSOR_PARENT_MAP_NAME, + (sensorEvent, sensorParentsSet) -> { + + // Get Data + Set<String> sensorParentsCasted = (Set<String>) sensorParentsSet; + + if (sensorParentsCasted == null) { + Set<String> nullSet = new HashSet<String>(); + nullSet.add("NULL-GROUPSET"); + return Map.entry(sensorEvent.getKey(), + new ValueGroup(sensorEvent.getValue(), nullSet)); + } else { + ValueGroup valueParentsPair = + new ValueGroup(sensorEvent.getValue(), sensorParentsCasted); + // Return solution + return Map.entry(sensorEvent.getKey(), valueParentsPair); + } + + + }); + + // (4) UC4 Duplicate as flatmap joined Stream + // [(sensorKey, Group) , value] + final StreamStage<Entry<SensorGroupKey, Double>> dupliAsFlatmappedStage = joinedStage + .flatMap(entry -> { + + // DEBUG + // System.out.println("D E B G U G Stage 4"); + + // Supplied data + String keyGroupId = entry.getKey(); + Double valueInW = entry.getValue().getValueInW(); + Set<String> groups = entry.getValue().getGroups(); + + // Transformed Data + String[] groupList = groups.toArray(String[]::new); + SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length]; + ArrayList<Entry<SensorGroupKey, Double>> newEntryList = + new ArrayList<Entry<SensorGroupKey, Double>>(); + for (int i = 0; i < groupList.length; i++) { + newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); + newEntryList.add(Map.entry(newKeyList[i], valueInW)); + // DEBUG + // System.out.println("Added new Entry to list: [(" + newKeyList[i].getSensorId() + "," + // + newKeyList[i].getGroup() + ")," + valueInW.toString()); + } + + + + // Return traversable list of new entry elements + return Traversers.traverseIterable(newEntryList); - // Read Input Stream - // TODO MERGE STEP WITH AGGREGATION RESULTS SKIPPED AT THE MOMENT - StreamStage<Entry<String, ActivePowerRecord>> inputStream = this.pipe - .readFrom(KafkaSources.<String, ActivePowerRecord>kafka( - kafkaReadPropsForPipeline, kafkaInputTopic)) - .withNativeTimestamps(0) - .map(entry -> { - inputTable.put(entry.getKey(), entry.getValue()); - return entry; }); - return inputTable; + // (5) UC4 Last Value Map + // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time + // TODO: Implementation of static table to fill values out of the past! 
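+    // Note: WindowDefinition.tumbling(windowSize) creates non-overlapping windows of
+    // windowSize milliseconds, so each entry emitted by the flatmap stage above
+    // contributes to exactly one window aggregate.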
+    final StageWithWindow<Entry<SensorGroupKey, Double>> windowedLastValues = dupliAsFlatmappedStage
+        .window(WindowDefinition.tumbling(windowSize));
-  }
+
+    // (6) UC4 GroupBy and aggregate and map
+    // Group using the group out of the sensorGroupKey keys
+    final StreamStage<Entry<String, Double>> groupedAggregatedMapped = windowedLastValues
+        .groupingKey(entry -> entry.getKey().getGroup())
+        .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue()))
+        .map(agg -> {
+          String theGroup = agg.getKey();
+          Double summedValueInW = agg.getValue();
+
+          // System.out.println("DEBUG - We have a grouped Aggregation Stage at the end!");
+          return Util.entry(theGroup, summedValueInW);
+        });
+
+    // (7) Sink - Results back to Kafka
+    groupedAggregatedMapped.writeTo(KafkaSinks.<String, Double>kafka(
+        kafkaWritePropsForPipeline, kafkaOutputTopic));
+
+    // (7) Sink - Results back to Kafka
+    groupedAggregatedMapped.writeTo(KafkaSinks.<String, Double>kafka(
+        kafkaWritePropsForPipeline, kafkaFeedbackTopic));
+
+    // (7) Sink - Write to logger/console for debug purposes
+    groupedAggregatedMapped.writeTo(Sinks.logger());
+
+    // Return the pipeline
+    return uc4Pipeline;
+  }
 }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilderNew.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilderNew.java
deleted file mode 100644
index b4ed5c12a..000000000
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilderNew.java
+++ /dev/null
@@ -1,278 +0,0 @@
-package theodolite.uc4.application;
-
-import com.hazelcast.function.BiFunctionEx;
-import com.hazelcast.function.SupplierEx;
-import com.hazelcast.jet.JetInstance;
-import com.hazelcast.jet.Traverser;
-import com.hazelcast.jet.Traversers;
-import com.hazelcast.jet.Util;
-import com.hazelcast.jet.aggregate.AggregateOperations;
-import com.hazelcast.jet.kafka.KafkaSinks;
-import com.hazelcast.jet.kafka.KafkaSources;
-import com.hazelcast.jet.pipeline.JournalInitialPosition;
-import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
-import com.hazelcast.jet.pipeline.Sources;
-import com.hazelcast.jet.pipeline.StageWithWindow;
-import com.hazelcast.jet.pipeline.StreamStage;
-import com.hazelcast.jet.pipeline.StreamStageWithKey;
-import com.hazelcast.jet.pipeline.WindowDefinition;
-import com.hazelcast.map.IMap;
-import java.util.Properties;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import theodolite.uc4.application.uc4specifics.ChildParentsTransformer;
-import theodolite.uc4.application.uc4specifics.SensorGroupKey;
-import theodolite.uc4.application.uc4specifics.ValueGroup;
-import titan.ccp.configuration.events.Event;
-import titan.ccp.model.records.ActivePowerRecord;
-import titan.ccp.model.sensorregistry.SensorRegistry;
-
-public class Uc4PipelineBuilderNew {
-
-  @SuppressWarnings("unchecked")
-  public Pipeline build(final Properties kafkaInputReadPropsForPipeline,
-      final Properties kafkaConfigReadPropsForPipeline,
-      final Properties kafkaAggregationReadPropsForPipeline,
-      final Properties kafkaWritePropsForPipeline,
-      final String kafkaInputTopic,
-      final String kafkaOutputTopic,
-      final String kafkaConfigurationTopic,
-      final String kafkaFeedbackTopic,
-      final int windowSize) {
-
-    //////////////////////////////////
-    // The pipeline for this Use Case
-    Pipeline
uc4Pipeline = Pipeline.create(); - - //System.out.println("DEBUG: Window Size: " + windowSize); - - ////////////////////////////////// - // (1) Configuration Stream - StreamStage<Entry<Event, SensorRegistry>> configurationStream = uc4Pipeline - .readFrom(KafkaSources.<Event, String>kafka( - kafkaConfigReadPropsForPipeline, kafkaConfigurationTopic)) - .withNativeTimestamps(0) - .map(data -> { - - // DEBUG - // System.out.println("D E B U G: Got a configuration Stream Element!"); - // System.out.println("Event: " + data.getKey().toString() + "; Sensor Registry: " + - // data.getValue().toString()); - - return data; - - }) - .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED || - entry.getKey() == Event.SENSOR_REGISTRY_STATUS) - .map(data -> { - - // DEBUG - // System.out.println("D E B U G: It passed through the filter"); - - return Map.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())); - }); - - // Builds a new HashMap // - SupplierEx<? extends HashMap<String, Set<String>>> hashMapSupplier = - () -> new HashMap<String, Set<String>>(); - - // FlatMapFunction // - BiFunctionEx<? super HashMap<String, Set<String>>, ? super Entry<Event, SensorRegistry>, ? extends Traverser<Entry<String, Set<String>>>> flatMapFn = - (flatMapStage, eventItem) -> { - // Get Data - HashMap<String, Set<String>> oldParents = - (HashMap<String, Set<String>>) flatMapStage.clone(); - SensorRegistry newSensorRegistry = (SensorRegistry) eventItem.getValue(); - - // Transform new Input - ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); - Map<String, Set<String>> mapFromRegistry = - transformer.constructChildParentsPairs(newSensorRegistry); - - // Compare both tables - HashMap<String, Set<String>> updates = new HashMap<String, Set<String>>(); - for (String key : mapFromRegistry.keySet()) { - if (oldParents.containsKey(key)) { - if (!mapFromRegistry.get(key).equals(oldParents.get(key))) { - updates.put(key, mapFromRegistry.get(key)); - } - } else { - updates.put(key, mapFromRegistry.get(key)); - } - } - - ArrayList<Entry<String, Set<String>>> updatesList = - new ArrayList<Entry<String, Set<String>>>(updates.entrySet()); - - /* - * DEBUG PRINT System.out.println("DEBUG FLATMAP ARRAY LIST"); for (Entry<String, - * Set<String>> entry : updatesList) { String debugString = "["; for (String group : - * entry.getValue()) { debugString = debugString + group + ","; } debugString = - * debugString + "]"; System.out.println("Entry discovered || Key: " + entry.getKey() + - * "; Groups: " + debugString); } - */ - - // Return traverser with differences - return Traversers.traverseIterable(updatesList) - .map(e -> Util.entry(e.getKey(), e.getValue())); - - }; - - // Write into table sink - configurationStream - .flatMapStateful(hashMapSupplier, flatMapFn) - .writeTo(Sinks.mapWithUpdating( - "SensorParentMap", // The addressed IMAP - event -> event.getKey(), // The key to look for - (oldValue, newEntry) -> { // the new entry returned (null automatically results in - // deletion of entry) - - // DEBUG - /* - * String debugFlatmapString = "["; for (String group : newEntry.getValue()) { - * debugFlatmapString = debugFlatmapString + group + ","; } debugFlatmapString = - * debugFlatmapString + "]"; System.out.println( "Flatmap Writes for key '" + - * newEntry.getKey() + "': " + debugFlatmapString); - */ - - // Write new set of groups - return newEntry.getValue(); - })); - - ////////////////////////////////// - // (1) Sensor Input Stream - StreamStage<Entry<String, Double>> inputStream 
= uc4Pipeline - .readFrom(KafkaSources.<String, ActivePowerRecord>kafka( - kafkaInputReadPropsForPipeline, kafkaInputTopic)) - .withNativeTimestamps(0) - .map(stream -> { - - String sensorId = stream.getValue().getIdentifier(); - Double valueInW = stream.getValue().getValueInW(); - - // DEBUG - System.out.println("INPUT D E B U G: Got an input Stream Element!"); - System.out.println("[SensorId=" + sensorId + "//valueinW=" + valueInW.toString()); - - return Map.entry(sensorId, valueInW); - }); - - // (1) Aggregation Stream - StreamStage<Entry<String, Double>> aggregations = uc4Pipeline - .readFrom(KafkaSources.<String, Double>kafka( - kafkaAggregationReadPropsForPipeline, kafkaFeedbackTopic)) - .withNativeTimestamps(0) - .map(stream -> { - - // DEBUG - System.out.println("AGGREGATION D E B U G: Got an aggregation Stream Element!"); - System.out.println("[SensorId=" + stream.getKey() + "//valueinW=" + stream.getValue().toString()); - - return stream; - - }); - - // (2) UC4 Merge Input with aggregation stream - StreamStageWithKey<Entry<String, Double>, String> mergedInputAndAggregations = inputStream - .merge(aggregations) - .groupingKey(event -> event.getKey()); - - // (3) UC4 Join Configuration and Merges Input/Aggregation Stream - // [sensorKey , (value,Set<Groups>)] - StreamStage<Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations - .mapUsingIMap( - "SensorParentMap", - (sensorEvent, sensorParentsSet) -> { - - // Get Data - Set<String> sensorParentsCasted = (Set<String>) sensorParentsSet; - - if (sensorParentsCasted != null) { - ValueGroup valueParentsPair = - new ValueGroup(sensorEvent.getValue(), sensorParentsCasted); - // Return solution - return Map.entry(sensorEvent.getKey(), valueParentsPair); - } else { - Set<String> nullSet = new HashSet<String>(); - nullSet.add("NULL-GROUPSET"); - return Map.entry(sensorEvent.getKey(), - new ValueGroup(sensorEvent.getValue(), nullSet)); - } - - - }); - - // (4) UC4 Duplicate as flatmap joined Stream - // [(sensorKey, Group) , value] - StreamStage<Entry<SensorGroupKey, Double>> dupliAsFlatmappedStage = joinedStage - .flatMap(entry -> { - - // DEBUG - // System.out.println("D E B G U G Stage 4"); - - // Supplied data - String keyGroupId = entry.getKey(); - Double valueInW = entry.getValue().getValueInW(); - Set<String> groups = entry.getValue().getGroups(); - - // Transformed Data - String[] groupList = groups.toArray(String[]::new); - SensorGroupKey[] newKeyList = new SensorGroupKey[groupList.length]; - ArrayList<Entry<SensorGroupKey, Double>> newEntryList = - new ArrayList<Entry<SensorGroupKey, Double>>(); - for (int i = 0; i < groupList.length; i++) { - newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); - newEntryList.add(Map.entry(newKeyList[i], valueInW)); - // DEBUG - // System.out.println("Added new Entry to list: [(" + newKeyList[i].getSensorId() + "," - // + newKeyList[i].getGroup() + ")," + valueInW.toString()); - } - - - - // Return traversable list of new entry elements - return Traversers.traverseIterable(newEntryList); - - }); - - // (5) UC4 Last Value Map - // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time - // TODO: Implementation of static table to fill values out of the past! 
- StageWithWindow<Entry<SensorGroupKey, Double>> windowedLastValues = dupliAsFlatmappedStage - .window(WindowDefinition.tumbling(5000)); - - // (6) UC4 GroupBy and aggregate and map - // Group using the group out of the sensorGroupKey keys - StreamStage<Entry<String, Double>> groupedAggregatedMapped = windowedLastValues - .groupingKey(entry -> entry.getKey().getGroup()) - .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue())) - .map(agg -> { - String theGroup = agg.getKey(); - Double summedValueInW = agg.getValue(); - - //System.out.println("DEBUG - We have a grouped Aggregation Stage at the end!"); - - return Util.entry(theGroup, summedValueInW); - }); - - // (7) Sink - Results back to Kafka - groupedAggregatedMapped.writeTo(KafkaSinks.<String, Double>kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); - - // (7) Sink - Results back to Kafka - groupedAggregatedMapped.writeTo(KafkaSinks.<String, Double>kafka( - kafkaWritePropsForPipeline, kafkaFeedbackTopic)); - - // (7) Sink - Write to logger/console for debug puposes - groupedAggregatedMapped.writeTo(Sinks.logger()); - - // Return the pipeline - return uc4Pipeline; - } - -} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ChildParentsTransformer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ChildParentsTransformer.java index f533e5df6..493921721 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ChildParentsTransformer.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ChildParentsTransformer.java @@ -62,6 +62,11 @@ public class ChildParentsTransformer implements // Do nothing } + /** + * Constructs a map of keys to their set of parents out of a SensorRegistry. + * @param registry The SensorRegistry to build the map out of. + * @return A map of keys to a set of their parents. + */ public Map<String, Set<String>> constructChildParentsPairs(final SensorRegistry registry) { return this.streamAllChildren(registry.getTopLevelSensor()) .collect(Collectors.toMap( diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/EventDeserializer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/EventDeserializer.java index 8ff81d1d6..c88b167e1 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/EventDeserializer.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/EventDeserializer.java @@ -1,33 +1,31 @@ package theodolite.uc4.application.uc4specifics; import java.util.Map; -import org.apache.kafka.common.serialization.ByteBufferDeserializer; import org.apache.kafka.common.serialization.Deserializer; import titan.ccp.configuration.events.Event; +import titan.ccp.configuration.events.EventSerde; +/** + * Deserializer for Event Objects. 
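+ * Deserialization is delegated to the Event deserializer provided by titan-ccp's EventSerde.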
+ */
 public class EventDeserializer implements Deserializer<Event> {
 
-  private final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer();
-
+  private final Deserializer<Event> deserializer = EventSerde.serde().deserializer();
+
   @Override
   public void configure(final Map<String, ?> configs, final boolean isKey) {
-    this.byteBufferDeserializer.configure(configs, isKey);
+    this.deserializer.configure(configs, isKey);
   }
 
   @Override
   public Event deserialize(final String topic, final byte[] data) {
-    final int ordinal = this.byteBufferDeserializer.deserialize(topic, data).getInt();
-    for (final Event event : Event.values()) {
-      if (ordinal == event.ordinal()) {
-        return event;
-      }
-    }
-    throw new IllegalArgumentException("Deserialized data is not a valid event.");
+    return this.deserializer.deserialize(topic, data);
   }
 
   @Override
   public void close() {
-    this.byteBufferDeserializer.close();
+    this.deserializer.close();
   }
+
 }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java
index 171c4d09b..889379f20 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/HashMapSupplier.java
@@ -4,9 +4,12 @@ import com.hazelcast.function.SupplierEx;
 import java.util.HashMap;
 import java.util.Set;
 
-public class HashMapSupplier implements SupplierEx<HashMap<String,Set<String>>>{
+/**
+ * Supplies a HashMap and implements SupplierEx.
+ */
+public class HashMapSupplier implements SupplierEx<HashMap<String,Set<String>>> {
 
-  private static final long serialVersionUID = -6247504592403610702L;
+  private static final long serialVersionUID = -6247504592403610702L; // NOPMD
 
   @Override
   public HashMap<String, Set<String>> get() {
@@ -15,7 +18,7 @@ public class HashMapSupplier implements SupplierEx<HashMap<String,Set<String>>>{
 
   @Override
   public HashMap<String, Set<String>> getEx() throws Exception {
-    return null;
+    return this.get();
   }
 
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKey.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKey.java
index 43dae021f..3680062df 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKey.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKey.java
@@ -3,14 +3,14 @@ package theodolite.uc4.application.uc4specifics;
 import java.util.Objects;
 
 /**
- * Structure (sensorId, group)
+ * Structure (sensorId, group).
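+ * Used as the key when a sensor's value is fanned out to each of its parent groups.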
 */
 public class SensorGroupKey {
 
   private final String sensorId;
   private final String group;
 
-  public SensorGroupKey(String sensorId, String group) {
+  public SensorGroupKey(final String sensorId, final String group) {
     this.sensorId = sensorId;
     this.group = group;
   }
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKeySerializer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKeySerializer.java
index d45ec67e0..9bcc80980 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKeySerializer.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/SensorGroupKeySerializer.java
@@ -4,13 +4,14 @@ import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.StreamSerializer;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
 
-public class SensorGroupKeySerializer implements StreamSerializer<SensorGroupKey>{
-
-private static final int TYPE_ID = 2;
-
+/**
+ * Serializes and deserializes a SensorGroupKey.
+ */
+public class SensorGroupKeySerializer implements StreamSerializer<SensorGroupKey> {
+
+  private static final int TYPE_ID = 2;
+
   @Override
   public int getTypeId() {
     return TYPE_ID;
@@ -18,8 +19,8 @@ private static final int TYPE_ID = 2;
 
   @Override
   public void write(final ObjectDataOutput out, final SensorGroupKey key) throws IOException {
-    out.writeString(key.getSensorId());
-    out.writeString(key.getGroup());
+    out.writeString(key.getSensorId());
+    out.writeString(key.getGroup());
   }
 
   @Override
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java
index e0ee77b2c..cb322ab55 100644
--- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroup.java
@@ -4,14 +4,14 @@ import java.util.Objects;
 import java.util.Set;
 
 /**
- * Structure: (valueInW, Set(Groups))
+ * Structure: (valueInW, Set(Groups)).
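+ * Produced by joining the merged input/aggregation stream with the sensor parent map.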
*/ public class ValueGroup { private final Double valueInW; private final Set<String> groups; - public ValueGroup(Double valueInW, Set<String> groups) { + public ValueGroup(final Double valueInW, final Set<String> groups) { this.valueInW = valueInW; this.groups = groups; } @@ -27,8 +27,8 @@ public class ValueGroup { @Override public String toString() { String groupString = "["; - for (String group: groups) { - groupString = groupString + group + "/"; + for (final String group: groups) { + groupString = groupString + group + "/";//NOPMD } return this.valueInW.toString() + ";" + groupString + "]"; } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java index fa72b3d7f..72d421a6c 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/uc4specifics/ValueGroupSerializer.java @@ -11,7 +11,7 @@ import java.util.HashSet; public class ValueGroupSerializer implements StreamSerializer<ValueGroup> { private static final int TYPE_ID = 1; - + @Override public int getTypeId() { return TYPE_ID; @@ -19,13 +19,14 @@ public class ValueGroupSerializer implements StreamSerializer<ValueGroup> { @Override public void write(final ObjectDataOutput out, final ValueGroup key) throws IOException { - out.writeDouble(key.getValueInW()); - out.writeString(String.join(",", key.getGroups())); + out.writeDouble(key.getValueInW()); + out.writeString(String.join(",", key.getGroups())); } @Override public ValueGroup read(final ObjectDataInput in) throws IOException { - return new ValueGroup(in.readDouble(), new HashSet<>(Arrays.asList(in.readString().split(",")))); + return new ValueGroup(in.readDouble(), + new HashSet<>(Arrays.asList(in.readString().split(",")))); } } -- GitLab