diff --git a/theodolite-benchmarks/uc4-beam-samza/Dockerfile b/theodolite-benchmarks/uc4-beam-samza/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..4505849ce764648f19c080a1f7534abc5486279d
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/Dockerfile
@@ -0,0 +1,9 @@
+FROM openjdk:8-slim
+
+ADD build/distributions/uc4-beam-samza.tar /
+ADD config/standalone.properties /samza-standalone.properties
+
+
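+# MAX_SOURCE_PARALLELISM is expected as an environment variable at container runtime,
+# e.g. (hypothetical values): docker run -e MAX_SOURCE_PARALLELISM=1024 <image>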
+CMD /uc4-beam-samza/bin/uc4-beam-samza --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=$MAX_SOURCE_PARALLELISM --enableMetrics=false
diff --git a/theodolite-benchmarks/uc4-beam-samza/build.gradle b/theodolite-benchmarks/uc4-beam-samza/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..a52d84afcd77a392acd47d2b42ba55711b0488cb
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/build.gradle
@@ -0,0 +1,32 @@
+plugins {
+  id 'theodolite.kstreams'
+}
+
+allprojects {
+  repositories {
+    maven {
+      url 'https://packages.confluent.io/maven/'
+    }
+    mavenCentral()
+  }
+}
+
+
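+// The Confluent Maven repository configured above hosts the Confluent artifacts
+// (e.g. the Kafka Avro deserializer) that are not available on Maven Central.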
+dependencies {
+  implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
+  implementation group: 'org.apache.beam', name: 'beam-runners-samza', version: '2.22.0'
+
+  implementation('org.apache.beam:beam-sdks-java-io-kafka:2.22.0') {
+    exclude group: 'org.apache.kafka', module: 'kafka-clients'
+  }
+  implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+
+  runtimeOnly 'org.apache.beam:beam-runners-direct-java:2.22.0'
+  runtimeOnly 'org.slf4j:slf4j-api:1.7.32'
+  runtimeOnly 'org.slf4j:slf4j-jdk14:1.7.32'
+}
+
+
+mainClassName = "application.Uc4ApplicationBeam"
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java
new file mode 100644
index 0000000000000000000000000000000000000000..394af1e9bba5cd94883b7bd626242ba791109d47
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java
@@ -0,0 +1,37 @@
+package application;
+
+
+import java.util.Optional;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.io.kafka.TimestampPolicy;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+/**
+ * TimestampPolicy that uses the timestamp of the record value as the event time.
+ */
+public class AggregatedActivePowerRecordEventTimePolicy
+    extends TimestampPolicy<String, AggregatedActivePowerRecord> {
+  protected Instant currentWatermark;
+
+  public AggregatedActivePowerRecordEventTimePolicy(final Optional<Instant> previousWatermark) {
+    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+  }
+
+
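+  // The watermark is simply advanced to the timestamp of each processed record;
+  // this assumes that records arrive roughly ordered by timestamp per partition.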
+  @Override
+  public Instant getTimestampForRecord(final PartitionContext ctx,
+      final KafkaRecord<String, AggregatedActivePowerRecord> record) {
+    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
+    return this.currentWatermark;
+  }
+
+  @Override
+  public Instant getWatermark(final PartitionContext ctx) {
+    return this.currentWatermark;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java
new file mode 100644
index 0000000000000000000000000000000000000000..ade4f34b7da9aad2d93e47110b22f6a9770a07f9
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java
@@ -0,0 +1,36 @@
+package application;
+
+
+import java.util.Optional;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.io.kafka.TimestampPolicy;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Instant;
+import titan.ccp.model.records.ActivePowerRecord;
+
+/**
+ * TimestampPolicy that uses the timestamp of the record value as the event time.
+ */
+public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> {
+  protected Instant currentWatermark;
+
+  public EventTimePolicy(final Optional<Instant> previousWatermark) {
+    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
+  }
+
+
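+  // The watermark is simply advanced to the timestamp of each processed record;
+  // this assumes that records arrive roughly ordered by timestamp per partition.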
+  @Override
+  public Instant getTimestampForRecord(final PartitionContext ctx,
+      final KafkaRecord<String, ActivePowerRecord> record) {
+    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
+    return this.currentWatermark;
+  }
+
+  @Override
+  public Instant getWatermark(final PartitionContext ctx) {
+    return this.currentWatermark;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java
new file mode 100644
index 0000000000000000000000000000000000000000..b2d6b8e3d743b3e8dacc72fd10ee5f3d194d24bf
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java
@@ -0,0 +1,53 @@
+package application;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.model.sensorregistry.AggregatedSensor;
+import titan.ccp.model.sensorregistry.Sensor;
+import titan.ccp.model.sensorregistry.SensorRegistry;
+
+/**
+ * DoFn that generates a pair of the identifier and the set of parent identifiers for every
+ * sensor in the hierarchy.
+ */
+public class GenerateParentsFn extends DoFn<KV<Event, String>, KV<String, Set<String>>> {
+
+
+  private static final long serialVersionUID = 958270648688932091L;
+
+  @ProcessElement
+  public void processElement(@Element final KV<Event, String> kv,
+      final OutputReceiver<KV<String, Set<String>>> out) {
+    final Map<String, Set<String>> childParentsPairs =
+        this.constructChildParentsPairs(SensorRegistry.fromJson(kv.getValue()));
+    for (final Map.Entry<String, Set<String>> pair : childParentsPairs.entrySet()) {
+      out.output(KV.of(pair.getKey(), pair.getValue()));
+    }
+
+  }
+
+  private Map<String, Set<String>> constructChildParentsPairs(final SensorRegistry registry) {
+    return this.streamAllChildren(registry.getTopLevelSensor())
+        .collect(Collectors.<Sensor, String, Set<String>>toMap(
+            child -> child.getIdentifier(),
+            child -> child.getParent()
+                .map(p -> Stream.of(p.getIdentifier()).collect(Collectors.toSet()))
+                .orElseGet(() -> Collections.<String>emptySet())));
+  }
+
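+  // Recursively streams all descendants of the given aggregated sensor, including
+  // nested aggregated sensors and their children.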
+  private Stream<Sensor> streamAllChildren(final AggregatedSensor sensor) {
+    return sensor.getChildren().stream()
+        .flatMap(s -> Stream.concat(
+            Stream.of(s),
+            s instanceof AggregatedSensor ? this.streamAllChildren((AggregatedSensor) s)
+                : Stream.empty()));
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java
new file mode 100644
index 0000000000000000000000000000000000000000..d35ef32ea36298e957bde635c601464db502515f
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java
@@ -0,0 +1,66 @@
+package application;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+
+
+/**
+ * CombineFn to aggregate ActivePowerRecords into AggregatedActivePowerRecords.
+ */
+public class RecordAggregation
+    extends CombineFn<ActivePowerRecord, RecordAggregation.Accum, AggregatedActivePowerRecord> {
+
+
+
+  private static final long serialVersionUID = 4362213539553233529L;
+
+  @DefaultCoder(AvroCoder.class)
+  public static class Accum implements Serializable {
+    private static final long serialVersionUID = 3701311203919534376L;
+    long count = 0;
+    double sum = 0.0;
+    long timestamp = 0;
+  }
+
+  @Override
+  public Accum createAccumulator() {
+    return new Accum();
+  }
+
+  @Override
+  public Accum addInput(final Accum mutableAccumulator, final ActivePowerRecord input) {
+    mutableAccumulator.count += 1;
+    mutableAccumulator.sum += input.getValueInW();
+    mutableAccumulator.timestamp = input.getTimestamp();
+    return mutableAccumulator;
+  }
+
+  @Override
+  public Accum mergeAccumulators(final Iterable<Accum> accumulators) {
+    final Accum merged = this.createAccumulator();
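+    // Accumulators are merged in unspecified order; the merged timestamp keeps the
+    // value of the last accumulator seen, which is acceptable within a single window.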
+    for (final Accum accumulator : accumulators) {
+      merged.count += accumulator.count;
+      merged.sum += accumulator.sum;
+      merged.timestamp = accumulator.timestamp;
+    }
+
+    return merged;
+  }
+
+  @Override
+  public AggregatedActivePowerRecord extractOutput(final Accum accumulator) {
+    final double average = accumulator.count == 0 ? 0.0 : accumulator.sum / accumulator.count;
+    return new AggregatedActivePowerRecord("", accumulator.timestamp, accumulator.count,
+        accumulator.sum, average);
+  }
+
+
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/SensorParentKey.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/SensorParentKey.java
new file mode 100644
index 0000000000000000000000000000000000000000..546fc04c2de089a28d8f0fba86a7fbcd5c1cc0a8
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/SensorParentKey.java
@@ -0,0 +1,32 @@
+package application;
+
+/**
+ * A key consisting of the identifier of a sensor and the identifier of its parent sensor.
+ */
+public class SensorParentKey {
+
+  private final String sensorIdentifier;
+
+  private final String parentIdentifier;
+
+  public SensorParentKey(final String sensorIdentifier, final String parentIdentifier) {
+    this.sensorIdentifier = sensorIdentifier;
+    this.parentIdentifier = parentIdentifier;
+  }
+
+  public String getSensor() {
+    return this.sensorIdentifier;
+  }
+
+  public String getParent() {
+    return this.parentIdentifier;
+  }
+
+  @Override
+  public String toString() {
+    return "{" + this.sensorIdentifier + ", " + this.parentIdentifier + "}";
+  }
+
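+  // Note: Beam compares keys by their coder-encoded form (see SensorParentKeyCoder),
+  // so equals() and hashCode() are not strictly required for grouping.
+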
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
new file mode 100644
index 0000000000000000000000000000000000000000..7604d5f87f709bc710120a82a616262ecdd614d5
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
@@ -0,0 +1,362 @@
+package application;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Latest;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import serialization.AggregatedActivePowerRecordCoder;
+import serialization.AggregatedActivePowerRecordDeserializer;
+import serialization.AggregatedActivePowerRecordSerializer;
+import serialization.EventCoder;
+import serialization.EventDeserializer;
+import serialization.SensorParentKeyCoder;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+/**
+ * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza
+ * runner. To run locally in standalone mode, start Kafka, ZooKeeper, the Schema Registry and the
+ * workload generator using the delayed_startup.sh script. Add
+ * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
+ * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties
+ * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 as program arguments. To
+ * persist logs, add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as an output
+ * file under "Standard Input and Output" in the "Common" tab of the run configuration and start
+ * the application via Eclipse Run.
+ */
+public class Uc4ApplicationBeam {
+
+  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
+  public static void main(final String[] args) {
+
+    // Set configuration for windowing
+    final int windowDuration = Integer.parseInt(
+        System.getenv("KAFKA_WINDOW_DURATION") != null
+            ? System.getenv("KAFKA_WINDOW_DURATION")
+            : "60");
+    final Duration duration = Duration.standardSeconds(windowDuration);
+    final int triggerInterval = Integer.parseInt(
+        System.getenv("TRIGGER_INTERVAL") != null
+            ? System.getenv("TRIGGER_INTERVAL")
+            : "30");
+
+    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
+
+    final int grace = Integer.parseInt(
+        System.getenv("GRACE_PERIOD") != null
+            ? System.getenv("GRACE_PERIOD")
+            : "270");
+
+    final Duration gracePeriod = Duration.standardSeconds(grace);
+    // Set configuration for Kafka
+    final String bootstrapServer =
+        System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
+            : "my-confluent-cp-kafka:9092";
+    final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
+    final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output";
+    final String configurationTopic =
+        System.getenv("CONFIGURATION") != null ? System.getenv("CONFIGURATION") : "configuration";
+    final String feedbackTopic =
+        System.getenv("FEEDBACKTOPIC") != null ? System.getenv("FEEDBACKTOPIC")
+            : "aggregation-feedback";
+    final String schemaRegistryURL =
+        System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
+            : "http://my-confluent-cp-schema-registry:8081";
+
+    // Set consumer configuration for the schema registry and commits back to Kafka
+    final HashMap<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerConfig.put("schema.registry.url", schemaRegistryURL);
+    consumerConfig.put("specific.avro.reader", "true");
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input");
+
+    final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>();
+    consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration");
+
+    // Create run options from args
+    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setRunner(SamzaRunner.class);
+    options.setJobName("ucapplication");
+
+    final Pipeline pipeline = Pipeline.create(options);
+    final CoderRegistry cr = pipeline.getCoderRegistry();
+
+    // Set Coders for Classes that will be distributed
+    cr.registerCoderForClass(ActivePowerRecord.class,
+        NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
+    cr.registerCoderForClass(AggregatedActivePowerRecord.class,
+        new AggregatedActivePowerRecordCoder());
+    cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
+    cr.registerCoderForClass(Event.class, new EventCoder());
+    cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
+        KafkaIO.<String, ActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(inputTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+                NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)))
+            .withConsumerConfigUpdates(consumerConfig)
+            // Set TimeStampPolicy for event time
+            .withTimestampPolicyFactory(
+                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
+            .withoutMetadata();
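+    // KafkaAvroDeserializer is untyped, hence the raw Class cast above; the explicit
+    // coder tells Beam how to encode the decoded ActivePowerRecords downstream.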
+
+    // Apply pipeline transformations
+    // Read from Kafka
+    final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka)
+        .apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
+        .apply("Set trigger for input", Window
+            .<KV<String, ActivePowerRecord>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(triggerDelay)))
+            .withAllowedLateness(gracePeriod)
+            .discardingFiredPanes());
+
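+    // The feedback topic closes the aggregation loop: results for one level of the
+    // sensor hierarchy re-enter the pipeline as inputs for the next higher level.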
+    // Read the results of earlier aggregations.
+    final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = pipeline
+        .apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(feedbackTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
+            .withTimestampPolicyFactory(
+                (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
+                    previousWaterMark))
+            .withoutMetadata())
+        .apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
+        // Convert into the correct data format
+        .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord", MapElements.via(
+            new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, ActivePowerRecord>>() {
+              @Override
+              public KV<String, ActivePowerRecord> apply(
+                  final KV<String, AggregatedActivePowerRecord> kv) {
+                return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(),
+                    kv.getValue().getTimestamp(), kv.getValue().getSumInW()));
+              }
+            }))
+        .apply("Set trigger for feedback", Window
+            .<KV<String, ActivePowerRecord>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(triggerDelay)))
+            .withAllowedLateness(gracePeriod)
+            .discardingFiredPanes());
+    // Prepare flatten
+    final PCollectionList<KV<String, ActivePowerRecord>> collections =
+        PCollectionList.of(values).and(aggregationsInput);
+
+    // Create a single PCollection out of the input and already computed results
+    final PCollection<KV<String, ActivePowerRecord>> inputCollection =
+        collections.apply("Flatten sensor data and aggregation results",
+            Flatten.pCollections());
+
+
+    // Build the configuration stream from a changelog.
+    final PCollection<KV<String, Set<String>>> configurationStream = pipeline
+        .apply("Read sensor groups", KafkaIO.<Event, String>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(configurationTopic)
+            .withKeyDeserializer(EventDeserializer.class)
+            .withValueDeserializer(StringDeserializer.class)
+            .withConsumerConfigUpdates(consumerConfigConfiguration)
+            .withoutMetadata())
+        // Only forward relevant changes in the hierarchy
+        .apply("Filter changed and status events",
+            Filter.by(new SerializableFunction<KV<Event, String>, Boolean>() {
+              @Override
+              public Boolean apply(final KV<Event, String> kv) {
+                return kv.getKey() == Event.SENSOR_REGISTRY_CHANGED
+                    || kv.getKey() == Event.SENSOR_REGISTRY_STATUS;
+              }
+            }))
+        // Build the changelog
+        .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
+        .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
+        .apply("Set trigger for configuration", Window
+            .<KV<String, Set<String>>>configure()
+            .triggering(AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(
+                    AfterPane.elementCountAtLeast(1)))
+            .withAllowedLateness(Duration.ZERO)
+            .accumulatingFiredPanes());
+
+    final PCollectionView<Map<String, Set<String>>> childParentPairMap =
+        configurationStream.apply(Latest.perKey())
+            // Reset trigger to avoid synchronized processing time
+            .apply("Reset trigger for configurations", Window
+                .<KV<String, Set<String>>>configure()
+                .triggering(AfterWatermark.pastEndOfWindow()
+                    .withEarlyFirings(
+                        AfterPane.elementCountAtLeast(1)))
+                .withAllowedLateness(Duration.ZERO)
+                .accumulatingFiredPanes())
+            .apply(View.asMap());
+
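+    // The map view is distributed to all workers as a side input, allowing each record
+    // to be matched against the current parents of its sensor in the flat map below.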
+    // Build pairs of every sensor reading and parent
+    final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
+        inputCollection.apply(
+            "Duplicate as flatMap",
+            ParDo.of(
+                new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
+                  @StateId("parents")
+                  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
+
+                  // Generate a KV-pair for every child-parent match
+                  @ProcessElement
+                  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
+                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+                      @StateId("parents") final ValueState<Set<String>> state,
+                      final ProcessContext c) {
+                    final ActivePowerRecord record = kv.getValue();
+                    final Set<String> parentsLookup =
+                        c.sideInput(childParentPairMap).get(kv.getKey());
+                    final Set<String> newParents =
+                        parentsLookup == null ? Collections.emptySet() : parentsLookup;
+                    final Set<String> oldParents =
+                        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
+                    // Forward new Pairs if they exist
+                    if (!newParents.isEmpty()) {
+                      for (final String parent : newParents) {
+
+                        // Forward flat mapped record
+                        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
+                        out.output(KV.of(key, record));
+                      }
+                    }
+                    if (!newParents.equals(oldParents)) {
+                      for (final String oldParent : oldParents) {
+                        if (!newParents.contains(oldParent)) {
+                          // Forward Delete
+                          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
+                          out.output(KV.of(key, null));
+                        }
+                      }
+                      state.write(newParents);
+                    }
+                  }
+                }).withSideInputs(childParentPairMap))
+
+            .apply("Filter only latest changes", Latest.perKey())
+            .apply("Filter out null values",
+                Filter.by(
+                    new SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean>() {
+                      @Override
+                      public Boolean apply(final KV<SensorParentKey, ActivePowerRecord> kv) {
+                        return kv.getValue() != null;
+                      }
+                    }));
+    // Aggregate for every sensor group of the current level
+    final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues
+        .apply("Set key to group", MapElements.via(
+            new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() {
+              @Override
+              public KV<String, ActivePowerRecord> apply(
+                  final KV<SensorParentKey, ActivePowerRecord> kv) {
+
+                return KV.of(kv.getKey().getParent(), kv.getValue());
+              }
+            }))
+        // Reset trigger to avoid synchronized processing time
+        .apply("Reset trigger for aggregations", Window
+            .<KV<String, ActivePowerRecord>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(triggerDelay)))
+            .withAllowedLateness(gracePeriod)
+            .discardingFiredPanes())
+
+        .apply("Aggregate per group", Combine.perKey(new RecordAggregation()))
+        .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via(
+            new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() {
+              @Override
+              public KV<String, AggregatedActivePowerRecord> apply(
+                  final KV<String, AggregatedActivePowerRecord> kv) {
+                final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
+                    kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
+                    kv.getValue().getSumInW(), kv.getValue().getAverageInW());
+                return KV.of(kv.getKey(), record);
+              }
+            }));
+
+
+
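+    // The results are written twice: to the output topic for downstream consumers and
+    // to the feedback topic, where they re-enter this pipeline as input (see above).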
+    aggregations.apply("Write to aggregation results",
+        KafkaIO.<String, AggregatedActivePowerRecord>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(outputTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
+
+
+    aggregations
+        .apply("Write to feedback topic", KafkaIO.<String, AggregatedActivePowerRecord>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(feedbackTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
+
+
+    pipeline.run().waitUntilFinish();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java
new file mode 100644
index 0000000000000000000000000000000000000000..e528a836c75d111f602379e9567244255af79a76
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java
@@ -0,0 +1,275 @@
+package application;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Latest;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import serialization.AggregatedActivePowerRecordCoder;
+import serialization.AggregatedActivePowerRecordSerializer;
+import serialization.EventCoder;
+import serialization.EventDeserializer;
+import serialization.SensorParentKeyCoder;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
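+/**
+ * Variant of {@link Uc4ApplicationBeam} without the feedback loop for already computed
+ * aggregations, intended for local experiments. Intermediate results are printed to stdout
+ * for debugging.
+ */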
+public class Uc4ApplicationBeamNoFeedback {
+
+  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
+  public static void main(final String[] args) {
+
+    final String inputTopic = "input";
+    final String outputTopic = "output";
+    final String bootstrapServer = "localhost:9092";
+    final String configurationTopic = "configuration";
+    final String schemaRegistryURL = "http://localhost:8081";
+    final Duration duration = Duration.standardSeconds(15);
+
+    // Set consumer configuration for the schema registry and commits back to Kafka
+    final HashMap<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerConfig.put("schema.registry.url", schemaRegistryURL);
+    consumerConfig.put("specific.avro.reader", "true");
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input");
+
+    final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>();
+    consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+    consumerConfigConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration");
+
+    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    options.setRunner(SamzaRunner.class);
+    options.setJobName("ucapplication");
+    final Pipeline pipeline = Pipeline.create(options);
+    final CoderRegistry cr = pipeline.getCoderRegistry();
+
+    // Set Coders for Classes that will be distributed
+    cr.registerCoderForClass(ActivePowerRecord.class,
+        NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
+    cr.registerCoderForClass(AggregatedActivePowerRecord.class,
+        new AggregatedActivePowerRecordCoder());
+    cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
+    cr.registerCoderForClass(Event.class, new EventCoder());
+    // SensorRegistry
+    cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+
+
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
+        KafkaIO.<String, ActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(inputTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
+                NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)))
+            .withConsumerConfigUpdates(consumerConfig)
+            // Set TimeStampPolicy for event time
+            .withTimestampPolicyFactory(
+                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
+            .withoutMetadata();
+    // Apply pipeline transformations
+    // Read from Kafka
+    final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka)
+        .apply("Apply Winddows", Window.into(FixedWindows.of(duration)));
+
+
+    // Build the configuration stream from a changelog.
+    final PCollection<KV<String, Set<String>>> configurationStream = pipeline
+        .apply("Read sensor groups", KafkaIO.<Event, String>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(configurationTopic)
+            .withKeyDeserializer(EventDeserializer.class)
+            .withValueDeserializer(StringDeserializer.class)
+            .withConsumerConfigUpdates(consumerConfigConfiguration)
+            .withoutMetadata())
+
+        .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
+        .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
+        .apply("Set trigger for configurations", Window
+            .<KV<String, Set<String>>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(Duration.standardSeconds(5))))
+            .accumulatingFiredPanes());
+    // This trigger may need to be changed to eliminate duplicates in the first iteration.
+
+
+    final PCollectionView<Map<String, Set<String>>> childParentPairMap =
+        configurationStream.apply(View.asMap());
+
+
+    final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
+        values.apply(
+            "Duplicate as flatMap",
+            ParDo.of(
+                new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
+                  @StateId("parents")
+                  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
+
+                  // Generate a KV-pair for every child-parent match
+                  @ProcessElement
+                  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
+                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+                      @StateId("parents") final ValueState<Set<String>> state,
+                      final ProcessContext c) {
+                    final ActivePowerRecord record = kv.getValue();
+                    final Set<String> parentsLookup =
+                        c.sideInput(childParentPairMap).get(kv.getKey());
+                    final Set<String> newParents =
+                        parentsLookup == null ? Collections.emptySet() : parentsLookup;
+                    System.out.println("Map Entry for Key: " + newParents.toString());
+                    final Set<String> oldParents =
+                        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
+
+                    // Forward new Pairs if they exist
+                    if (!newParents.isEmpty()) {
+                      for (final String parent : newParents) {
+
+                        // Forward flat mapped record
+                        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
+                        out.output(KV.of(key, record));
+                      }
+                    }
+                    if (!newParents.equals(oldParents)) {
+                      for (final String oldParent : oldParents) {
+                        if (!newParents.contains(oldParent)) {
+                          // Forward Delete
+                          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
+                          out.output(KV.of(key, null));
+
+                        }
+                      }
+                      state.write(newParents);
+                    }
+                  }
+                }).withSideInputs(childParentPairMap))
+
+            .apply("Debugging output before filtering latest", ParDo.of(
+                new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
+                  @ProcessElement
+                  public void processElement(
+                      @Element final KV<SensorParentKey, ActivePowerRecord> kv,
+                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+                      final ProcessContext c) {
+                    System.out.println("Before filter latest Sensor: " + kv.getKey().getSensor()
+                        + " Parent: " + kv.getKey().getParent() + " ValueKey : "
+                        + kv.getValue().getIdentifier() + " ValueInW: "
+                        + kv.getValue().getValueInW()
+                        + " Timestamp: " + kv.getValue().getTimestamp());
+                    out.output(kv);
+                  }
+                }))
+            .apply("Filter only latest changes", Latest.perKey())
+            .apply("Debugging output after filtering latest", ParDo.of(
+                new DoFn<KV<SensorParentKey, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
+                  @ProcessElement
+                  public void processElement(
+                      @Element final KV<SensorParentKey, ActivePowerRecord> kv,
+                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
+                      final ProcessContext c) {
+                    System.out.println("After filter latest Sensor: " + kv.getKey().getSensor()
+                        + " Parent: " + kv.getKey().getParent() + " ValueKey : "
+                        + kv.getValue().getIdentifier() + " ValueInW: "
+                        + kv.getValue().getValueInW()
+                        + " Timestamp: " + kv.getValue().getTimestamp());
+                    out.output(kv);
+                  }
+                }));
+
+
+    final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues
+
+        .apply("Set key to group", MapElements.via(
+            new SimpleFunction<KV<SensorParentKey, ActivePowerRecord>, KV<String, ActivePowerRecord>>() {
+              @Override
+              public KV<String, ActivePowerRecord> apply(
+                  final KV<SensorParentKey, ActivePowerRecord> kv) {
+                System.out.println("key set to group" + kv.getKey() + "Timestamp: "
+                    + kv.getValue().getTimestamp());
+                return KV.of(kv.getKey().getParent(), kv.getValue());
+              }
+            }))
+        .apply("Aggregate per group", Combine.perKey(new RecordAggregation()))
+        .apply("Set the Identifier in AggregatedActivePowerRecord", MapElements.via(
+            new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() {
+              @Override
+              public KV<String, AggregatedActivePowerRecord> apply(
+                  final KV<String, AggregatedActivePowerRecord> kv) {
+                final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
+                    kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
+                    kv.getValue().getSumInW(), kv.getValue().getAverageInW());
+                System.out.println("set identifier to: " + record.getIdentifier() + "Timestamp: "
+                    + record.getTimestamp());
+                return KV.of(kv.getKey(), record);
+              }
+            }));
+
+
+    aggregations.apply("Print Stats", MapElements.via(
+        new SimpleFunction<KV<String, AggregatedActivePowerRecord>, KV<String, AggregatedActivePowerRecord>>() {
+
+          @Override
+          public KV<String, AggregatedActivePowerRecord> apply(
+              final KV<String, AggregatedActivePowerRecord> kv) {
+            System.out.println("Output: Key: "
+                + kv.getKey()
+                + " Identifier: " + kv.getValue().getIdentifier()
+                + " Timestamp: " + kv.getValue().getTimestamp()
+                + " Avg: " + kv.getValue().getAverageInW()
+                + " Count: " + kv.getValue().getCount()
+                + " Sum: " + kv.getValue().getSumInW());
+            return kv;
+          }
+        }))
+        .apply("Write to aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(outputTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java
new file mode 100644
index 0000000000000000000000000000000000000000..d5d4d3cd5f5c09d4261a097c8500b1f44f59ceb9
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java
@@ -0,0 +1,35 @@
+package application;
+
+import java.util.Set;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Forwards changed child-parent pairs and tombstone (null) values for deleted sensors.
+ */
+public class UpdateChildParentPairs extends DoFn<KV<String, Set<String>>, KV<String, Set<String>>> {
+
+  private static final long serialVersionUID = 1L;
+
+  @StateId("parents")
+  private final StateSpec<ValueState<Set<String>>> parents =
+      StateSpecs.value();
+
+
+
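+  // Emits a pair only if it is a tombstone or its parent set differs from the stored
+  // state, so unchanged configurations do not trigger downstream updates.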
+  @ProcessElement
+  public void processElement(@Element final KV<String, Set<String>> kv,
+      final OutputReceiver<KV<String, Set<String>>> out,
+      @StateId("parents") final ValueState<Set<String>> state) {
+    if (kv.getValue() == null || !kv.getValue().equals(state.read())) {
+      out.output(kv);
+      state.write(kv.getValue());
+    }
+
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
new file mode 100644
index 0000000000000000000000000000000000000000..a8e6f0917f321062ef85542db488ad496d087bc9
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
@@ -0,0 +1,56 @@
+package serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+/**
+ * Wrapper class that encapsulates an AvroCoder for AggregatedActivePowerRecord in an
+ * org.apache.beam.sdk.coders.Coder.
+ */
+@SuppressWarnings("serial")
+public class AggregatedActivePowerRecordCoder extends Coder<AggregatedActivePowerRecord>
+    implements Serializable {
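+  // The inner AvroCoder is declared transient and re-created lazily in encode/decode,
+  // so this wrapper stays serializable independently of the inner coder.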
+  private transient AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
+      AvroCoder.of(AggregatedActivePowerRecord.class);
+
+
+  @Override
+  public void encode(final AggregatedActivePowerRecord value, final OutputStream outStream)
+      throws CoderException, IOException {
+    if (this.avroEnCoder == null) {
+      this.avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
+    }
+    this.avroEnCoder.encode(value, outStream);
+
+  }
+
+  @Override
+  public AggregatedActivePowerRecord decode(final InputStream inStream)
+      throws CoderException, IOException {
+    if (this.avroEnCoder == null) {
+      this.avroEnCoder = AvroCoder.of(AggregatedActivePowerRecord.class);
+    }
+    return this.avroEnCoder.decode(inStream);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..da0798cc4554694e404f5fb6344da6911c51045d
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
@@ -0,0 +1,30 @@
+package serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+/**
+ * Wrapper class that encapsulates an AvroCoder for AggregatedActivePowerRecord in a Deserializer.
+ */
+public class AggregatedActivePowerRecordDeserializer
+    implements Deserializer<AggregatedActivePowerRecord> {
+
+  private final AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
+      AvroCoder.of(AggregatedActivePowerRecord.class);
+
+  @Override
+  public AggregatedActivePowerRecord deserialize(final String topic, final byte[] data) {
+    AggregatedActivePowerRecord value = null;
+    try {
+      value = this.avroEnCoder.decode(new ByteArrayInputStream(data));
+    } catch (final IOException e) {
+      e.printStackTrace();
+    }
+    return value;
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..6de16875201382e45bf616980b0ebf698e57749e
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
@@ -0,0 +1,28 @@
+package serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.kafka.common.serialization.Serializer;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+/**
+ * Wrapper class that encapsulates an AvroCoder for AggregatedActivePowerRecord in a Serializer.
+ */
+public class AggregatedActivePowerRecordSerializer
+    implements Serializer<AggregatedActivePowerRecord> {
+
+  private final AvroCoder<AggregatedActivePowerRecord> avroEnCoder =
+      AvroCoder.of(AggregatedActivePowerRecord.class);
+
+  @Override
+  public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      this.avroEnCoder.encode(data, out);
+      return out.toByteArray();
+    } catch (final IOException e) {
+      e.printStackTrace();
+      return new byte[0];
+    }
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java
new file mode 100644
index 0000000000000000000000000000000000000000..d442e30a58d0a5df93c0c837720badba1c5417df
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java
@@ -0,0 +1,63 @@
+package serialization;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.configuration.events.EventSerde;
+
+/**
+ * Wrapper class that encapsulates an Event Serde in an org.apache.beam.sdk.coders.Coder.
+ */
+public class EventCoder extends Coder<Event> implements Serializable {
+  private static final long serialVersionUID = 8403045343970659100L;
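+  // The inner Serde is declared transient and re-created lazily in encode/decode, as
+  // Serdes are in general not serializable.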
+  private transient Serde<Event> innerSerde = EventSerde.serde();
+
+  @Override
+  public void encode(final Event value, final OutputStream outStream)
+      throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = EventSerde.serde();
+    }
+    final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
+    final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
+    outStream.write(sizeinBytes);
+    outStream.write(bytes);
+  }
+
+  @Override
+  public Event decode(final InputStream inStream) throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = EventSerde.serde();
+    }
+    final DataInputStream dataInStream = new DataInputStream(inStream);
+    final byte[] sizeinBytes = new byte[4];
+    dataInStream.readFully(sizeinBytes);
+    final int size = ByteBuffer.wrap(sizeinBytes).getInt();
+    final byte[] bytes = new byte[size];
+    dataInStream.readFully(bytes);
+    return this.innerSerde.deserializer().deserialize("deser", bytes);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+
+  }
+
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java
new file mode 100644
index 0000000000000000000000000000000000000000..f5c60380a5ab52bc9c871840f59decf41cf374a3
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java
@@ -0,0 +1,37 @@
+package serialization;
+
+import java.util.Map;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import titan.ccp.configuration.events.Event;
+
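+/**
+ * Kafka Deserializer that maps a serialized ordinal back to the corresponding {@link Event}
+ * constant.
+ */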
+public class EventDeserializer implements Deserializer<Event> {
+
+  private final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer();
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+    this.byteBufferDeserializer.configure(configs, isKey);
+  }
+
+  @Override
+  public Event deserialize(final String topic, final byte[] data) {
+    final int ordinal = this.byteBufferDeserializer.deserialize(topic, data).getInt();
+    for (final Event event : Event.values()) {
+      if (ordinal == event.ordinal()) {
+        return event;
+      }
+    }
+    throw new IllegalArgumentException("Deserialized data is not a valid event.");
+  }
+
+  @Override
+  public void close() {
+    this.byteBufferDeserializer.close();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java
new file mode 100644
index 0000000000000000000000000000000000000000..f7285a5895bef774905239d6f35c524429afe88f
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java
@@ -0,0 +1,63 @@
+package serialization;
+
+
+import application.SensorParentKey;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * Wrapper class that encapsulates a SensorParentKey Serde in an org.apache.beam.sdk.coders.Coder.
+ */
+public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Serializable {
+
+  private static final long serialVersionUID = -3480141901035692398L;
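+  // The inner Serde is declared transient and re-created lazily in encode/decode, as
+  // Serdes are in general not serializable.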
+  private transient Serde<SensorParentKey> innerSerde = SensorParentKeySerde.serde();
+
+  @Override
+  public void encode(final SensorParentKey value, final OutputStream outStream)
+      throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = SensorParentKeySerde.serde();
+    }
+    final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
+    final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
+    outStream.write(sizeinBytes);
+    outStream.write(bytes);
+  }
+
+  @Override
+  public SensorParentKey decode(final InputStream inStream) throws CoderException, IOException {
+    if (this.innerSerde == null) {
+      this.innerSerde = SensorParentKeySerde.serde();
+    }
+    final DataInputStream dataInStream = new DataInputStream(inStream);
+    final byte[] sizeinBytes = new byte[4];
+    dataInStream.readFully(sizeinBytes);
+    final int size = ByteBuffer.wrap(sizeinBytes).getInt();
+    final byte[] bytes = new byte[size];
+    dataInStream.readFully(bytes);
+    return this.innerSerde.deserializer().deserialize("deser", bytes);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeySerde.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeySerde.java
new file mode 100644
index 0000000000000000000000000000000000000000..468adb3947439c11c4fd9b289f41b68e606bdb1d
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeySerde.java
@@ -0,0 +1,34 @@
+package serialization;
+
+import application.SensorParentKey;
+import org.apache.kafka.common.serialization.Serde;
+import titan.ccp.common.kafka.simpleserdes.BufferSerde;
+import titan.ccp.common.kafka.simpleserdes.ReadBuffer;
+import titan.ccp.common.kafka.simpleserdes.SimpleSerdes;
+import titan.ccp.common.kafka.simpleserdes.WriteBuffer;
+
+/**
+ * {@link Serde} factory for {@link SensorParentKey}.
+ */
+public final class SensorParentKeySerde implements BufferSerde<SensorParentKey> {
+
+  private SensorParentKeySerde() {}
+
+  @Override
+  public void serialize(final WriteBuffer buffer, final SensorParentKey key) {
+    buffer.putString(key.getSensor());
+    buffer.putString(key.getParent());
+  }
+
+  @Override
+  public SensorParentKey deserialize(final ReadBuffer buffer) {
+    final String sensor = buffer.getString();
+    final String parent = buffer.getString();
+    return new SensorParentKey(sensor, parent);
+  }
+
+  public static Serde<SensorParentKey> serde() {
+    return SimpleSerdes.create(new SensorParentKeySerde());
+  }
+
+}