From be3849dce63c5e7962379b0feb41146354de72f0 Mon Sep 17 00:00:00 2001 From: lorenz <stu203404@mail.uni-kiel.de> Date: Sun, 28 Nov 2021 16:34:32 +0100 Subject: [PATCH] Clean up uc4-beam --- .../main/java/application/Uc4BeamSamza.java | 1 - .../java/application/AggregatedToActive.java | 2 ++ .../main/java/application/FilterEvents.java | 6 ++-- .../java/application/FilterNullValues.java | 6 +++- .../java/application/RecordAggregation.java | 4 +-- .../java/application/SetIdForAggregated.java | 5 +++ .../main/java/application/SetKeyToGroup.java | 5 +++ .../java/application/Uc4BeamPipeline.java | 31 +++++++++++++++---- 8 files changed, 48 insertions(+), 12 deletions(-) diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java index 41d5bacd8..090334459 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java @@ -1,6 +1,5 @@ package application; - import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.sdk.Pipeline; import theodolite.commons.beam.AbstractBeamService; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java index f1f3e82e7..dbf32a5a4 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java @@ -11,6 +11,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; public class AggregatedToActive extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, ActivePowerRecord>> { + private static final long serialVersionUID = -8275252527964065889L; + @Override public KV<String, ActivePowerRecord> apply( final KV<String, AggregatedActivePowerRecord> kv) { diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java index 7c2143526..d5b9b5c1b 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java @@ -1,11 +1,13 @@ package application; -import org.apache.beam.sdk.transforms.ProcessFunction; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.configuration.events.Event; +/** + * Filters for {@code Event.SENSOR_REGISTRY_CHANGED} and + * {@code Event.SENSOR_REGISTRY_STATUS} events. + */ public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> { private static final long serialVersionUID = -2233447357614891559L; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java index 8b73b43aa..143294f1f 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java @@ -4,7 +4,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.ActivePowerRecord; -public class FilterNullValues implements SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> { +/** + * Filters {@code null} Values. + */ +public class FilterNullValues implements + SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> { private static final long serialVersionUID = -6197352369880867482L; @Override diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java index f3b9f95b5..b2ddf296a 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java @@ -24,9 +24,9 @@ public class RecordAggregation @DefaultCoder(AvroCoder.class) public static class Accum implements Serializable { private static final long serialVersionUID = 3701311203919534376L; - private long count = 0; + private long count; private Double sum = 0.0; - private long timestamp = 0; + private long timestamp; } @Override diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java index 5191ffaf0..58518eefb 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java @@ -4,8 +4,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.AggregatedActivePowerRecord; +/** + * Sets the identifier for new {@link AggregatedActivePowerRecord}. + */ public class SetIdForAggregated extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>> { + private static final long serialVersionUID = 2148522605294086982L; + @Override public KV<String, AggregatedActivePowerRecord> apply( final KV<String, AggregatedActivePowerRecord> kv) { diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java index d6016e298..c44028eb6 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java @@ -4,9 +4,14 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.ActivePowerRecord; +/** + * Set the Key for a group of {@code ActivePowerRecords} to their Parent. + */ public class SetKeyToGroup extends SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>> { + private static final long serialVersionUID = 790215050768527L; + @Override public KV<String, ActivePowerRecord> apply( final KV<SensorParentKey, ActivePowerRecord> kv) { diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index a8523220d..e93f0da22 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -10,14 +10,33 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.transforms.windowing.*; -import org.apache.beam.sdk.values.*; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Latest; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.joda.time.Duration; -import serialization.*; +import serialization.AggregatedActivePowerRecordCoder; +import serialization.AggregatedActivePowerRecordDeserializer; +import serialization.AggregatedActivePowerRecordSerializer; +import serialization.EventCoder; +import serialization.EventDeserializer; +import serialization.SensorParentKeyCoder; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; @@ -208,8 +227,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline { * * @return the build configuration. */ - public HashMap<String, Object> configurationConfig(final Configuration config) { - final HashMap<String, Object> consumerConfig = new HashMap<>(); + public Map<String, Object> configurationConfig(final Configuration config) { + final Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, -- GitLab