diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java index 41d5bacd85135e58740423f6cb19d0f0c136b493..090334459e00495f7f65c4fbbb367e0b3653f269 100644 --- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java +++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java @@ -1,6 +1,5 @@ package application; - import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.sdk.Pipeline; import theodolite.commons.beam.AbstractBeamService; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java index f1f3e82e720ca11c5e376254459004a9d5d6b576..dbf32a5a49209b929cb24997110d4a52a916fdba 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java @@ -11,6 +11,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; public class AggregatedToActive extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, ActivePowerRecord>> { + private static final long serialVersionUID = -8275252527964065889L; + @Override public KV<String, ActivePowerRecord> apply( final KV<String, AggregatedActivePowerRecord> kv) { diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java index 7c21435264629b8d1360aaf62f7308ddcb9381d7..d5b9b5c1bcc878f35b36936d74673a7c268770a1 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java @@ -1,11 +1,13 @@ package application; -import org.apache.beam.sdk.transforms.ProcessFunction; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.configuration.events.Event; +/** + * Filters for {@code Event.SENSOR_REGISTRY_CHANGED} and + * {@code Event.SENSOR_REGISTRY_STATUS} events. + */ public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> { private static final long serialVersionUID = -2233447357614891559L; diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java index 8b73b43aa013b43fde738d52a5ded2f7b1c857ce..143294f1ff2bfeea77c40ce38cd10ce3eb44be49 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java @@ -4,7 +4,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.ActivePowerRecord; -public class FilterNullValues implements SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> { +/** + * Filters {@code null} Values. + */ +public class FilterNullValues implements + SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> { private static final long serialVersionUID = -6197352369880867482L; @Override diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java index f3b9f95b5fc8897230afc1924d07c547301c1eb5..b2ddf296acfe8d98b4a96ad8f1d23263d5b3f259 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java @@ -24,9 +24,9 @@ public class RecordAggregation @DefaultCoder(AvroCoder.class) public static class Accum implements Serializable { private static final long serialVersionUID = 3701311203919534376L; - private long count = 0; + private long count; private Double sum = 0.0; - private long timestamp = 0; + private long timestamp; } @Override diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java index 5191ffaf0bc373840079eca0599770857c4e7462..58518eefb4b3a321e9fcd33db45eb8e3ab562ff0 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java @@ -4,8 +4,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.AggregatedActivePowerRecord; +/** + * Sets the identifier for new {@link AggregatedActivePowerRecord}. + */ public class SetIdForAggregated extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>> { + private static final long serialVersionUID = 2148522605294086982L; + @Override public KV<String, AggregatedActivePowerRecord> apply( final KV<String, AggregatedActivePowerRecord> kv) { diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java index d6016e29874afb36e164d5e3a915345f350f4e46..c44028eb6c793573f7ceb8187759b716aafe17be 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java @@ -4,9 +4,14 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.ActivePowerRecord; +/** + * Set the Key for a group of {@code ActivePowerRecords} to their Parent. + */ public class SetKeyToGroup extends SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>> { + private static final long serialVersionUID = 790215050768527L; + @Override public KV<String, ActivePowerRecord> apply( final KV<SensorParentKey, ActivePowerRecord> kv) { diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java index a8523220dc61d3a3e8dffbfbfe0de8069dfd4cb5..e93f0da2271b894f65adc03803aa7448a0d0a3cc 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java @@ -10,14 +10,33 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.transforms.windowing.*; -import org.apache.beam.sdk.values.*; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Filter; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.Latest; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.joda.time.Duration; -import serialization.*; +import serialization.AggregatedActivePowerRecordCoder; +import serialization.AggregatedActivePowerRecordDeserializer; +import serialization.AggregatedActivePowerRecordSerializer; +import serialization.EventCoder; +import serialization.EventDeserializer; +import serialization.SensorParentKeyCoder; import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; @@ -208,8 +227,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline { * * @return the build configuration. */ - public HashMap<String, Object> configurationConfig(final Configuration config) { - final HashMap<String, Object> consumerConfig = new HashMap<>(); + public Map<String, Object> configurationConfig(final Configuration config) { + final Map<String, Object> consumerConfig = new HashMap<>(); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,