diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index 03f11cc74c775b1f00c48b573d661057fd7da550..c936ce918c10f3c500cdd26f7e057cd7b6c555b6 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -2,7 +2,6 @@ package theodolite.commons.beam; import java.util.HashMap; import java.util.Map; - import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.commons.configuration2.Configuration; diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java index 1f7aca45c77536719c350199be8fec47919569d0..1e4dc593c627282f5c6735a4d91e963d83af6865 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/ConfigurationKeys.java @@ -7,8 +7,6 @@ public final class ConfigurationKeys { // Common keys public static final String APPLICATION_NAME = "application.name"; - public static final String APPLICATION_VERSION = "application.version"; - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; @@ -31,8 +29,6 @@ public final class ConfigurationKeys { public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; // UC4 - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - public static final String GRACE_PERIOD_MS = "grace.period.ms"; @@ -46,8 +42,6 @@ public final class ConfigurationKeys { public static final String TRIGGER_INTERVAL = "trigger.interval"; - - private ConfigurationKeys() { } diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java index 07b52be5d0d8547828cfce8ae7668a85e4bd56f0..4af9053be989d34775e78d43a88e3d0d24cdf411 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java @@ -24,9 +24,8 @@ public class KafkaActivePowerRecordReader extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic, - final Map consumerConfig) { + final Map<String, Object> consumerConfig) { super(); // Check if boostrap server and inputTopic are defined diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java index fdb15865d9a8aa015679e4a9e4966dbeb6b40df7..732afe9a0c1d4bdfea876025fceea0c5da1310fe 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerTimestampReader.java @@ -25,9 +25,8 @@ public class KafkaActivePowerTimestampReader extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic, - final Map consumerConfig) { + final Map<String, Object> consumerConfig) { super(); // Check if boostrap server and inputTopic are defined diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java index 3ec03cbcc56a022177af18ed48dff128b11ca098..83336b5a4c2451ef4bffefbd60ad9d52fccd9c17 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaGenericReader.java @@ -22,9 +22,8 @@ public class KafkaGenericReader<K, V> extends /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ - @SuppressWarnings({"unchecked", "rawtypes"}) public KafkaGenericReader(final String bootstrapServer, final String inputTopic, - final Map consumerConfig, + final Map<String, Object> consumerConfig, final Class<? extends org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializer, diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java index 3dc1a7568065deee1621dee9dc1a2606f3fa4ff3..fe58369b3c0c19351bcc5cde170df68946af7cbd 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java @@ -24,7 +24,6 @@ public final class Uc1BeamFlink extends AbstractBeamService { /** * Main method. */ - @SuppressWarnings({"unchecked", "rawtypes", "unused"}) public static void main(final String[] args) { // Create application via configurations diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java index d7b8d5806282e97c67bf94c4e6bcc8cae27e8250..f3b12945144bac57a5c55b3bd9b1f754b833eb7c 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java @@ -26,7 +26,6 @@ public final class Uc1BeamSamza extends AbstractBeamService { /** * Main method. */ - @SuppressWarnings({"unchecked", "rawtypes", "unused"}) public static void main(final String[] args) { // Create application via configurations diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java index 024822080127a114955617502c44d0db46a6e366..46fa53756fb028c2bccf86d544bd8430c9ef12d6 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java @@ -34,7 +34,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); // build KafkaConsumerConfig - final Map consumerConfig = buildConsumerConfig(); + final Map<String, Object> consumerConfig = buildConsumerConfig(); // Create Pipeline transformations final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java index 05728cc19a560348082525814bceb4a733501cb8..e0a29ed513147b15e62c4e46336e792e7cf4de95 100644 --- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java +++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java @@ -23,7 +23,6 @@ public final class Uc2BeamFlink extends AbstractBeamService { /** * Start running this microservice. */ - @SuppressWarnings({"serial", "unchecked", "rawtypes"}) public static void main(final String[] args) { final Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args); diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java index aa18729a8ec4e3bb080c6e09366818c4b497d621..d4b3d6d910824a718bffe8dc5f0204d53b9865c1 100644 --- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java +++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java @@ -27,7 +27,6 @@ public final class Uc2BeamSamza extends AbstractBeamService { /** * Start running this microservice. */ - @SuppressWarnings({"serial", "unchecked", "rawtypes"}) public static void main(final String[] args) { final Uc2BeamSamza uc2BeamSamza = new Uc2BeamSamza(args); diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java index f518ba8a794d33d7b5569d4a648eeadbc47a1e7b..3b43dc47aaf6e3f9937aa87fca7fc1895c8fef84 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java @@ -17,7 +17,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; @@ -38,7 +37,7 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class Uc2BeamPipeline extends AbstractPipeline { - protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) { + protected Uc2BeamPipeline(final PipelineOptions options, final Configuration config) { super(options, config); // Additional needed variables final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); @@ -48,7 +47,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final Duration duration = Duration.standardMinutes(windowDurationMinutes); // Build kafka configuration - final Map consumerConfig = buildConsumerConfig(); + final Map<String, Object> consumerConfig = buildConsumerConfig(); // Set Coders for Classes that will be distributed final CoderRegistry cr = this.getCoderRegistry(); @@ -67,8 +66,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline { final StatsToString statsToString = new StatsToString(); // Write to Kafka - final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = - new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); + final KafkaWriterTransformation<String> kafkaWriter = + new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); // Apply pipeline transformations this.apply(kafkaActivePowerRecordReader) diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java index eda0d8161750d49998b66640546b0e24ccb6256b..7424a19fecef5a7b86d273a223c6f3f7a2562db9 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java @@ -10,14 +10,11 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; @@ -63,7 +60,6 @@ public final class Uc3BeamPipeline extends AbstractPipeline { registerCoders(cr); // Read from Kafka - @SuppressWarnings({"rawtypes", "unchecked"}) final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); @@ -74,9 +70,8 @@ public final class Uc3BeamPipeline extends AbstractPipeline { final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); // Write to Kafka - @SuppressWarnings({"rawtypes", "unchecked"}) - final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = - new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); + final KafkaWriterTransformation<String> kafkaWriter = + new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); this.apply(kafka) // Map to correct time format diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java index ff62bd1aebf8b2f7150e3495dd864286a27eb0aa..90f9a4a292e99526fa94c7dd512bdcec548fbb4f 100644 --- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java +++ b/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java @@ -22,7 +22,6 @@ public final class Uc4BeamFlink extends AbstractBeamService { /** * Start running this microservice. */ - @SuppressWarnings({"serial", "unchecked", "rawtypes"}) public static void main(final String[] args) { final Uc4BeamFlink uc4BeamFlink = new Uc4BeamFlink(args); diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java index 090334459e00495f7f65c4fbbb367e0b3653f269..3894fa95f16253e0a165dde70bf25d4a4bee96cb 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java @@ -28,7 +28,6 @@ public final class Uc4BeamSamza extends AbstractBeamService { /** * Start running this microservice. */ - @SuppressWarnings({"serial", "unchecked", "rawtypes"}) public static void main(final String[] args) { final Uc4BeamSamza uc4BeamSamza = new Uc4BeamSamza(args);