diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java similarity index 99% rename from theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java rename to theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java index c919878046af0ab71e19f1ff33bd6d6b5f82c9ba..1e97ec57c7728c0636b9d810d04cce00eeb31176 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaAggregatedPowerRecordReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/kafka/KafkaActivePowerRecordReader.java @@ -21,7 +21,6 @@ public class KafkaAggregatedPowerRecordReader extends private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; - /** * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. */ diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java new file mode 100644 index 0000000000000000000000000000000000000000..1c61d7272c2135f07d400a751a7624f10131b1b6 --- /dev/null +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java @@ -0,0 +1,25 @@ +package application; + +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import titan.ccp.model.records.ActivePowerRecord; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; + +public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, + ActivePowerRecord>> { + private final ZoneId zone = ZoneId.of("Europe/Paris"); + final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + + @Override + public KV<application.HourOfDayKey, ActivePowerRecord> apply( + final KV<String, ActivePowerRecord> kv) { + final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); + return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), + kv.getValue()); + } + } +} diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java index b80b0dfd38bd4d797ec536c55ac54edadce4d2bd..a058e6c58285ef7eed27d179834a26289e8bdbcc 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java @@ -123,20 +123,7 @@ public final class Uc3ApplicationBeam { // Read from Kafka pipeline.apply(kafka) // Map to correct time format - .apply(MapElements.via( - new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, - ActivePowerRecord>>() { - private final ZoneId zone = ZoneId.of("Europe/Paris"); - - @Override - public KV<application.HourOfDayKey, ActivePowerRecord> apply( - final KV<String, ActivePowerRecord> kv) { - final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); - final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); - return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), - kv.getValue()); - } - })) + .apply(MapElements.via(new MapTimeFormat())) // Apply a sliding window .apply(Window