From a0bb0d9b6d8d26f240d56cfa80d397090b346323 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Mon, 25 Oct 2021 16:51:08 +0200 Subject: [PATCH] uc3-beam-flink Rm cs + sp warnings but pmd remain --- .../java/application/EventTimePolicy.java | 3 +- .../main/java/application/HourOfDayKey.java | 6 +- .../java/application/HourOfDaykeyCoder.java | 23 ++-- .../java/application/Uc3ApplicationBeam.java | 113 +++++++++++------- 4 files changed, 91 insertions(+), 54 deletions(-) diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java index 993951cb3..627c13da9 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/EventTimePolicy.java @@ -1,7 +1,7 @@ package application; - import java.util.Optional; + import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.io.kafka.TimestampPolicy; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -15,6 +15,7 @@ public class EventTimePolicy protected Instant currentWatermark; public EventTimePolicy(final Optional<Instant> previousWatermark) { + super(); this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java index 55ed8b535..bd87abbe9 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDayKey.java @@ -1,15 +1,15 @@ package application; - -import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; + /** * Composed key of an hour of the day and a 
sensor id. */ @DefaultCoder(AvroCoder.class) -public class HourOfDayKey{ +public class HourOfDayKey { private final int hourOfDay; private final String sensorId; diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java index 31fe4da7e..801937b24 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/HourOfDaykeyCoder.java @@ -5,6 +5,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -13,9 +14,12 @@ import org.apache.kafka.common.serialization.Serde; /** * Wrapper Class that encapsulates a HourOfDayKeySerde in a org.apache.beam.sdk.coders.Coder. 
 */
-@SuppressWarnings("serial")
 public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
-  private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
+  private static final int VALUE_SIZE = 4;
+  private static final boolean DETERMINISTIC = false;
+  private static final long serialVersionUID = 4444444L;
+
+  private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
 
   @Override
   public void encode(final HourOfDayKey value, final OutputStream outStream)
@@ -24,7 +28,7 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab
       this.innerSerde = HourOfDayKeySerde.create();
     }
     final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
-    final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
+    final byte[] sizeinBytes = ByteBuffer.allocate(VALUE_SIZE).putInt(bytes.length).array();
     outStream.write(sizeinBytes);
     outStream.write(bytes);
   }
@@ -34,22 +38,23 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab
     if (this.innerSerde == null) {
       this.innerSerde = HourOfDayKeySerde.create();
     }
-    final byte[] sizeinBytes = new byte[4];
-    inStream.read(sizeinBytes);
+    final byte[] sizeinBytes = new byte[VALUE_SIZE];
+    new java.io.DataInputStream(inStream).readFully(sizeinBytes);
     final int size = ByteBuffer.wrap(sizeinBytes).getInt();
     final byte[] bytes = new byte[size];
-    inStream.read(bytes);
+    new java.io.DataInputStream(inStream).readFully(bytes);
     return this.innerSerde.deserializer().deserialize("deser", bytes);
   }
 
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
-    return null;
+    return Collections.emptyList();
   }
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-
+    if (!DETERMINISTIC) {
+      throw new NonDeterministicException(this, "This class is not deterministic!");
+    }
   }
-
 }
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
index 796f5a8cf..b80b0dfd3 100644
--- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java
@@ -7,6 +7,8 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.runners.flink.FlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -42,65 +44,67 @@ import titan.ccp.model.records.ActivePowerRecord;
  * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
 */
