diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java index 1c61d7272c2135f07d400a751a7624f10131b1b6..909972c2d67cb313f5d92c5787024e867fd76840 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/MapTimeFormat.java @@ -1,25 +1,27 @@ package application; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; -import titan.ccp.model.records.ActivePowerRecord; - import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import titan.ccp.model.records.ActivePowerRecord; +/** + * Changes the time format to us europe/paris time. + */ public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> { - private final ZoneId zone = ZoneId.of("Europe/Paris"); - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + private static final long serialVersionUID = -6597391279968647035L; + private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + private final ZoneId zone = ZoneId.of("Europe/Paris"); - @Override - public KV<application.HourOfDayKey, ActivePowerRecord> apply( - final KV<String, ActivePowerRecord> kv) { - final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); - final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); - return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), - kv.getValue()); - } + @Override + public KV<application.HourOfDayKey, ActivePowerRecord> apply( + final KV<String, ActivePowerRecord> kv) { + final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); + return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime), + kv.getValue()); } } diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java index a058e6c58285ef7eed27d179834a26289e8bdbcc..7a8a0965787f357194d3f009b8cad4574444692e 100644 --- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java +++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3ApplicationBeam.java @@ -3,12 +3,8 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.HashMap; import java.util.Map; - import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder;