diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index 56e1239f6ebfde69a9482cf50331acea1ad300d5..03f11cc74c775b1f00c48b573d661057fd7da550 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -1,6 +1,8 @@ package theodolite.commons.beam; import java.util.HashMap; +import java.util.Map; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.commons.configuration2.Configuration; @@ -11,11 +13,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; */ public class AbstractPipeline extends Pipeline { - // Application Configurations - private final Configuration config; - protected final String inputTopic; protected final String bootstrapServer; + // Application Configurations + private final Configuration config; protected AbstractPipeline(final PipelineOptions options, final Configuration config) { super(options); @@ -30,8 +31,8 @@ public class AbstractPipeline extends Pipeline { * * @return the build configuration. */ - public HashMap<String, Object> buildConsumerConfig() { - final HashMap<String, Object> consumerConfig = new HashMap<>(); + public Map<String, Object> buildConsumerConfig() { + final Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java index 56f5a681097046e94c8a77427e7864c9edfcdede..07b52be5d0d8547828cfce8ae7668a85e4bd56f0 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java @@ -1,9 +1,7 @@ package theodolite.commons.beam.kafka; import io.confluent.kafka.serializers.KafkaAvroDeserializer; - -import java.util.HashMap; - +import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; @@ -28,7 +26,7 @@ public class KafkaActivePowerRecordReader extends */ @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic, - final HashMap consumerConfig) { + final Map consumerConfig) { super(); // Check if boostrap server and inputTopic are defined diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java index 0b22745bfc7b3d82e1ae0f6fd930d98ed6e98665..fdb15865d9a8aa015679e4a9e4966dbeb6b40df7 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java @@ -1,6 +1,7 @@ package theodolite.commons.beam.kafka; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.util.Map; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.transforms.PTransform; @@ -10,9 +11,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.kafka.common.serialization.StringDeserializer; import titan.ccp.model.records.ActivePowerRecord; -import java.util.HashMap; -import java.util.Properties; - /** * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. * Has additional a TimestampPolicy. @@ -29,7 +27,7 @@ public class KafkaActivePowerTimestampReader extends */ @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic, - final HashMap consumerConfig) { + final Map consumerConfig) { super(); // Check if boostrap server and inputTopic are defined diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index b67b76b742a897c39bb50a82c00e00ffced8c6f7..992eda1612a2e3a574c6c37f7c0c6de934291348 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -1,6 +1,7 @@ package application; import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -35,7 +36,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); // build KafkaConsumerConfig - final HashMap consumerConfig = buildConsumerConfig(); + final Map consumerConfig = buildConsumerConfig(); // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index 2971390c627b8c8765cee486860427c335acf7f2..a0ba5cb088c74d20c4b733d3ac2537e3fad2b127 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -4,6 +4,7 @@ import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -50,7 +51,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final Duration duration = Duration.standardMinutes(windowDurationMinutes); // Build kafka configuration - final HashMap consumerConfig = buildConsumerConfig(); + final Map consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index 6964d23d84fcc1d0f836842d8d4c146352b10e90..088664aef8545eecf849d53f4c96dc04f8258631 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -4,6 +4,7 @@ import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -58,7 +59,7 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final Duration triggerDelay = Duration.standardSeconds(triggerInterval); // Build kafka configuration - final HashMap consumerConfig = buildConsumerConfig(); + final Map consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index cf3eb22d4c712922ce716cd1180f085e6f5e9c51..a8523220dc61d3a3e8dffbfbfe0de8069dfd4cb5 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -29,7 +29,7 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; /** * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload - * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress + * generator using the delayed_startup.sh script. Start a Flink cluster and pass its REST address * using--flinkMaster as run parameter. To persist logs add * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard * Input Output in Common in the Run Configuration Start via Eclipse Run. @@ -57,8 +57,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline { final Duration gracePeriod = Duration.standardSeconds(grace); // Build kafka configuration - final HashMap<String, Object> consumerConfig = buildConsumerConfig(); - final HashMap<String, Object> configurationConfig = configurationConfig(config); + final Map<String, Object> consumerConfig = buildConsumerConfig(); + final Map<String, Object> configurationConfig = configurationConfig(config); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); @@ -106,7 +106,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline { (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy( previousWaterMark)) .withoutMetadata()) - .apply("Apply Winddows", Window.into(FixedWindows.of(duration))) + .apply("Apply Windows", Window.into(FixedWindows.of(duration))) // Convert into the correct data format .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord", MapElements.via(aggregatedToActive)) @@ -162,13 +162,14 @@ public final class Uc4BeamPipeline extends AbstractPipeline { .accumulatingFiredPanes()) .apply(View.asMap()); - FilterNullValues filterNullValues = new FilterNullValues(); + final FilterNullValues filterNullValues = new FilterNullValues(); // Build pairs of every sensor reading and parent final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues = inputCollection.apply( "Duplicate as flatMap", - ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap)) + ParDo.of(new DuplicateAsFlatMap(childParentPairMap)) + .withSideInputs(childParentPairMap)) .apply("Filter only latest changes", Latest.perKey()) .apply("Filter out null values", Filter.by(filterNullValues));