-public class Uc3ApplicationBeam {
-
+public final class Uc3ApplicationBeam {
+
+  private static final String JOB_NAME = "Uc3Application";
+  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
+  private static final String INPUT = "INPUT";
+  private static final String OUTPUT = "OUTPUT";
+  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
+  private static final String YES = "true";
+  private static final String USE_AVRO_READER = YES;
+  private static final String AUTO_COMMIT_CONFIG = YES;
+  private static final String KAFKA_WINDOW_DURATION_DAYS = "KAFKA_WINDOW_DURATION_DAYS";
+  private static final String AGGREGATION_ADVANCE_DAYS = "AGGREGATION_ADVANCE_DAYS";
+  private static final String TRIGGER_INTERVAL = "TRIGGER_INTERVAL";
+
+  /**
+   * Private constructor to avoid instantiation.
+   */
+  private Uc3ApplicationBeam() {
+    throw new UnsupportedOperationException();
+  }
 
-  @SuppressWarnings("serial")
+  /**
+   * Start running this microservice.
+   */
   public static void main(final String[] args) {
     // Set Configuration for Windows
     final int windowDuration = Integer.parseInt(
-        System.getenv("KAFKA_WINDOW_DURATION_DAYS") != null
-            ? System.getenv("KAFKA_WINDOW_DURATION_DAYS")
-            : "30");
+        System.getenv(KAFKA_WINDOW_DURATION_DAYS) == null
+            ? "30" : System.getenv(KAFKA_WINDOW_DURATION_DAYS));
     final Duration duration = Duration.standardDays(windowDuration);
     final int aggregationAdvance = Integer.parseInt(
-        System.getenv("AGGREGATION_ADVANCE_DAYS") != null
-            ? System.getenv("AGGREGATION_ADVANCE_DAYS")
-            : "1");
+        System.getenv(AGGREGATION_ADVANCE_DAYS) == null
+            ? "1" : System.getenv(AGGREGATION_ADVANCE_DAYS));
     final Duration advance = Duration.standardDays(aggregationAdvance);
     final int triggerInterval = Integer.parseInt(
-        System.getenv("TRIGGER_INTERVAL") != null
-            ? System.getenv("TRIGGER_INTERVAL")
-            : "15");
+        System.getenv(TRIGGER_INTERVAL) == null
+            ?
"15" : System.getenv(TRIGGER_INTERVAL)); final Duration triggerDelay = Duration.standardSeconds(triggerInterval); // Set Configuration for Kafka final String bootstrapServer = - System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS") - : "my-confluent-cp-kafka:9092"; - final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input"; - final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output"; - final String schemaRegistryURL = - System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL") - : "http://my-confluent-cp-schema-registry:8081"; - - // Set consumer configuration for the schema registry and commits back to Kafka - final HashMap<String, Object> consumerConfig = new HashMap<>(); - consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerConfig.put("schema.registry.url", schemaRegistryURL); - consumerConfig.put("specific.avro.reader", "true"); - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application"); + System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092" + : System.getenv(BOOTSTRAP); + final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT); + final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT); + final String schemaRegistryUrl = + System.getenv(SCHEMA_REGISTRY) == null ? 
"http://my-confluent-cp-schema-registry:8081" + : System.getenv(SCHEMA_REGISTRY); + + final Map<String, Object> consumerConfig = buildConsumerConfig(schemaRegistryUrl); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); - final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); options.setRunner(FlinkRunner.class); - options.setJobName("ucapplication"); + options.setJobName(JOB_NAME); final Pipeline pipeline = Pipeline.create(options); final CoderRegistry cr = pipeline.getCoderRegistry(); - - - cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder()); - cr.registerCoderForClass(StatsAggregation.class, - SerializableCoder.of(StatsAggregation.class)); - cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); - - + registerCoders(cr); @SuppressWarnings({"rawtypes", "unchecked"}) final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = @@ -120,8 +124,9 @@ public class Uc3ApplicationBeam { pipeline.apply(kafka) // Map to correct time format .apply(MapElements.via( - new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>>() { - final ZoneId zone = ZoneId.of("Europe/Paris"); + new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, + ActivePowerRecord>>() { + private final ZoneId zone = ZoneId.of("Europe/Paris"); @Override public KV<application.HourOfDayKey, ActivePowerRecord> apply( @@ -162,11 +167,37 @@ public class Uc3ApplicationBeam { .withKeySerializer(StringSerializer.class) .withValueSerializer(StringSerializer.class)); - pipeline.run().waitUntilFinish(); + } + /** + * Builds a configuration for a Kafka consumer. + * @param schemaRegistryUrl the url to the SchemaRegistry. + * @return the configuration. 
+ */ + public static Map<String, Object> buildConsumerConfig(final String schemaRegistryUrl) { + + // Set consumer configuration for the schema registry and commits back to Kafka + final HashMap<String, Object> consumerConfig = new HashMap<>(); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfig.put("schema.registry.url", schemaRegistryUrl); + consumerConfig.put("specific.avro.reader", USE_AVRO_READER); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, JOB_NAME); + return consumerConfig; + } + /** + * Registers all Coders for all needed Coders. + * @param cr CoderRegistry. + */ + private static void registerCoders(final CoderRegistry cr) { + cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); + cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder()); + cr.registerCoderForClass(StatsAggregation.class, + SerializableCoder.of(StatsAggregation.class)); + cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); } } -- GitLab