
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: she/theodolite
Showing 85 additions and 123 deletions
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -2,4 +2,6 @@ plugins {
  id 'theodolite.beam'
}

+dependencies {
+  implementation ('io.confluent:kafka-streams-avro-serde:5.3.2')
+}
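The added dependency presumably supplies Confluent's SpecificAvroSerializer and SpecificAvroDeserializer, which the rewritten serialization classes further below extend.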
......@@ -14,13 +14,18 @@ import titan.ccp.model.records.ActivePowerRecord;
/**
- * Duplicates the Kv containing the (Children,Parents) pair as a flat map.
+ * Duplicates the {@link KV} containing the (children,parents) pairs as flatMap.
 */
-public class DuplicateAsFlatMap extends DoFn
-    <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+public class DuplicateAsFlatMap
+    extends DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {

  private static final long serialVersionUID = -5132355515723961647L;

-  @StateId("parents")
-  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();//NOPMD
+  private static final String STATE_STORE_NAME = "DuplicateParents";
+
+  @StateId(STATE_STORE_NAME)
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD

  private final PCollectionView<Map<String, Set<String>>> childParentPairMap;

  public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
......@@ -28,21 +33,21 @@ public class DuplicateAsFlatMap extends DoFn
    this.childParentPairMap = childParentPairMap;
  }

  /**
-   * Generate a KV-pair for every child-parent match.
+   * Generate a KV-pair for every child-parent match.
   */
  @ProcessElement
-  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
-      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
-      @StateId("parents") final ValueState<Set<String>> state,
-      final ProcessContext c) {
+  public void processElement(
+      @Element final KV<String, ActivePowerRecord> kv,
+      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+      @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state,
+      final ProcessContext c) {

    final ActivePowerRecord record = kv.getValue() == null ? null : kv.getValue();
    final Set<String> newParents =
-        c.sideInput(childParentPairMap).get(kv.getKey()) == null
+        c.sideInput(this.childParentPairMap).get(kv.getKey()) == null
            ? Collections.emptySet()
-            : c.sideInput(childParentPairMap).get(kv.getKey());
+            : c.sideInput(this.childParentPairMap).get(kv.getKey());
    final Set<String> oldParents =
        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
    // Forward new Pairs if they exist
......
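As context (illustrative sketch, not part of the diff): the state cell declared via @StateId above is kept per element key, and the child-parent map arrives as a side input that must be registered on the ParDo. A minimal wiring sketch in Java, reusing the names from this changeset:

// Sketch only; mirrors the "Duplicate as flatMap" wiring shown in a later hunk.
static PCollection<KV<SensorParentKey, ActivePowerRecord>> duplicateAsFlatMap(
    final PCollection<KV<String, ActivePowerRecord>> inputCollection,
    final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
  return inputCollection.apply(
      "Duplicate as flatMap",
      ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
          // Side inputs must be registered on the ParDo; otherwise the
          // c.sideInput(...) call inside processElement fails at runtime.
          .withSideInputs(childParentPairMap));
}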
......@@ -66,8 +66,8 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    final Duration gracePeriod =
        Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));

-    // Build kafka configuration
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
+    // Build Kafka configuration
+    final Map<String, Object> consumerConfig = super.buildConsumerConfig();
    final Map<String, Object> configurationConfig = this.configurationConfig(config);

    // Set Coders for Classes that will be distributed
......@@ -77,25 +77,34 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    // Read from Kafka
    // ActivePowerRecords
    final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+        new KafkaActivePowerTimestampReader(
+            this.bootstrapServer,
+            this.inputTopic,
+            consumerConfig);

    // Configuration Events
    final KafkaGenericReader<Event, String> kafkaConfigurationReader =
        new KafkaGenericReader<>(
-            this.bootstrapServer, configurationTopic, configurationConfig,
-            EventDeserializer.class, StringDeserializer.class);
-
-    // Transform into AggregatedActivePowerRecords into ActivePowerRecords
-    final AggregatedToActive aggregatedToActive = new AggregatedToActive();
+            this.bootstrapServer,
+            configurationTopic,
+            configurationConfig,
+            EventDeserializer.class,
+            StringDeserializer.class);

    // Write to Kafka
    final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput =
        new KafkaWriterTransformation<>(
-            this.bootstrapServer, outputTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            outputTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());

    final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback =
        new KafkaWriterTransformation<>(
-            this.bootstrapServer, feedbackTopic, AggregatedActivePowerRecordSerializer.class);
+            this.bootstrapServer,
+            feedbackTopic,
+            AggregatedActivePowerRecordSerializer.class,
+            super.buildProducerConfig());

    // Apply pipeline transformations
    final PCollection<KV<String, ActivePowerRecord>> values = this
......@@ -115,7 +124,10 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
            .withBootstrapServers(this.bootstrapServer)
            .withTopic(feedbackTopic)
            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
+            .withValueDeserializerAndCoder(
+                AggregatedActivePowerRecordDeserializer.class,
+                AvroCoder.of(AggregatedActivePowerRecord.class))
+            .withConsumerConfigUpdates(consumerConfig)
            .withTimestampPolicyFactory(
                (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                    previousWaterMark))
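Background (an assumption, not stated in the diff): a plain Kafka Deserializer class no longer lets Beam infer an output Coder, which is presumably why the feedback source switches to withValueDeserializerAndCoder and passes an AvroCoder explicitly. A sketch of the resulting KafkaIO read, using names from this hunk:

// Sketch only, assuming Beam's KafkaIO API.
static KafkaIO.Read<String, AggregatedActivePowerRecord> feedbackSource(
    final String bootstrapServer,
    final String feedbackTopic,
    final Map<String, Object> consumerConfig) {
  return KafkaIO.<String, AggregatedActivePowerRecord>read()
      .withBootstrapServers(bootstrapServer)
      .withTopic(feedbackTopic)
      .withKeyDeserializer(StringDeserializer.class)
      // The deserializer class alone does not imply a Coder, so one is supplied:
      .withValueDeserializerAndCoder(
          AggregatedActivePowerRecordDeserializer.class,
          AvroCoder.of(AggregatedActivePowerRecord.class))
      .withConsumerConfigUpdates(consumerConfig);
}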
......@@ -123,11 +135,12 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
.apply("Apply Windows", Window.into(FixedWindows.of(duration)))
// Convert into the correct data format
.apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
MapElements.via(aggregatedToActive))
MapElements.via(new AggregatedToActive()))
.apply("Set trigger for feedback", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
......@@ -170,17 +183,13 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
                .accumulatingFiredPanes())
        .apply(View.asMap());

-    final FilterNullValues filterNullValues = new FilterNullValues();
-
    // Build pairs of every sensor reading and parent
    final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
        inputCollection.apply(
            "Duplicate as flatMap",
-            ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
-                .withSideInputs(childParentPairMap))
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
            .apply("Filter only latest changes", Latest.perKey())
-            .apply("Filter out null values",
-                Filter.by(filterNullValues));
+            .apply("Filter out null values", Filter.by(new FilterNullValues()));

    final SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
    final SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
......@@ -204,8 +213,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    aggregations.apply("Write to aggregation results", kafkaOutput);

-    aggregations
-        .apply("Write to feedback topic", kafkaFeedback);
+    aggregations.apply("Write to feedback topic", kafkaFeedback);
  }
......@@ -217,14 +225,15 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
   */
  public Map<String, Object> configurationConfig(final Configuration config) {
    final Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config
-            .getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config
-        .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG, config
+            .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
    return consumerConfig;
  }
......
......@@ -12,11 +12,12 @@ import org.apache.beam.sdk.values.KV;
 */
public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {

+  private static final String STATE_STORE_NAME = "UpdateParents";
+
  private static final long serialVersionUID = 1L;

-  @StateId("parents")
-  private final StateSpec<ValueState<Set<String>>> parents = // NOPMD
-      StateSpecs.value();
+  @StateId(STATE_STORE_NAME)
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value(); // NOPMD

  /**
   * Match the changes accordingly.
......@@ -24,9 +25,10 @@ public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<Str
   * @param kv the sensor parents set that contains the changes.
   */
  @ProcessElement
-  public void processElement(@Element final KV<String, Set<String>> kv,
+  public void processElement(
+      @Element final KV<String, Set<String>> kv,
      final OutputReceiver<KV<String, Set<String>>> out,
-      @StateId("parents") final ValueState<Set<String>> state) {
+      @StateId(STATE_STORE_NAME) final ValueState<Set<String>> state) {
    if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
      out.output(kv);
      state.write(kv.getValue());
......
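For reference (sketch, not part of the diff): UpdateChildParentPairs only forwards a pair when its parent set differs from the stored state, so repeated identical configuration events produce no downstream output. Stripped of the Beam state API, the logic behaves like this:

import java.util.Set;

// `lastParents` stands in for the ValueState<Set<String>> cell named above.
final class DedupSketch {
  private Set<String> lastParents;

  boolean forward(final Set<String> newParents) {
    // Mirrors: kv.getValue() == null || !kv.getValue().equals(state.read())
    if (newParents == null || !newParents.equals(lastParents)) {
      lastParents = newParents; // state.write(kv.getValue())
      return true;
    }
    return false;
  }
}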
......@@ -11,8 +11,7 @@ import org.apache.beam.sdk.coders.CoderException;
import titan.ccp.model.records.AggregatedActivePowerRecord;

/**
- * Wrapper Class that encapsulates a AggregatedActivePowerRecord Serde in a
- * org.apache.beam.sdk.coders.Coder.
+ * {@link Coder} for an {@link AggregatedActivePowerRecord}.
 */
@SuppressWarnings("serial")
public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
......@@ -45,13 +44,13 @@ public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowe
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
+    return List.of();
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    if (!DETERMINISTIC) {
-      throw new NonDeterministicException(this, "This class should be deterministic!");
+      throw new NonDeterministicException(this, "This class should be deterministic.");
    }
  }
}
package serialization;

-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord;

/**
- * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Deserializer
+ * {@link Deserializer} for an {@link AggregatedActivePowerRecord}.
 */
public class AggregatedActivePowerRecordDeserializer
-    implements Deserializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordDeserializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
-    AggregatedActivePowerRecord value = null;
-    try {
-      value = this.avroEnCoder.decode(new ByteArrayInputStream(data));
-    } catch (final IOException e) {
-      LOGGER.error("Could not deserialize AggregatedActivePowerRecord", e);
-    }
-    return value;
-  }
+    extends SpecificAvroDeserializer<AggregatedActivePowerRecord> {
}
package serialization;

-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.sdk.coders.AvroCoder;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import titan.ccp.model.records.AggregatedActivePowerRecord;

/**
- * Wrapper Class that encapsulates a IMonitoringRecordSerde.serializer in a Serializer
+ * {@link Serializer} for an {@link AggregatedActivePowerRecord}.
 */
public class AggregatedActivePowerRecordSerializer
-    implements Serializer<AggregatedActivePowerRecord> {
-
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(AggregatedActivePowerRecordSerializer.class);
-
-  private final transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
-      AvroCoder.of(AggregatedActivePowerRecord.class);
-
-  @Override
-  public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
-    final ByteArrayOutputStream out = new ByteArrayOutputStream();
-    try {
-      this.avroEnCoder.encode(data, out);
-    } catch (final IOException e) {
-      LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
-    }
-    final byte[] result = out.toByteArray();
-    try {
-      out.close();
-    } catch (final IOException e) {
-      LOGGER.error(
-          "Could not close output stream after serialization of AggregatedActivePowerRecord", e);
-    }
-    return result;
-  }
-
-  @Override
-  public void close() {
-    Serializer.super.close();
-  }
+    extends SpecificAvroSerializer<AggregatedActivePowerRecord> {
}
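Note (an assumption, not stated in the diff): Confluent's SpecificAvroSerializer/Deserializer resolve schemas through a Schema Registry and must be configured with its URL before use, which is presumably why the KafkaWriterTransformation constructors above now also receive super.buildProducerConfig(). A hypothetical configuration sketch with a placeholder URL:

import java.util.Map;

// Hypothetical setup; "schema.registry.url" is the standard Confluent client property.
static AggregatedActivePowerRecordSerializer createSerializer() {
  final AggregatedActivePowerRecordSerializer serializer =
      new AggregatedActivePowerRecordSerializer();
  serializer.configure(
      Map.of("schema.registry.url", "http://localhost:8081"), // placeholder URL
      false); // false = configuring a value (not key) serializer
  return serializer;
}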
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -61,7 +61,7 @@ cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
-formatter_settings_version=15
+formatter_settings_version=21
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
......
......@@ -32,11 +32,11 @@ class ConfigMapResourceSet : ResourceSet, KubernetesResource {
    }

    if (::files.isInitialized) {
-      resources = resources.filter { files.contains(it.key) }
-      if (resources.size != files.size) {
+      val filteredResources = resources.filter { files.contains(it.key) }
+      if (filteredResources.size != files.size) {
        throw DeploymentFailedException("Could not find all specified Kubernetes manifests files")
      }
+      resources = filteredResources
    }

    return try {
......