Skip to content
Snippets Groups Projects
Commit 654ff9ae authored by Sören Henning's avatar Sören Henning
Browse files

Minor code refactorings

parent 7f7e00fe
No related branches found
No related tags found
1 merge request!249Align package structure among all benchmark implementations
Pipeline #6789 passed
package rocks.theodolite.benchmarks.uc3.beam;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
/**
* Composed key of an hour of the day and a sensor id.
*/
@DefaultCoder(AvroCoder.class)
public class HourOfDayKey {
private final int hourOfDay;
......
......@@ -12,9 +12,9 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.kafka.common.serialization.Serde;
/**
* Wrapper Class that encapsulates a HourOfDayKeySerde in a org.apache.beam.sdk.coders.Coder.
* Wrapper Class that encapsulates a {@link HourOfDayKeySerde} in a {@link Coder}.
*/
public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
public class HourOfDayKeyCoder extends Coder<HourOfDayKey> implements Serializable {
public static final long serialVersionUID = 4444444;
private static final boolean DETERMINISTIC = true;
private static final int VALUE_SIZE = 4;
......
......@@ -8,7 +8,7 @@ import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Changes the time format to us europe/paris time.
* Changes the time format to us Europe/Paris time.
*/
public class MapTimeFormat
extends SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>> {
......@@ -17,11 +17,11 @@ public class MapTimeFormat
private final ZoneId zone = ZoneId.of("Europe/Paris");
@Override
public KV<HourOfDayKey, ActivePowerRecord> apply(
final KV<String, ActivePowerRecord> kv) {
public KV<HourOfDayKey, ActivePowerRecord> apply(final KV<String, ActivePowerRecord> kv) {
final Instant instant = Instant.ofEpochMilli(kv.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return KV.of(this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
return KV.of(
this.keyFactory.createKey(kv.getValue().getIdentifier(), dateTime),
kv.getValue());
}
}
......@@ -78,8 +78,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
.accumulatingFiredPanes())
// Aggregate per window for every key
.apply(Combine.<HourOfDayKey, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
.setCoder(KvCoder.of(new HourOfDaykeyCoder(), SerializableCoder.of(Stats.class)))
.apply(Combine.perKey(new StatsAggregation()))
.setCoder(KvCoder.of(new HourOfDayKeyCoder(), SerializableCoder.of(Stats.class)))
// Map into correct output format
.apply(MapElements.via(hourOfDayWithStats))
......@@ -94,7 +94,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
AvroCoder.of(ActivePowerRecord.SCHEMA$));
registry.registerCoderForClass(
HourOfDayKey.class,
new HourOfDaykeyCoder());
new HourOfDayKeyCoder());
registry.registerCoderForClass(
StatsAggregation.class,
SerializableCoder.of(StatsAggregation.class));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment