Skip to content
Snippets Groups Projects
Commit bdf1bea7 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Rename beam-common kafkaReader and outsource uc3-MapTimeFormatFunction

parent df842778
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
...@@ -21,7 +21,6 @@ public class KafkaAggregatedPowerRecordReader extends ...@@ -21,7 +21,6 @@ public class KafkaAggregatedPowerRecordReader extends
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader; private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
......
package application;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
ActivePowerRecord>> {
private final ZoneId zone = ZoneId.of("Europe/Paris");
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}
}
...@@ -123,20 +123,7 @@ public final class Uc3ApplicationBeam { ...@@ -123,20 +123,7 @@ public final class Uc3ApplicationBeam {
// Read from Kafka // Read from Kafka
pipeline.apply(kafka) pipeline.apply(kafka)
// Map to correct time format // Map to correct time format
.apply(MapElements.via( .apply(MapElements.via(new MapTimeFormat()))
new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
ActivePowerRecord>>() {
private final ZoneId zone = ZoneId.of("Europe/Paris");
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}))
// Apply a sliding window // Apply a sliding window
.apply(Window .apply(Window
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment