diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKey.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKey.java index 9a18ae48820a8039912f5591b10816688ef7ce21..bde85327cfa8daa82bd3b29db82a96b06e1621f4 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKey.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKey.java @@ -1,13 +1,8 @@ package rocks.theodolite.benchmarks.uc3.beam; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.DefaultCoder; - - /** * Composed key of an hour of the day and a sensor id. */ -@DefaultCoder(AvroCoder.class) public class HourOfDayKey { private final int hourOfDay; diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDaykeyCoder.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKeyCoder.java similarity index 92% rename from theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDaykeyCoder.java rename to theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKeyCoder.java index dd84010d16cb9c3882452f6fc5e8851e9f95a334..4189761c05981815290c5d2779ad78e94fcb51c0 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDaykeyCoder.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/HourOfDayKeyCoder.java @@ -12,9 +12,9 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.kafka.common.serialization.Serde; /** - * Wrapper Class that encapsulates a HourOfDayKeySerde in a org.apache.beam.sdk.coders.Coder. + * Wrapper Class that encapsulates a {@link HourOfDayKeySerde} in a {@link Coder}. */ -public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable { +public class HourOfDayKeyCoder extends Coder<HourOfDayKey> implements Serializable { public static final long serialVersionUID = 4444444; private static final boolean DETERMINISTIC = true; private static final int VALUE_SIZE = 4; diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java index d129a67b1660ffef437e91fc68273c5a6c66a654..3c0d7acdbeccfaf03aac70df478e3db6dd1378e4 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/MapTimeFormat.java @@ -8,7 +8,7 @@ import org.apache.beam.sdk.values.KV; import titan.ccp.model.records.ActivePowerRecord; /** - * Changes the time format to us europe/paris time. + * Changes the time format to us Europe/Paris time. */ public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> { @@ -17,11 +17,11 @@ public class MapTimeFormat private final ZoneId zone = ZoneId.of("Europe/Paris"); @Override - public KV<HourOfDayKey, ActivePowerRecord> apply( - final KV<String, ActivePowerRecord> kv) { + public KV<HourOfDayKey, ActivePowerRecord> apply(final KV<String, ActivePowerRecord> kv) { final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); - return KV.of(this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), + return KV.of( + this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), kv.getValue()); } } diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index 0d1f8fdbb3b7af4eedc787b7c7913f86307377fc..de960d3d8466f9f420f002667df04d8a2fc64873 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -78,8 +78,8 @@ public class PipelineFactory extends AbstractPipelineFactory { .accumulatingFiredPanes()) // Aggregate per window for every key - .apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(new StatsAggregation())) - .setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class))) + .apply(Combine.perKey(new StatsAggregation())) + .setCoder(KvCoder.of(new HourOfDayKeyCoder(), SerializableCoder.of(Stats.class))) // Map into correct output format .apply(MapElements.via(hourOfDayWithStats)) @@ -94,7 +94,7 @@ public class PipelineFactory extends AbstractPipelineFactory { AvroCoder.of(ActivePowerRecord.SCHEMA$)); registry.registerCoderForClass( HourOfDayKey.class, - new HourOfDaykeyCoder()); + new HourOfDayKeyCoder()); registry.registerCoderForClass( StatsAggregation.class, SerializableCoder.of(StatsAggregation.class));