Skip to content
Snippets Groups Projects
Commit 42b048d0 authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Rename beam-common kafkaReader and outsource uc3-MapTimeFormatFunction

parent dede63aa
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
......@@ -21,7 +21,6 @@ public class KafkaAggregatedPowerRecordReader extends
private final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> reader;
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
......
package application;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
ActivePowerRecord>> {
private final ZoneId zone = ZoneId.of("Europe/Paris");
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}
}
......@@ -123,20 +123,7 @@ public final class Uc3ApplicationBeam {
// Read from Kafka
pipeline.apply(kafka)
// Map to correct time format
.apply(MapElements.via(
new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
ActivePowerRecord>>() {
private final ZoneId zone = ZoneId.of("Europe/Paris");
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}))
.apply(MapElements.via(new MapTimeFormat()))
// Apply a sliding window
.apply(Window
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment