Commit dd7c2ea5 authored by Lorenz Boguhn

Migrate uc4-beam-samza

additional naive upgrade to Titan Control Center Commons 0.1.0 (new ActivePowerRecords,...)

git commit -m Migrate
parent c8034d6c
1 merge request: !187 Migrate Beam benchmark implementation
Showing with 1246 additions and 0 deletions
FROM openjdk:8-slim
ADD build/distributions/uc4-application-samza.tar /
# The Samza standalone properties file is expected inside the build context (ADD cannot reference paths outside it).
ADD config/standalone.properties /samza-standalone.properties
CMD /uc4-application-samza/bin/uc4-application-samza --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=false
plugins {
id 'theodolite.kstreams'
}
allprojects {
repositories {
maven {
url 'https://packages.confluent.io/maven/'
}
mavenCentral()
}
}
dependencies {
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
compile group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
compile('org.apache.beam:beam-sdks-java-io-kafka:2.22.0'){
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
runtime 'org.apache.beam:beam-runners-direct-java:2.22.0'
runtime 'org.slf4j:slf4j-api:1.7.32'
runtime 'org.slf4j:slf4j-jdk14:1.7.32'
}
mainClassName = "application.Uc4ApplicationBeam"
package application;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* TimestampPolicy to use event time based on the timestamp of the record value.
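*
* <p>The watermark is advanced to the timestamp of the most recently read record; before the
* first record it is the previous watermark, or BoundedWindow.TIMESTAMP_MIN_VALUE if none was
* passed in.</p>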
*/
public class AggregatedActivePowerRecordEventTimePolicy
extends TimestampPolicy<String, AggregatedActivePowerRecord> {
protected Instant currentWatermark;
public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) {
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public Instant getTimestampForRecord(final PartitionContext ctx,
final KafkaRecord<String, AggregatedActivePowerRecord> record) {
this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
return this.currentWatermark;
}
@Override
public Instant getWatermark(final PartitionContext ctx) {
return this.currentWatermark;
}
}
package application;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import titan.ccp.model.records.ActivePowerRecord;
/**
* TimestampPolicy to use event time based on the timestamp of the record value.
*/
public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> {
protected Instant currentWatermark;
public EventTimePolicy(final Optional<Instant> previousWatermark) {
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
@Override
public Instant getTimestampForRecord(final PartitionContext ctx,
final KafkaRecord<String, ActivePowerRecord> record) {
this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
return this.currentWatermark;
}
@Override
public Instant getWatermark(final PartitionContext ctx) {
return this.currentWatermark;
}
}
package application;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.AggregatedSensor;
import titan.ccp.model.sensorregistry.Sensor;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
* DoFn class to generate a child-parent pair for every sensor in the hierarchy.
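*
* <p>For example, given a registry whose top-level sensor root has the children s1 and s2, this
* DoFn emits the pairs s1 -> {root} and s2 -> {root}; the top-level sensor itself has no parent
* and is therefore not emitted.</p>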
*/
public class GenerateParentsFn extends DoFn<KV<Event, String>, KV<String, Set<String>>> {
private static final long serialVersionUID = 958270648688932091L;
@ProcessElement
public void processElement(@Element final KV<Event, String> kv,
final OutputReceiver<KV<String, Set<String>>> out) {
final Map<String, Set<String>> childParentsPairs =
this.constructChildParentsPairs(SensorRegistry.fromJson(kv.getValue()));
final Iterator<Map.Entry<String, Set<String>>> it = childParentsPairs.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<String, Set<String>> pair = it.next();
out.output(KV.of(pair.getKey(), pair.getValue()));
}
}
private Map<String, Set<String>> constructChildParentsPairs(final SensorRegistry registry) {
return this.streamAllChildren(registry.getTopLevelSensor())
.collect(Collectors.<Sensor, String, Set<String>>toMap(
child -> child.getIdentifier(),
child -> child.getParent()
.map(p -> Stream.of(p.getIdentifier()).collect(Collectors.toSet()))
.orElseGet(() -> Collections.<String>emptySet())));
}
private Stream<Sensor> streamAllChildren(final AggregatedSensor sensor) {
return sensor.getChildren().stream()
.flatMap(s -> Stream.concat(
Stream.of(s),
s instanceof AggregatedSensor ? this.streamAllChildren((AggregatedSensor) s)
: Stream.empty()));
}
}
package application;
import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* CombineFn to aggregate ActivePowerRecords into AggregatedActivePowerRecords.
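*
* <p>A small worked example: combining three ActivePowerRecords with 2.0 W, 4.0 W and 6.0 W
* yields count 3, sum 12.0 W and average 4.0 W; the timestamp is taken from the last record
* added or merged, and the identifier is left empty here and set later in the pipeline.</p>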
*/
public class RecordAggregation
extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> {
private static final long serialVersionUID = 4362213539553233529L;
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
private static final long serialVersionUID = 3701311203919534376L;
long count = 0;
Double sum = 0.0;
long timestamp = 0;
}
@Override
public Accum createAccumulator() {
return new Accum();
}
@Override
public Accum addInput(final Accum mutableAccumulator, final ActivePowerRecord input) {
mutableAccumulator.count += 1;
mutableAccumulator.sum += input.getValueInW();
mutableAccumulator.timestamp = input.getTimestamp();
return mutableAccumulator;
}
@Override
public Accum mergeAccumulators(final Iterable<Accum> accumulators) {
final Accum merged = this.createAccumulator();
for (final Accum accumulator : accumulators) {
merged.count += accumulator.count;
merged.sum += accumulator.sum;
merged.timestamp = accumulator.timestamp;
}
return merged;
}
@Override
public AggregatedActivePowerRecord extractOutput(final Accum accumulator) {
final double average = accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;
return new AggregatedActivePowerRecord("", accumulator.timestamp, accumulator.count,
accumulator.sum, average);
}
}
package application;
/**
* A key consisting of the identifier of a sensor and the identifier of its parent sensor.
*/
public class SensorParentKey {
private final String sensorIdentifier;
private final String parentIdentifier;
public SensorParentKey(final String sensorIdentifier, final String parentIdentifier) {
this.sensorIdentifier = sensorIdentifier;
this.parentIdentifier = parentIdentifier;
}
public String getSensor() {
return this.sensorIdentifier;
}
public String getParent() {
return this.parentIdentifier;
}
@Override
public String toString() {
return "{" + this.sensorIdentifier + ", " + this.parentIdentifier + "}";
}
}
package application;
import com.google.common.base.MoreObjects;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import serialization.AggregatedActivePowerRecordCoder;
import serialization.AggregatedActivePowerRecordDeserializer;
import serialization.AggregatedActivePowerRecordSerializer;
import serialization.EventCoder;
import serialization.EventDeserializer;
import serialization.SensorParentKeyCoder;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
public class Uc4ApplicationBeam {
/**
* Implementation of the Hierarchical Aggregation use case using Apache Beam with the Samza
* runner. To run locally in standalone mode, start Kafka, ZooKeeper, the Schema Registry and the
* workload generator (for example via the delayed_startup.sh script) and pass
* --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
* --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties
* --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 as program arguments. To
* persist logs, add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
* under Standard Input Output in the Common tab of the Run Configuration, then start via Eclipse Run.
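*
* <p>Configuration is read from the environment, falling back to the defaults used in main:
* KAFKA_BOOTSTRAP_SERVERS (my-confluent-cp-kafka:9092), SCHEMA_REGISTRY_URL
* (http://my-confluent-cp-schema-registry:8081), INPUT (input), OUTPUT (output), CONFIGURATION
* (configuration), FEEDBACKTOPIC (aggregation-feedback), KAFKA_WINDOW_DURATION (60 s),
* TRIGGER_INTERVAL (30 s) and GRACE_PERIOD (270 s).</p>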
*/
@SuppressWarnings({"serial", "unchecked", "rawtypes"})
public static void main(final String[] args) {
// Set configuration for the windowing
final int windowDuration = Integer.parseInt(
System.getenv("KAFKA_WINDOW_DURATION") != null
? System.getenv("KAFKA_WINDOW_DURATION")
: "60");
final Duration duration = Duration.standardSeconds(windowDuration);
final int triggerInterval = Integer.parseInt(
System.getenv("TRIGGER_INTERVAL") != null
? System.getenv("TRIGGER_INTERVAL")
: "30");
final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
final int grace = Integer.parseInt(
System.getenv("GRACE_PERIOD") != null
? System.getenv("GRACE_PERIOD")
: "270");
final Duration gracePeriod = Duration.standardSeconds(grace);
// Set Configuration for Kafka
final String bootstrapServer =
System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
: "my-confluent-cp-kafka:9092";
final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output";
final String configurationTopic =
System.getenv("CONFIGURATION") != null ? System.getenv("CONFIGURATION") : "configuration";
final String feedbackTopic =
System.getenv("FEEDBACKTOPIC") != null ? System.getenv("FEEDBACKTOPIC")
: "aggregation-feedback";
final String schemaRegistryURL =
System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
: "http://my-confluent-cp-schema-registry:8081";
// final String inputTopic = "input";
// final String outputTopic = "output";
// final String bootstrapServer = "localhost:9092";
// final String configurationTopic = "configuration";
// final String feedbackTopic = "aggregation-feedback";
// final String schemaRegistryURL = "http://localhost:8081";
// final Duration duration = Duration.standardSeconds(5);
// final Duration gracePeriod = Duration.standardMinutes(5);
// final Duration triggerDelay = Duration.standardSeconds(5);
// Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL);
consumerConfig.put("specific.avro.reader", "true");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input");
final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>();
consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration");
// Create run options from args
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(SamzaRunner.class);
options.setJobName("ucapplication");
final Pipeline pipeline = Pipeline.create(options);
final CoderRegistry cr = pipeline.getCoderRegistry();
// Set Coders for Classes that will be distributed
cr.registerCoderForClass(ActivePowerRecord.class,
NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
cr.registerCoderForClass(AggregatedActivePowerRecord.class,
new AggregatedActivePowerRecordCoder());
cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
cr.registerCoderForClass(Event.class, new EventCoder());
cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
@SuppressWarnings({"unchecked", "rawtypes"})
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)))
.withConsumerConfigUpdates(consumerConfig)
// Set TimeStampPolicy for event time
.withTimestampPolicyFactory(
(tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
.withoutMetadata();
// Apply pipeline transformations
// Read from Kafka
final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka)
.apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
.apply("Set trigger for input", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
// Read the results of earlier aggregations.
final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = pipeline
.apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(feedbackTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
.withTimestampPolicyFactory(
(tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
previousWaterMark))
.withoutMetadata())
.apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
// Convert into the correct data format
.apply("Convert AggregatedActivePowerRecord to ActivePowerRecord", MapElements.via(
new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, ActivePowerRecord>>() {
@Override
public KV<String, ActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) {
return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(),
kv.getValue().getTimestamp(), kv.getValue().getSumInW()));
}
}))
.apply("Set trigger for feedback", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
// Prepare flatten
final PCollectionList<KV<String, ActivePowerRecord>> collections =
PCollectionList.of(values).and(aggregationsInput);
// Create a single PCollection out of the input and already computed results
final PCollection<KV<String, ActivePowerRecord>> inputCollection =
collections.apply("Flatten sensor data and aggregation results",
Flatten.pCollections());
// Build the configuration stream from a changelog.
final PCollection<KV<String, Set<String>>> configurationStream = pipeline
.apply("Read sensor groups", KafkaIO.<Event, String>read()
.withBootstrapServers(bootstrapServer)
.withTopic(configurationTopic)
.withKeyDeserializer(EventDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withConsumerConfigUpdates(consumerConfigConfiguration)
.withoutMetadata())
// Only forward relevant changes in the hierarchy
.apply("Filter changed and status events",
Filter.by(new SerializableFunction<KV<Event, String>, Boolean>() {
@Override
public Boolean apply(final KV<Event, String> kv) {
return kv.getKey() == Event.SENSOR_REGISTRY_CHANGED
|| kv.getKey() == Event.SENSOR_REGISTRY_STATUS;
}
}))
// Build the changelog
.apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
.apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
.apply("Set trigger for configuration", Window
.<KV<String, Set<String>>>configure()
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
final PCollectionView<Map<String, Set<String>>> childParentPairMap =
configurationStream.apply(Latest.perKey())
// Reset trigger to avoid synchronized processing time
.apply("Reset trigger for configurations", Window
.<KV<String, Set<String>>>configure()
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply(View.asMap());
// Build pairs of every sensor reading and parent
final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
inputCollection.apply(
"Duplicate as flatMap",
ParDo.of(
new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
// Generate a KV-pair for every child-parent match
@ProcessElement
public void processElement(@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue();
final Set<String> newParents =
c.sideInput(childParentPairMap).get(kv.getKey()) == null
? Collections.emptySet()
: c.sideInput(childParentPairMap).get(kv.getKey());
final Set<String> oldParents =
MoreObjects.firstNonNull(state.read(), Collections.emptySet());
// Forward new Pairs if they exist
if (!newParents.isEmpty()) {
for (final String parent : newParents) {
// Forward flat mapped record
final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
out.output(KV.of(key, record));
}
}
if (!newParents.equals(oldParents)) {
for (final String oldParent : oldParents) {
if (!newParents.contains(oldParent)) {
// Forward Delete
final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
out.output(KV.of(key, null));
}
}
state.write(newParents);
}
}
}).withSideInputs(childParentPairMap))
.apply("Filter only latest changes", Latest.perKey())
.apply("Filter out null values",
Filter.by(
new SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean>() {
@Override
public Boolean apply(final KV<SensorParentKey, ActivePowerRecord> kv) {
return kv.getValue() != null;
}
}));
// Aggregate for every sensor group of the current level
final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues
.apply("Set key to group", MapElements.via(
new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() {
@Override
public KV<String, ActivePowerRecord> apply(
final KV<SensorParentKey, ActivePowerRecord> kv) {
return KV.of(kv.getKey().getParent(), kv.getValue());
}
}))
// Reset trigger to avoid synchronized processing time
.apply("Reset trigger for aggregations", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes())
.apply("Aggregate per group", Combine.perKey(new RecordAggregation()))
.apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via(
new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() {
@Override
public KV<String, AggregatedActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) {
final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
kv.getValue().getSumInW(), kv.getValue().getAverageInW());
return KV.of(kv.getKey(), record);
}
}));
aggregations.apply("Write to aggregation results",
KafkaIO.<String, AggregatedActivePowerRecord>write()
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(AggregatedActivePowerRecordSerializer.class));
aggregations
.apply("Write to feedback topic", KafkaIO.<String, AggregatedActivePowerRecord>write()
.withBootstrapServers(bootstrapServer)
.withTopic(feedbackTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(AggregatedActivePowerRecordSerializer.class));
pipeline.run().waitUntilFinish();
}
}
package application;
import com.google.common.base.MoreObjects;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import serialization.AggregatedActivePowerRecordCoder;
import serialization.AggregatedActivePowerRecordSerializer;
import serialization.EventCoder;
import serialization.EventDeserializer;
import serialization.SensorParentKeyCoder;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
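/**
* Variant of the Hierarchical Aggregation use case without the feedback loop for already
* computed aggregation results. Kafka, the Schema Registry and all topics are hard-coded for a
* local setup, and intermediate results are printed to stdout for debugging.
*/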
public class Uc4ApplicationBeamNoFeedback {
@SuppressWarnings({"serial", "unchecked", "rawtypes"})
public static void main(final String[] args) {
final String inputTopic = "input";
final String outputTopic = "output";
final String bootstrapServer = "localhost:9092";
final String configurationTopic = "configuration";
final String schemaRegistryURL = "http://localhost:8081";
final Duration duration = Duration.standardSeconds(15);
// Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL);
consumerConfig.put("specific.avro.reader", "true");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input");
final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>();
consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration");
// final DirectOptions options =
// PipelineOptionsFactory.fromArgs(args).create().as(DirectOptions.class);
// options.setRunner(DirectRunner.class);
// options.setJobName("ucapplication");
// options.setTargetParallelism(1);
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(SamzaRunner.class);
options.setJobName("ucapplication");
final Pipeline pipeline = Pipeline.create(options);
final CoderRegistry cr = pipeline.getCoderRegistry();
// Set Coders for Classes that will be distributed
cr.registerCoderForClass(ActivePowerRecord.class,
NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
cr.registerCoderForClass(AggregatedActivePowerRecord.class,
new AggregatedActivePowerRecordCoder());
cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
cr.registerCoderForClass(Event.class, new EventCoder());
// SensorRegistry
cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
KafkaIO.<String, ActivePowerRecord>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)))
.withConsumerConfigUpdates(consumerConfig)
// Set TimeStampPolicy for event time
.withTimestampPolicyFactory(
(tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
// .commitOffsetsInFinalize()
.withoutMetadata();
// Apply pipeline transformations
// Read from Kafka
final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka)
.apply("Apply Winddows", Window.into(FixedWindows.of(duration)));
// Build the configuration stream from a changelog.
final PCollection<KV<String, Set<String>>> configurationStream = pipeline
.apply("Read sensor groups", KafkaIO.<Event, String>read()
.withBootstrapServers(bootstrapServer)
.withTopic(configurationTopic)
.withKeyDeserializer(EventDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withConsumerConfigUpdates(consumerConfigConfiguration)
// .commitOffsetsInFinalize()
.withoutMetadata())
.apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
.apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
.apply("Set trigger for configurations", Window
.<KV<String, Set<String>>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5))))
.accumulatingFiredPanes());
// This may need to be changed to eliminate duplicates in first iteration
final PCollectionView<Map<String, Set<String>>> childParentPairMap =
// configurationStream.apply(Latest.perKey())
configurationStream.apply(View.asMap());
final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
values.apply(
"Duplicate as flatMap",
ParDo.of(
new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
// Generate a KV-pair for every child-parent match
@ProcessElement
public void processElement(@Element final KV<String, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
@StateId("parents") final ValueState<Set<String>> state,
final ProcessContext c) {
final ActivePowerRecord record = kv.getValue();
final Set<String> newParents =
c.sideInput(childParentPairMap).get(kv.getKey()) == null
? Collections.emptySet()
: c.sideInput(childParentPairMap).get(kv.getKey());
System.out.println("Map Entry for Key: " + newParents.toString());
final Set<String> oldParents =
MoreObjects.firstNonNull(state.read(), Collections.emptySet());
// Forward new Pairs if they exist
if (!newParents.isEmpty()) {
for (final String parent : newParents) {
// Forward flat mapped record
final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
out.output(KV.of(key, record));
}
}
if (!newParents.equals(oldParents)) {
for (final String oldParent : oldParents) {
if (!newParents.contains(oldParent)) {
// Forward Delete
final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
out.output(KV.of(key, null));
}
}
state.write(newParents);
}
}
}).withSideInputs(childParentPairMap))
.apply("Debugging output before filtering latest", ParDo.of(
new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
@ProcessElement
public void processElement(
@Element final KV<SensorParentKey, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
final ProcessContext c) {
System.out.println("Before filter latest Sensor: " + kv.getKey().getSensor()
+ " Parent: " + kv.getKey().getParent() + " ValueKey : "
+ kv.getValue().getIdentifier() + " ValueInW: "
+ kv.getValue().getValueInW()
+ " Timestamp: " + kv.getValue().getTimestamp());
out.output(kv);
}
}))
.apply("Filter only latest changes", Latest.perKey())
.apply("Debugging output after filtering latest", ParDo.of(
new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
@ProcessElement
public void processElement(
@Element final KV<SensorParentKey, ActivePowerRecord> kv,
final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
final ProcessContext c) {
System.out.println("After filter latest Sensor: " + kv.getKey().getSensor()
+ " Parent: " + kv.getKey().getParent() + " ValueKey : "
+ kv.getValue().getIdentifier() + " ValueInW: "
+ kv.getValue().getValueInW()
+ " Timestamp: " + kv.getValue().getTimestamp());
out.output(kv);
}
}));
final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues
.apply("Set key to group", MapElements.via(
new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() {
@Override
public KV<String, ActivePowerRecord> apply(
final KV<SensorParentKey, ActivePowerRecord> kv) {
System.out.println("key set to group" + kv.getKey() + "Timestamp: "
+ kv.getValue().getTimestamp());
return KV.of(kv.getKey().getParent(), kv.getValue());
}
}))
.apply("Aggregate per group", Combine.perKey(new RecordAggregation()))
.apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via(
new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() {
@Override
public KV<String, AggregatedActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) {
final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
kv.getValue().getSumInW(), kv.getValue().getAverageInW());
System.out.println("set identifier to: " + record.getIdentifier() + "Timestamp: "
+ record.getTimestamp());
return KV.of(kv.getKey(), record);
}
}));
aggregations.apply("Print Stats", MapElements.via(
new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() {
@Override
public KV<String, AggregatedActivePowerRecord> apply(
final KV<String, AggregatedActivePowerRecord> kv) {
System.out.println("Output: Key: "
+ kv.getKey()
+ " Identifier: " + kv.getValue().getIdentifier()
+ " Timestamp: " + kv.getValue().getTimestamp()
+ " Avg: " + kv.getValue().getAverageInW()
+ " Count: " + kv.getValue().getCount()
+ " Sum: " + kv.getValue().getSumInW());
//
return kv;
}
}))
.apply("Write to aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>write()
.withBootstrapServers(bootstrapServer)
.withTopic(outputTopic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(AggregatedActivePowerRecordSerializer.class));
pipeline.run().waitUntilFinish();
}
}
package application;
import java.util.Set;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
/**
* Forward changes or tombstone values for deleted records.
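*
* <p>For example, if the stored parent set for sensor s1 is {root} and the same set arrives
* again, nothing is emitted; if the set has changed, or the value is null (a tombstone), the
* pair is forwarded and the state is updated.</p>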
*/
public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {
private static final long serialVersionUID = 1L;
@StateId("parents")
private final StateSpec<ValueState<Set<String>>> parents =
StateSpecs.value();
@ProcessElement
public void processElement(@Element final KV<String, Set<String>> kv,
final OutputReceiver<KV<String, Set<String>>> out,
@StateId("parents") final ValueState<Set<String>> state) {
if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
out.output(kv);
state.write(kv.getValue());
}
}
}
package serialization;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* Wrapper class that encapsulates an Avro coder for AggregatedActivePowerRecord in a
* org.apache.beam.sdk.coders.Coder.
*/
@SuppressWarnings("serial")
public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
implements Serializable {
private transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream)
throws CoderException, IOException {
if (this.avroEnCoder == null) {
this.avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
}
this.avroEnCoder.encode(value, outStream);
}
@Override
public AggregatedActivePowerRecord decode(final InputStream inStream)
throws CoderException, IOException {
if (this.avroEnCoder == null) {
this.avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
}
return this.avroEnCoder.decode(inStream);
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return null;
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
}
}
package serialization;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* Kafka Deserializer for AggregatedActivePowerRecord values, backed by a Beam Avro coder.
*/
public class AggregatedActivePowerRecordDeserializer
implements Deserializer<AggregatedActivePowerRecord> {
private transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
@Override
public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
AggregatedActivePowerRecord value = null;
try {
value = this.avroEnCoder.decode(new ByteArrayInputStream(data));
} catch (IOException e) {
e.printStackTrace();
}
return value;
}
}
package serialization;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.kafka.common.serialization.Serializer;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* Kafka Serializer for AggregatedActivePowerRecord values, backed by a Beam Avro coder.
*/
public class AggregatedActivePowerRecordSerializer
implements Serializer<AggregatedActivePowerRecord> {
private transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
AvroCoder.of(AggregatedActivePowerRecord.class);
// Note: an earlier version produced the compile error "AggregatedActivePowerRecordSerializer is
// not abstract and does not override abstract method close() in Serializer".
@Override
public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
this.avroEnCoder.encode(data, out);
} catch (final IOException e) {
e.printStackTrace();
}
final byte[] result = out.toByteArray();
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
}
package serialization;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
/**
* Wrapper class that encapsulates an Event Serde in a org.apache.beam.sdk.coders.Coder.
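*
* <p>Values are written length-prefixed: a 4-byte big-endian length followed by the bytes
* produced by the Event Serde.</p>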
*/
public class EventCoder extends Coder<Event> implements Serializable {
private static final long serialVersionUID = 8403045343970659100L;
private transient Serde<Event> innerSerde = EventSerde.serde();
@Override
public void encode(final Event value, final OutputStream outStream)
throws CoderException, IOException {
if (this.innerSerde == null) {
this.innerSerde = EventSerde.serde();
}
final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
outStream.write(sizeinBytes);
outStream.write(bytes);
}
@Override
public Event decode(final InputStream inStream) throws CoderException, IOException {
if (this.innerSerde == null) {
this.innerSerde = EventSerde.serde();
}
// Read fully to guard against partial reads from the stream.
final DataInputStream dataIn = new DataInputStream(inStream);
final byte[] sizeinBytes = new byte[4];
dataIn.readFully(sizeinBytes);
final int size = ByteBuffer.wrap(sizeinBytes).getInt();
final byte[] bytes = new byte[size];
dataIn.readFully(bytes);
return this.innerSerde.deserializer().deserialize("deser", bytes);
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return null;
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
}
}
package serialization;
import java.util.Map;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.configuration.events.Event;
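/**
* Kafka Deserializer that reconstructs an Event from its serialized enum ordinal.
*/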
public class EventDeserializer implements Deserializer<Event> {
private final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer();
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
this.byteBufferDeserializer.configure(configs, isKey);
}
@Override
public Event deserialize(final String topic, final byte[] data) {
final int ordinal = this.byteBufferDeserializer.deserialize(topic, data).getInt();
for (final Event event : Event.values()) {
if (ordinal == event.ordinal()) {
return event;
}
}
throw new IllegalArgumentException("Deserialized data is not a valid event.");
}
@Override
public void close() {
this.byteBufferDeserializer.close();
}
}
package serialization;
import application.SensorParentKey;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.kafka.common.serialization.Serde;
/**
* Wrapper Class that encapsulates a SensorParentKey Serde in a org.apache.beam.sdk.coders.Coder.
*/
public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Serializable {
private static final long serialVersionUID = -3480141901035692398L;
private transient Serde<SensorParentKey> innerSerde = SensorParentKeySerde.serde();
@Override
public void encode(final SensorParentKey value, final OutputStream outStream)
throws CoderException, IOException {
if (this.innerSerde == null) {
this.innerSerde = SensorParentKeySerde.serde();
}
final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
outStream.write(sizeinBytes);
outStream.write(bytes);
}
@Override
public SensorParentKey decode(final InputStream inStream) throws CoderException, IOException {
if (this.innerSerde == null) {
this.innerSerde = SensorParentKeySerde.serde();
}
// Read fully to guard against partial reads from the stream.
final DataInputStream dataIn = new DataInputStream(inStream);
final byte[] sizeinBytes = new byte[4];
dataIn.readFully(sizeinBytes);
final int size = ByteBuffer.wrap(sizeinBytes).getInt();
final byte[] bytes = new byte[size];
dataIn.readFully(bytes);
return this.innerSerde.deserializer().deserialize("deser", bytes);
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return null;
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
}
}
package serialization;
import application.SensorParentKey;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
/**
* {@link Serde} factory for {@link SensorParentKey}.
*/
public final class SensorParentKeySerde implements BufferSerde<SensorParentKey> {
private SensorParentKeySerde() {}
@Override
public void serialize(final WriteBuffer buffer, final SensorParentKey key) {
buffer.putString(key.getSensor());
buffer.putString(key.getParent());
}
@Override
public SensorParentKey deserialize(final ReadBuffer buffer) {
final String sensor = buffer.getString();
final String parent = buffer.getString();
return new SensorParentKey(sensor, parent);
}
public static Serde<SensorParentKey> serde() {
return SimpleSerdes.create(new SensorParentKeySerde());
}
}