Skip to content
Snippets Groups Projects
Commit aa2706f9 authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Cleanup checkstyle of uc3-beam-flink

parent 9e51d8e7
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package application;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Changes the time format to us europe/paris time.
*/
public class MapTimeFormat extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
ActivePowerRecord>> {
private final ZoneId zone = ZoneId.of("Europe/Paris");
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
private static final long serialVersionUID = -6597391279968647035L;
private final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
private final ZoneId zone = ZoneId.of("Europe/Paris");
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}
......@@ -3,12 +3,8 @@ package application;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment