Commit 932ae9e1 authored by Sören Henning

Merge branch 'master' into benchmark-smoke-tests

parents 59bff5c3 3bdc9c14
1 merge request: !232 Add smoke tests for benchmark
Pipeline #6350 failed
Showing 65 additions and 109 deletions
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -2,4 +2,6 @@ plugins {
   id 'theodolite.beam'
 }
 
+dependencies {
+  implementation ('io.confluent:kafka-streams-avro-serde:5.3.2')
+}
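For context: the kafka-streams-avro-serde artifact is published in Confluent's Maven repository (https://packages.confluent.io/maven/), which the build is presumed to have configured elsewhere. The dependency supplies the SpecificAvroSerializer and SpecificAvroDeserializer base classes that the rewritten serialization classes further down build on.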
@@ -16,8 +16,8 @@ import titan.ccp.model.records.ActivePowerRecord;
 /**
  * Duplicates the KV containing the (children, parents) pair as a flat map.
  */
-public class DuplicateAsFlatMap extends DoFn
-    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+public class DuplicateAsFlatMap
+    extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
 
   private static final long serialVersionUID = -5132355515723961647L;
 
   @StateId("parents")
   private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD
@@ -33,16 +33,17 @@ public class DuplicateAsFlatMap extends DoFn
    * Generate a KV-pair for every child-parent match.
    */
   @ProcessElement
-  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
+  public void processElement(
+      @Element final KV<String, ActivePowerRecord> kv,
       final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
       @StateId("parents") final ValueState<Set<String>> state,
       final ProcessContext c) {
 
     final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
     final Set<String> newParents =
-        c.sideInput(childParentPairMap).get(kv.getKey()) == null
+        c.sideInput(this.childParentPairMap).get(kv.getKey()) == null
             ? Collections.emptySet()
-            : c.sideInput(childParentPairMap).get(kv.getKey());
+            : c.sideInput(this.childParentPairMap).get(kv.getKey());
     final Set<String> oldParents =
         MoreObjects.firstNonNull(state.read(), Collections.emptySet());
     // Forward new Pairs if they exist
@@ -66,8 +66,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     final Duration gracePeriod =
         Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
 
-    // Build kafka configuration
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
+    // Build Kafka configuration
+    final Map<String, Object> consumerConfig = super.buildConsumerConfig();
     final Map<String, Object> configurationConfig = this.configurationConfig(config);
 
     // Set Coders for Classes that will be distributed
@@ -77,25 +77,34 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     // Read from Kafka
     // ActivePowerRecords
     final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+        new KafkaActivePowerTimestampReader(
+            this.bootstrapServer,
+            this.inputTopic,
+            consumerConfig);
 
     // Configuration Events
     final KafkaGenericReader<Event, String> kafkaConfigurationReader =
         new KafkaGenericReader<>(
-            this.bootstrapServer, configurationTopic, configurationConfig,
-            EventDeserializer.class, StringDeserializer.class);
-
-    // Transform into AggregatedActivePowerRecords into ActivePowerRecords
-    final AggregatedToActive aggregatedToActive = new AggregatedToActive();
+            this.bootstrapServer,
+            configurationTopic,
+            configurationConfig,
+            EventDeserializer.class,
+            StringDeserializer.class);
 
     // Write to Kafka
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            outputTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());
 
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            feedbackTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());
 
     // Apply pipeline transformations
     final PCollection<KV<String, ActivePowerRecord>> values = this
@@ -115,7 +124,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
             .withBootstrapServers(this.bootstrapServer)
             .withTopic(feedbackTopic)
             .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
+            .withValueDeserializerAndCoder(
+                AggregatedActivePowerRecordDeserializer.class,
+                AvroCoder.of(AggregatedActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
             .withTimestampPolicyFactory(
                 (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                     previousWaterMark))
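Worth spelling out for the change above: Beam's KafkaIO cannot derive a Coder from a custom value Deserializer class, so the feedback read now supplies the AvroCoder explicitly via withValueDeserializerAndCoder. A minimal sketch of the pattern, with placeholder bootstrap server and topic values that are assumptions, not taken from this commit:

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import serialization.AggregatedActivePowerRecordDeserializer;
import titan.ccp.model.records.AggregatedActivePowerRecord;

public final class FeedbackReadSketch {

  // Sketch: pair the Kafka Deserializer with an explicit Beam Coder, since
  // KafkaIO cannot infer a Coder from the deserializer class alone.
  public static KafkaIO.Read<String, AggregatedActivePowerRecord> feedbackRead() {
    return KafkaIO.<String, AggregatedActivePowerRecord>read()
        .withBootstrapServers("localhost:9092") // placeholder value
        .withTopic("feedback") // placeholder value
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(
            AggregatedActivePowerRecordDeserializer.class,
            AvroCoder.of(AggregatedActivePowerRecord.class));
  }
}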
@@ -123,11 +135,12 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
         .apply("Apply Windows", Window.into(FixedWindows.of(duration)))
         // Convert into the correct data format
         .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
-            MapElements.via(aggregatedToActive))
+            MapElements.via(new AggregatedToActive()))
         .apply("Set trigger for feedback", Window
             .<KV<String, ActivePowerRecord>>configure()
             .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane()
+                AfterProcessingTime
+                    .pastFirstElementInPane()
                     .plusDelayOf(triggerDelay)))
             .withAllowedLateness(gracePeriod)
             .discardingFiredPanes());
@@ -170,17 +183,13 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
             .accumulatingFiredPanes())
         .apply(View.asMap());
 
-    final FilterNullValues filterNullValues = new FilterNullValues();
-
     // Build pairs of every sensor reading and parent
     final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
         inputCollection.apply(
             "Duplicate as flatMap",
-            ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
-                .withSideInputs(childParentPairMap))
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
             .apply("Filter only latest changes", Latest.perKey())
-            .apply("Filter out null values",
-                Filter.by(filterNullValues));
+            .apply("Filter out null values", Filter.by(new FilterNullValues()));
 
     final SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
     final SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
@@ -204,8 +213,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
     aggregations.apply("Write to aggregation results", kafkaOutput);
 
-    aggregations
-        .apply("Write to feedback topic", kafkaFeedback);
+    aggregations.apply("Write to feedback topic", kafkaFeedback);
   }
@@ -217,13 +225,14 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    */
   public Map<String, Object> configurationConfig(final Configuration config) {
     final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG, config
         .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
     return consumerConfig;
   }
@@ -11,8 +11,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
- * Wrapper Class that encapsulates a AggregatedActivePowerRecord Serde in a
- * org.apache.beam.sdk.coders.Coder.
+ * {@link Coder} for an {@link AggregatedActivePowerRecord}.
  */
 @SuppressWarnings("serial")
 public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
@@ -51,7 +50,7 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     if (!DETERMINISTIC) {
-      throw new NonDeterministicException(this, "This class should be deterministic!");
+      throw new NonDeterministicException(this, "This class should be deterministic.");
     }
   }
 }
 package serialization;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
- * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Deserializer
+ * {@link Deserializer} for an {@link AggregatedActivePowerRecord}.
  */
 public class AggregatedActivePowerRecordDeserializer
-    implements Deserializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
-    AggregatedActivePowerRecord value = null;
-    try {
-      value = this.avroEnCoder.decode(new ByteArrayInputStream(data));
-    } catch (final IOException e) {
-      LOGGER.error("Could not deserialize AggregatedActivePowerRecord", e);
-    }
-    return value;
-  }
+    extends SpecificAvroDeserializer<AggregatedActivePowerRecord> {
 }
 package serialization;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
 /**
- * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Serializer
+ * {@link Serializer} for an {@link AggregatedActivePowerRecord}.
  */
 public class AggregatedActivePowerRecordSerializer
-    implements Serializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
-    final ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try {
-      this.avroEnCoder.encode(data, out);
-    } catch (final IOException e) {
-      LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
-    }
-    final byte[] result = out.toByteArray();
-    try {
-      out.close();
-    } catch (final IOException e) {
-      LOGGER.error(
-          "Could not close output stream after serialization of AggregatedActivePowerRecord", e);
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    Serializer.super.close();
-  }
+    extends SpecificAvroSerializer<AggregatedActivePowerRecord> {
 }
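With the hand-rolled AvroCoder plumbing removed, both wrapper classes now inherit their behavior from Confluent's Schema-Registry-backed base classes provided by the kafka-streams-avro-serde dependency added above. A minimal configuration sketch under one assumption not visible in this diff: Confluent's Avro (de)serializers need a schema.registry.url when configured, and the URL below is a placeholder.

import java.util.Map;

import serialization.AggregatedActivePowerRecordDeserializer;
import serialization.AggregatedActivePowerRecordSerializer;

public final class SerializerConfigSketch {

  public static void main(final String[] args) {
    // Placeholder Schema Registry URL; an assumption, not part of this commit.
    final Map<String, String> config = Map.of("schema.registry.url", "http://localhost:8081");

    final AggregatedActivePowerRecordSerializer serializer =
        new AggregatedActivePowerRecordSerializer();
    serializer.configure(config, false); // false: configured for record values, not keys

    final AggregatedActivePowerRecordDeserializer deserializer =
        new AggregatedActivePowerRecordDeserializer();
    deserializer.configure(config, false);
  }
}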
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99
@@ -61,7 +61,7 @@ cleanup_settings_version=2
 eclipse.preferences.version=1
 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
 formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
 org.eclipse.jdt.ui.ignorelowercasenames=true
 org.eclipse.jdt.ui.importorder=;
 org.eclipse.jdt.ui.ondemandthreshold=99