Skip to content
Snippets Groups Projects
Commit be3849dc authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Clean up uc4-beam

parent f3493a1f
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
package application; package application;
import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import theodolite.commons.beam.AbstractBeamService; import theodolite.commons.beam.AbstractBeamService;
......
...@@ -11,6 +11,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord; ...@@ -11,6 +11,8 @@ import titan.ccp.model.records.AggregatedActivePowerRecord;
public class AggregatedToActive extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, public class AggregatedToActive extends SimpleFunction<KV<String, AggregatedActivePowerRecord>,
KV<String, ActivePowerRecord>> { KV<String, ActivePowerRecord>> {
private static final long serialVersionUID = -8275252527964065889L;
@Override @Override
public KV<String, ActivePowerRecord> apply( public KV<String, ActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) { final KV<String, AggregatedActivePowerRecord> kv) {
......
package application; package application;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.Event;
/**
* Filters for {@code Event.SENSOR_REGISTRY_CHANGED} and
* {@code Event.SENSOR_REGISTRY_STATUS} events.
*/
public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> { public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> {
private static final long serialVersionUID = -2233447357614891559L; private static final long serialVersionUID = -2233447357614891559L;
......
...@@ -4,7 +4,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction; ...@@ -4,7 +4,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
public class FilterNullValues implements SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> { /**
* Filters {@code null} Values.
*/
public class FilterNullValues implements
SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> {
private static final long serialVersionUID = -6197352369880867482L; private static final long serialVersionUID = -6197352369880867482L;
@Override @Override
......
...@@ -24,9 +24,9 @@ public class RecordAggregation ...@@ -24,9 +24,9 @@ public class RecordAggregation
@DefaultCoder(AvroCoder.class) @DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable { public static class Accum implements Serializable {
private static final long serialVersionUID = 3701311203919534376L; private static final long serialVersionUID = 3701311203919534376L;
private long count = 0; private long count;
private Double sum = 0.0; private Double sum = 0.0;
private long timestamp = 0; private long timestamp;
} }
@Override @Override
......
...@@ -4,8 +4,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction; ...@@ -4,8 +4,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.AggregatedActivePowerRecord; import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* Sets the identifier for new {@link AggregatedActivePowerRecord}.
*/
public class SetIdForAggregated extends SimpleFunction<KV<String, AggregatedActivePowerRecord>, public class SetIdForAggregated extends SimpleFunction<KV<String, AggregatedActivePowerRecord>,
KV<String, AggregatedActivePowerRecord>> { KV<String, AggregatedActivePowerRecord>> {
private static final long serialVersionUID = 2148522605294086982L;
@Override @Override
public KV<String, AggregatedActivePowerRecord> apply( public KV<String, AggregatedActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) { final KV<String, AggregatedActivePowerRecord> kv) {
......
...@@ -4,9 +4,14 @@ import org.apache.beam.sdk.transforms.SimpleFunction; ...@@ -4,9 +4,14 @@ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/**
* Set the Key for a group of {@code ActivePowerRecords} to their Parent.
*/
public class SetKeyToGroup extends SimpleFunction<KV<SensorParentKey, public class SetKeyToGroup extends SimpleFunction<KV<SensorParentKey,
ActivePowerRecord>, KV<String, ActivePowerRecord>> { ActivePowerRecord>, KV<String, ActivePowerRecord>> {
private static final long serialVersionUID = 790215050768527L;
@Override @Override
public KV<String, ActivePowerRecord> apply( public KV<String, ActivePowerRecord> apply(
final KV<SensorParentKey, ActivePowerRecord> kv) { final KV<SensorParentKey, ActivePowerRecord> kv) {
......
...@@ -10,14 +10,33 @@ import org.apache.beam.sdk.coders.SetCoder; ...@@ -10,14 +10,33 @@ import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.*; import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration; import org.joda.time.Duration;
import serialization.*; import serialization.AggregatedActivePowerRecordCoder;
import serialization.AggregatedActivePowerRecordDeserializer;
import serialization.AggregatedActivePowerRecordSerializer;
import serialization.EventCoder;
import serialization.EventDeserializer;
import serialization.SensorParentKeyCoder;
import theodolite.commons.beam.AbstractPipeline; import theodolite.commons.beam.AbstractPipeline;
import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.ConfigurationKeys;
import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
...@@ -208,8 +227,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline { ...@@ -208,8 +227,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
* *
* @return the build configuration. * @return the build configuration.
*/ */
public HashMap<String, Object> configurationConfig(final Configuration config) { public Map<String, Object> configurationConfig(final Configuration config) {
final HashMap<String, Object> consumerConfig = new HashMap<>(); final Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment