diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java
deleted file mode 100644
index 5828c1c80f2b3c791b6cd47a9d8a6c00059ade10..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/EventTimePolicy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package application;
-
-import java.util.Optional;
-import org.apache.beam.sdk.io.kafka.KafkaRecord;
-import org.apache.beam.sdk.io.kafka.TimestampPolicy;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-import titan.ccp.model.records.ActivePowerRecord;
-
-/**
- * TimeStampPolicy to use event time based on the timestamp of the record value.
- */
-public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> {
-  protected Instant currentWatermark;
-
-  public EventTimePolicy(final Optional<Instant> previousWatermark) {
-    super();
-    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
-  }
-
-
-  @Override
-  public Instant getTimestampForRecord(final PartitionContext ctx,
-      final KafkaRecord<String, ActivePowerRecord> record) {
-    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
-    return this.currentWatermark;
-  }
-
-  @Override
-  public Instant getWatermark(final PartitionContext ctx) {
-    return this.currentWatermark;
-  }
-
-}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
deleted file mode 100644
index 7a9a8ca20c7d01c50e0df866ba900ac6682eae41..0000000000000000000000000000000000000000
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeam.java
+++ /dev/null
@@ -1,353 +0,0 @@
-package application;
-
-import com.google.common.math.StatsAccumulator;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.runners.samza.SamzaRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.SetCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Latest;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.Duration;
-import serialization.AggregatedActivePowerRecordCoder;
-import serialization.AggregatedActivePowerRecordDeserializer;
-import serialization.AggregatedActivePowerRecordSerializer;
-import serialization.EventCoder;
-import serialization.EventDeserializer;
-import serialization.SensorParentKeyCoder;
-import titan.ccp.configuration.events.Event;
-import titan.ccp.model.records.ActivePowerRecord;
-import titan.ccp.model.records.AggregatedActivePowerRecord;
-
-/**
- * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza
- * Runner. To run locally in standalone start Kafka, Zookeeper, the schema-registry and the
- * workload generator using the delayed_startup.sh script. Add
- * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
- * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties
- * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To
- * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
- * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
- */
-final public class Uc4ApplicationBeam {
-  private static final String JOB_NAME = "Uc4Application";
-  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
-  private static final String INPUT = "INPUT";
-  private static final String OUTPUT = "OUTPUT";
-  private static final String CONFIGURATION = "CONFIGURATION";
-  private static final String FEEDBACKTOPIC = "FEEDBACKTOPIC";
-  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
-  private static final String YES = "true";
-  private static final String USE_AVRO_READER = YES;
-  private static final String AUTO_COMMIT_CONFIG = YES;
-  private static final String KAFKA_WINDOW_DURATION  = "KAFKA_WINDOW_DURATION";
-  private static final String TRIGGER_INTERVAL  = "TRIGGER_INTERVAL";
-  private static final String GRACE_PERIOD  = "GRACE_PERIOD";
-  private static final String AUTO_OFFSET_RESET_CONFIG  = "earliest";
-
-
-
-
-  /**
-   * Private constructor to avoid instantiation.
-   */
-  private Uc4ApplicationBeam() {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Start executing this microservice.
-   */
-  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
-  public static void main(final String[] args) {
-
-    // Set Configuration for Windows
-    final int windowDuration = Integer.parseInt(
-        System.getenv(KAFKA_WINDOW_DURATION) == null
-            ? "60" : System.getenv(KAFKA_WINDOW_DURATION));
-    final Duration duration = Duration.standardSeconds(windowDuration);
-    final int triggerInterval = Integer.parseInt(
-        System.getenv(TRIGGER_INTERVAL) == null
-            ? "30" : System.getenv(TRIGGER_INTERVAL));
-
-    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
-
-    final int grace = Integer.parseInt(
-        System.getenv(GRACE_PERIOD) == null
-            ? "270" : System.getenv(GRACE_PERIOD));
-
-    final Duration gracePeriod = Duration.standardSeconds(grace);
-    // Set Configuration for Kafka
-    final String bootstrapServer =
-        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
-            : System.getenv(BOOTSTRAP);
-    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
-    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
-    final String configurationTopic =
-        System.getenv(CONFIGURATION) == null ? "configuration" : System.getenv(CONFIGURATION);
-    final String feedbackTopic =
-        System.getenv(FEEDBACKTOPIC) == null ? "aggregation-feedback"
-            : System.getenv(FEEDBACKTOPIC);
-    final String schemaRegistryUrl =
-        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
-            : System.getenv(SCHEMA_REGISTRY);
-
-
-    // final String inputTopic = "input";
-    // final String outputTopic = "output";
-    // final String bootstrapServer = "localhost:9092";
-    // final String configurationTopic = "configuration";
-    // final String feedbackTopic = "aggregation-feedback";
-    // final String schemaRegistryURL = "http://localhost:8081";
-    // final Duration duration = Duration.standardSeconds(5);
-    // final Duration gracePeriod = Duration.standardMinutes(5);
-    // final Duration triggerDelay = Duration.standardSeconds(5);
-
-    // Set consumer configuration for the schema registry and commits back to Kafka
-    final HashMap<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
-    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
-    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input");
-
-    final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>();
-    consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
-    consumerConfigConfiguration.put(
-        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
-    consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration");
-
-    // Create run options from args
-    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
-    options.setRunner(SamzaRunner.class);
-    options.setJobName(JOB_NAME);
-
-    final Pipeline pipeline = Pipeline.create(options);
-    final CoderRegistry cr = pipeline.getCoderRegistry();
-
-    // Set Coders for Classes that will be distributed
-    cr.registerCoderForClass(ActivePowerRecord.class,
-        NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
-    cr.registerCoderForClass(AggregatedActivePowerRecord.class,
-        new AggregatedActivePowerRecordCoder());
-    cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
-    cr.registerCoderForClass(Event.class, new EventCoder());
-    cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
-    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
-
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
-        KafkaIO.<String, ActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(inputTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
-                NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)))
-            .withConsumerConfigUpdates(consumerConfig)
-            // Set TimeStampPolicy for event time
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
-            .withoutMetadata();
-
-    // Apply pipeline transformations
-    // Read from Kafka
-    final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka)
-        .apply("Read Windows", Window.into(FixedWindows.of(duration)))
-        .apply("Set trigger for input", Window
-            .<KV<String, ActivePowerRecord>>configure()
-            .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane()
-                    .plusDelayOf(triggerDelay)))
-            .withAllowedLateness(gracePeriod)
-            .discardingFiredPanes());
-
-    // Read the results of earlier aggregations.
-    final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = pipeline
-        .apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(feedbackTopic)
-            .withKeyDeserializer(StringDeserializer.class)
-            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
-            .withTimestampPolicyFactory(
-                (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
-                          previousWaterMark))
-            .withoutMetadata())
-        .apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
-        // Convert into the correct data format
-        .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
-            MapElements.via(
-                new SimpleFunction<KV<String, AggregatedActivePowerRecord>,
-                    KV<String, ActivePowerRecord>>() {
-                @Override
-                public KV<String, ActivePowerRecord> apply(
-                    final KV<String, AggregatedActivePowerRecord> kv) {
-                      return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(),
-                            kv.getValue().getTimestamp(), kv.getValue().getSumInW()));
-                }
-                }))
-        .apply("Set trigger for feedback", Window
-            .<KV<String, ActivePowerRecord>>configure()
-            .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane()
-                    .plusDelayOf(triggerDelay)))
-            .withAllowedLateness(gracePeriod)
-            .discardingFiredPanes());
-    // Prepare flatten
-    final PCollectionList<KV<String, ActivePowerRecord>> collections =
-        PCollectionList.of(values).and(aggregationsInput);
-
-    // Create a single PCollection out of the input and already computed results
-    final PCollection<KV<String, ActivePowerRecord>> inputCollection =
-        collections.apply("Flatten sensor data and aggregation results",
-            Flatten.pCollections());
-
-
-    // Build the configuration stream from a changelog.
-    final PCollection<KV<String, Set<String>>> configurationStream = pipeline
-        .apply("Read sensor groups", KafkaIO.<Event, String>read()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(configurationTopic)
-            .withKeyDeserializer(EventDeserializer.class)
-            .withValueDeserializer(StringDeserializer.class)
-            .withConsumerConfigUpdates(consumerConfigConfiguration)
-            .withoutMetadata())
-        // Only forward relevant changes in the hierarchie
-        .apply("Filter changed and status events",
-            Filter.by(new SerializableFunction<KV<Event, String>, Boolean>() {
-              @Override
-              public Boolean apply(final KV<Event, String> kv) {
-                return kv.getKey() == Event.SENSOR_REGISTRY_CHANGED
-                    || kv.getKey() == Event.SENSOR_REGISTRY_STATUS;
-              }
-            }))
-        // Build the changelog
-        .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
-        .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
-        .apply("Set trigger for configuration", Window
-            .<KV<String, Set<String>>>configure()
-            .triggering(AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(
-                    AfterPane.elementCountAtLeast(1)))
-            .withAllowedLateness(Duration.ZERO)
-            .accumulatingFiredPanes());
-
-    final PCollectionView<Map<String, Set<String>>> childParentPairMap =
-        configurationStream.apply(Latest.perKey())
-            // Reset trigger to avoid synchronized processing time
-            .apply("Reset trigger for configurations", Window
-                .<KV<String, Set<String>>>configure()
-                .triggering(AfterWatermark.pastEndOfWindow()
-                    .withEarlyFirings(
-                        AfterPane.elementCountAtLeast(1)))
-                .withAllowedLateness(Duration.ZERO)
-                .accumulatingFiredPanes())
-            .apply(View.asMap());
-
-    // Build pairs of every sensor reading and parent
-    final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
-        inputCollection.apply(
-            "Duplicate as flatMap",
-            ParDo.of(new DuplicateAsFlatMap(childParentPairMap)).withSideInputs(childParentPairMap))
-            .apply("Filter only latest changes", Latest.perKey())
-            .apply("Filter out null values",
-                Filter.by(
-                    new SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean>() {
-                      @Override
-                      public Boolean apply(final KV<SensorParentKey, ActivePowerRecord> kv) {
-                        return kv.getValue() != null;
-                      }
-                    }));
-
-    // Aggregate for every sensor group of the current level
-    final PCollection<KV<String, AggregatedActivePowerRecord>>
-        aggregations = flatMappedValues
-        .apply("Set key to group", MapElements.via(
-            new SimpleFunction<KV<SensorParentKey,
-                ActivePowerRecord>, KV<String, ActivePowerRecord>>() {
-              @Override
-              public KV<String, ActivePowerRecord> apply(
-                  final KV<SensorParentKey, ActivePowerRecord> kv) {
-
-                return KV.of(kv.getKey().getParent(), kv.getValue());
-              }
-            }))
-        // Reset trigger to avoid synchronized processing time
-        .apply("Reset trigger for aggregations", Window
-            .<KV<String, ActivePowerRecord>>configure()
-            .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane()
-                    .plusDelayOf(triggerDelay)))
-            .withAllowedLateness(gracePeriod)
-            .discardingFiredPanes())
-
-        .apply(
-            "Aggregate per group",
-            Combine.perKey(new RecordAggregation()))
-        .apply("Set the Identifier in AggregatedActivePowerRecord",
-            MapElements.via(
-              new SimpleFunction<KV<String, AggregatedActivePowerRecord>,
-                  KV<String, AggregatedActivePowerRecord>>() {
-                @Override
-                public KV<String, AggregatedActivePowerRecord> apply(
-                    final KV<String, AggregatedActivePowerRecord> kv) {
-                  final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
-                      kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
-                      kv.getValue().getSumInW(), kv.getValue().getAverageInW());
-                  return KV.of(kv.getKey(), record);
-                }
-              }));
-
-
-
-    aggregations.apply("Write to aggregation results",
-        KafkaIO.<String, AggregatedActivePowerRecord>write()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(outputTopic)
-            .withKeySerializer(StringSerializer.class)
-            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
-
-
-    aggregations
-        .apply("Write to feedback topic", KafkaIO.<String, AggregatedActivePowerRecord>write()
-            .withBootstrapServers(bootstrapServer)
-            .withTopic(feedbackTopic)
-            .withKeySerializer(StringSerializer.class)
-            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
-
-
-    pipeline.run().waitUntilFinish();
-  }
-
-}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java
index 4e2efbd2ec26b61b5e48c5b3aa9dd67d978cf35c..0c7ff4a5d5ff922caf2896057d9d013610eacbd0 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4ApplicationBeamNoFeedback.java
@@ -45,6 +45,7 @@ import serialization.AggregatedActivePowerRecordSerializer;
 import serialization.EventCoder;
 import serialization.EventDeserializer;
 import serialization.SensorParentKeyCoder;
+import theodolite.commons.beam.kafka.EventTimePolicy;
 import titan.ccp.configuration.events.Event;
 import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
@@ -67,9 +68,6 @@ public final class Uc4ApplicationBeamNoFeedback {
   private static final String VALUE_IN_W = "ValueInW: ";
   private static final String TIMESTAMP = "Timestamp: ";
 
-
-
-
   /**
    * Private constructor to avoid instantiation.
    */
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java
new file mode 100644
index 0000000000000000000000000000000000000000..41d5bacd85135e58740423f6cb19d0f0c136b493
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java
@@ -0,0 +1,42 @@
+package application;
+
+
+import org.apache.beam.runners.samza.SamzaRunner;
+import org.apache.beam.sdk.Pipeline;
+import theodolite.commons.beam.AbstractBeamService;
+
+/**
+ * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza
+ * Runner. To run locally in standalone start Kafka, Zookeeper, the schema-registry and the
+ * workload generator using the delayed_startup.sh script. Add
+ * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
+ * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties
+ * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To
+ * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
+ * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
+ */
+public final class Uc4BeamSamza extends AbstractBeamService {
+
+
+  /**
+   * Private constructor setting specific options for this use case.
+   */
+  private Uc4BeamSamza(final String[] args) { //NOPMD
+    super(args);
+    this.options.setRunner(SamzaRunner.class);
+  }
+
+  /**
+   * Start running this microservice.
+   */
+  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
+  public static void main(final String[] args) {
+
+    final Uc4BeamSamza uc4BeamSamza = new Uc4BeamSamza(args);
+
+    final Pipeline pipeline = new Uc4BeamPipeline(uc4BeamSamza.options, uc4BeamSamza.getConfig());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc4-beam-samza/src/main/resources/META-INF/application.properties
new file mode 100644
index 0000000000000000000000000000000000000000..93220d251aebe26197e3c553b21755b8fa36f92f
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/resources/META-INF/application.properties
@@ -0,0 +1,25 @@
+application.name=theodolite-uc4-application
+application.version=0.0.1
+
+kafka.bootstrap.servers=localhost:9092
+kafka.input.topic=input
+kafka.output.topic=output
+kafka.configuration.topic=configuration
+kafka.feedback.topic=aggregation-feedback
+kafka.window.duration.minutes=1
+
+schema.registry.url=http://localhost:8081
+
+aggregation.duration.days=30
+aggregation.advance.days=1
+
+trigger.interval=15
+grace.period.ms=270
+
+num.threads=1
+commit.interval.ms=1000
+cache.max.bytes.buffering=-1
+
+specific.avro.reader=True
+enable.auto.commit.config=True
+auto.offset.reset.config=earliest
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-beam/build.gradle b/theodolite-benchmarks/uc4-beam/build.gradle
new file mode 100644
index 0000000000000000000000000000000000000000..502e94fa737fb2ae1bab861407b27575cd8766ca
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/build.gradle
@@ -0,0 +1,5 @@
+plugins {
+  id 'theodolite.beam'
+}
+
+
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedActivePowerRecordEventTimePolicy.java
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java
new file mode 100644
index 0000000000000000000000000000000000000000..f1f3e82e720ca11c5e376254459004a9d5d6b576
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/AggregatedToActive.java
@@ -0,0 +1,20 @@
+package application;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+/**
+ * Converts AggregatedActivePowerRecord to ActivePowerRecord.
+ */
+public class AggregatedToActive extends SimpleFunction<KV<String, AggregatedActivePowerRecord>,
+    KV<String, ActivePowerRecord>> {
+
+  @Override
+  public KV<String, ActivePowerRecord> apply(
+      final KV<String, AggregatedActivePowerRecord> kv) {
+    return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(),
+        kv.getValue().getTimestamp(), kv.getValue().getSumInW()));
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
similarity index 93%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
index ac5206db53434f3f7679664b0bd612d754f63525..7b66082c91b87c246d8c834249d2bc82545766f5 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/DuplicateAsFlatMap.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/DuplicateAsFlatMap.java
@@ -14,12 +14,13 @@ import titan.ccp.model.records.ActivePowerRecord;
 
 
 /**
- * Duplicates the data as a flat map.
+ * Duplicates the KV containing the (child, parents) pair as a flat map.
  */
 public class DuplicateAsFlatMap extends DoFn
     <KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>> {
+  private static final long serialVersionUID = -5132355515723961647L;
   @StateId("parents")
-  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();
+  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();//NOPMD
   private final PCollectionView<Map<String, Set<String>>> childParentPairMap;
 
   public DuplicateAsFlatMap(final PCollectionView<Map<String, Set<String>>> childParentPairMap) {
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java
new file mode 100644
index 0000000000000000000000000000000000000000..7c21435264629b8d1360aaf62f7308ddcb9381d7
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterEvents.java
@@ -0,0 +1,17 @@
+package application;
+
+import org.apache.beam.sdk.transforms.ProcessFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.configuration.events.Event;
+
+public class FilterEvents implements SerializableFunction<KV<Event, String>, Boolean> {
+  private static final long serialVersionUID = -2233447357614891559L;
+
+  @Override
+  public Boolean apply(final KV<Event, String> kv) {
+    return kv.getKey() == Event.SENSOR_REGISTRY_CHANGED
+        || kv.getKey() == Event.SENSOR_REGISTRY_STATUS;
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java
new file mode 100644
index 0000000000000000000000000000000000000000..8b73b43aa013b43fde738d52a5ded2f7b1c857ce
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/FilterNullValues.java
@@ -0,0 +1,14 @@
+package application;
+
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public class FilterNullValues implements SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean> {
+  private static final long serialVersionUID = -6197352369880867482L;
+
+  @Override
+  public Boolean apply(final KV<SensorParentKey, ActivePowerRecord> kv) {
+    return kv.getValue() != null;
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/GenerateParentsFn.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/application/GenerateParentsFn.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/GenerateParentsFn.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/application/RecordAggregation.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/RecordAggregation.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/SensorParentKey.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SensorParentKey.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/application/SensorParentKey.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/SensorParentKey.java
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java
new file mode 100644
index 0000000000000000000000000000000000000000..5191ffaf0bc373840079eca0599770857c4e7462
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetIdForAggregated.java
@@ -0,0 +1,17 @@
+package application;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+public class SetIdForAggregated extends SimpleFunction<KV<String, AggregatedActivePowerRecord>,
+    KV<String, AggregatedActivePowerRecord>> {
+  @Override
+  public KV<String, AggregatedActivePowerRecord> apply(
+      final KV<String, AggregatedActivePowerRecord> kv) {
+    final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
+        kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
+        kv.getValue().getSumInW(), kv.getValue().getAverageInW());
+    return KV.of(kv.getKey(), record);
+  }
+}
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java
new file mode 100644
index 0000000000000000000000000000000000000000..d6016e29874afb36e164d5e3a915345f350f4e46
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/SetKeyToGroup.java
@@ -0,0 +1,15 @@
+package application;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import titan.ccp.model.records.ActivePowerRecord;
+
+public class SetKeyToGroup extends SimpleFunction<KV<SensorParentKey,
+    ActivePowerRecord>, KV<String, ActivePowerRecord>> {
+
+  @Override
+  public KV<String, ActivePowerRecord> apply(
+      final KV<SensorParentKey, ActivePowerRecord> kv) {
+    return KV.of(kv.getKey().getParent(), kv.getValue());
+  }
+}
\ No newline at end of file
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
new file mode 100644
index 0000000000000000000000000000000000000000..32d2ff1b1768622b1a210711bd13671a8a5fbe8e
--- /dev/null
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
@@ -0,0 +1,259 @@
+package application;
+
+
+import com.google.common.math.StatsAccumulator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.values.*;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import serialization.*;
+import theodolite.commons.beam.AbstractPipeline;
+import theodolite.commons.beam.ConfigurationKeys;
+import theodolite.commons.beam.kafka.KafkaActivePowerRecordReader;
+import theodolite.commons.beam.kafka.KafkaGenericReader;
+import theodolite.commons.beam.kafka.KafkaWriterTransformation;
+import titan.ccp.configuration.events.Event;
+import titan.ccp.model.records.ActivePowerRecord;
+import titan.ccp.model.records.AggregatedActivePowerRecord;
+
+
+/**
+ * Implementation of the Hierarchical Aggregation use case (UC4) using Apache Beam. Sensor
+ * readings from the input topic and already computed aggregation results (fed back via a
+ * dedicated feedback topic) are merged into a single stream, re-keyed by their parent groups
+ * according to a changelog of the sensor hierarchy read from the configuration topic,
+ * aggregated per group, and finally written to both the output topic and the feedback topic.
+ */
+public final class Uc4BeamPipeline extends AbstractPipeline {
+
+  protected Uc4BeamPipeline(final PipelineOptions options, final Configuration config) {
+    super(options, config);
+
+    // Topics besides the inherited input topic.
+    final String feedbackTopic = config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
+    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final String configurationTopic = config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
+
+    // Window and trigger timings. The window duration key is declared in minutes and the
+    // grace period key in milliseconds, so use the matching Duration factories.
+    final Duration duration = Duration
+        .standardMinutes(config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
+    final Duration triggerDelay = Duration
+        .standardSeconds(config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
+    final Duration gracePeriod = Duration
+        .millis(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
+
+    // Build the Kafka consumer configurations.
+    final HashMap<String, Object> consumerConfig = buildConsumerConfig();
+    final HashMap<String, Object> configurationConfig = configurationConfig(config);
+
+    // Set coders for classes that will be distributed between workers.
+    registerCoders(this.getCoderRegistry());
+
+    // Reads the sensor readings from the input topic.
+    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>>
+        kafkaActivePowerRecordReader =
+        new KafkaActivePowerRecordReader(bootstrapServer, inputTopic, consumerConfig);
+
+    // Transforms AggregatedActivePowerRecords into ActivePowerRecords.
+    final AggregatedToActive aggregatedToActive = new AggregatedToActive();
+
+    // Sensor readings, windowed and periodically triggered by processing time.
+    final PCollection<KV<String, ActivePowerRecord>> values = this
+        .apply(kafkaActivePowerRecordReader)
+        .apply("Read Windows", Window.into(FixedWindows.of(duration)))
+        .apply("Set trigger for input", Window
+            .<KV<String, ActivePowerRecord>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(triggerDelay)))
+            .withAllowedLateness(gracePeriod)
+            .discardingFiredPanes());
+
+    // Read the results of earlier aggregations from the feedback topic, using the record
+    // timestamp as event time.
+    final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = this
+        .apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(feedbackTopic)
+            .withKeyDeserializer(StringDeserializer.class)
+            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
+            .withTimestampPolicyFactory(
+                (tp, previousWatermark) -> new AggregatedActivePowerRecordEventTimePolicy(
+                    previousWatermark))
+            .withoutMetadata())
+        .apply("Apply windows", Window.into(FixedWindows.of(duration)))
+        // Convert into the correct data format.
+        .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
+            MapElements.via(aggregatedToActive))
+        .apply("Set trigger for feedback", Window
+            .<KV<String, ActivePowerRecord>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(triggerDelay)))
+            .withAllowedLateness(gracePeriod)
+            .discardingFiredPanes());
+
+    // Create a single PCollection out of the input and the already computed results.
+    final PCollection<KV<String, ActivePowerRecord>> inputCollection =
+        PCollectionList.of(values).and(aggregationsInput)
+            .apply("Flatten sensor data and aggregation results", Flatten.pCollections());
+
+    // Build the configuration stream from a changelog of the sensor hierarchy.
+    final PCollection<KV<String, Set<String>>> configurationStream = this
+        .apply("Read sensor groups", KafkaIO.<Event, String>read()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(configurationTopic)
+            .withKeyDeserializer(EventDeserializer.class)
+            .withValueDeserializer(StringDeserializer.class)
+            .withConsumerConfigUpdates(configurationConfig)
+            .withoutMetadata())
+        // Only forward relevant changes in the hierarchy.
+        .apply("Filter changed and status events", Filter.by(new FilterEvents()))
+        // Build the changelog of child-parent pairs.
+        .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
+        .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
+        .apply("Set trigger for configuration", Window
+            .<KV<String, Set<String>>>configure()
+            .triggering(AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+            .withAllowedLateness(Duration.ZERO)
+            .accumulatingFiredPanes());
+
+    // Provide the latest child-parent pairs as a side-input map.
+    final PCollectionView<Map<String, Set<String>>> childParentPairMap =
+        configurationStream.apply(Latest.perKey())
+            // Reset trigger to avoid synchronized processing time.
+            .apply("Reset trigger for configurations", Window
+                .<KV<String, Set<String>>>configure()
+                .triggering(AfterWatermark.pastEndOfWindow()
+                    .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+                .withAllowedLateness(Duration.ZERO)
+                .accumulatingFiredPanes())
+            .apply(View.asMap());
+
+    final FilterNullValues filterNullValues = new FilterNullValues();
+
+    // Build pairs of every sensor reading and parent.
+    final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
+        inputCollection.apply(
+            "Duplicate as flatMap",
+            ParDo.of(new DuplicateAsFlatMap(childParentPairMap))
+                .withSideInputs(childParentPairMap))
+            .apply("Filter only latest changes", Latest.perKey())
+            .apply("Filter out null values", Filter.by(filterNullValues));
+
+    final SetIdForAggregated setIdForAggregated = new SetIdForAggregated();
+    final SetKeyToGroup setKeyToGroup = new SetKeyToGroup();
+
+    // Aggregate for every sensor group of the current level.
+    final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations = flatMappedValues
+        .apply("Set key to group", MapElements.via(setKeyToGroup))
+        // Reset trigger to avoid synchronized processing time.
+        .apply("Reset trigger for aggregations", Window
+            .<KV<String, ActivePowerRecord>>configure()
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane()
+                    .plusDelayOf(triggerDelay)))
+            .withAllowedLateness(gracePeriod)
+            .discardingFiredPanes())
+        .apply("Aggregate per group", Combine.perKey(new RecordAggregation()))
+        .apply("Set the Identifier in AggregatedActivePowerRecord",
+            MapElements.via(setIdForAggregated));
+
+    // Publish the aggregation results.
+    aggregations.apply("Write to aggregation results",
+        KafkaIO.<String, AggregatedActivePowerRecord>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(outputTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
+
+    // Feed the aggregation results back for the next level of the hierarchy.
+    aggregations.apply("Write to feedback topic",
+        KafkaIO.<String, AggregatedActivePowerRecord>write()
+            .withBootstrapServers(bootstrapServer)
+            .withTopic(feedbackTopic)
+            .withKeySerializer(StringSerializer.class)
+            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));
+  }
+
+  /**
+   * Builds the Kafka consumer configuration for the configuration-stream reader.
+   *
+   * @param config the application configuration the settings are read from.
+   * @return the built consumer configuration.
+   */
+  public HashMap<String, Object> configurationConfig(final Configuration config) {
+    final HashMap<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    // A dedicated group id so the configuration reader does not share offsets with the main
+    // input consumer.
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,
+        config.getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
+    return consumerConfig;
+  }
+
+  /**
+   * Registers coders for all classes that are transferred between workers.
+   *
+   * @param cr the coder registry of this pipeline.
+   */
+  private static void registerCoders(final CoderRegistry cr) {
+    cr.registerCoderForClass(ActivePowerRecord.class,
+        NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
+    cr.registerCoderForClass(AggregatedActivePowerRecord.class,
+        new AggregatedActivePowerRecordCoder());
+    cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
+    cr.registerCoderForClass(Event.class, new EventCoder());
+    cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
+    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+  }
+}
+
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/application/UpdateChildParentPairs.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/UpdateChildParentPairs.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordCoder.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordDeserializer.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
similarity index 87%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
index 2429b6564d690b9cd388faed5829f827aa25c26f..287faf5240fdd8b56dd50ca6fcd5a291b65386bd 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/AggregatedActivePowerRecordSerializer.java
@@ -29,17 +29,18 @@ public class AggregatedActivePowerRecordSerializer
 
   @Override
   public byte[] serialize(final String topic, final AggregatedActivePowerRecord data) {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
     try {
       this.avroEnCoder.encode(data, out);
     } catch (IOException e) {
       LOGGER.error("Could not serialize AggregatedActivePowerRecord", e);
     }
-    byte[] result = out.toByteArray();
+    // NOTE(review): if encode() failed above, 'out' is empty and an empty byte array is
+    // returned instead of an error — downstream consumers may see corrupt records;
+    // consider rethrowing to fail fast. TODO confirm intended best-effort semantics.
+    final byte[] result = out.toByteArray();
     try {
       out.close();
     } catch (IOException e) {
       LOGGER.error(
           "Could not close output stream after serialization of AggregatedActivePowerRecord", e);
     }
     return result;
   }
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventCoder.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventCoder.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventCoder.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventDeserializer.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/EventDeserializer.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/EventDeserializer.java
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java
similarity index 95%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java
index afa9440b31ac01eb9abed5c8bcdb2c4674dca4c4..3fd8758534cff8eee80c2923061eb730b90fe161 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeyCoder.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeyCoder.java
@@ -20,7 +20,7 @@ public class SensorParentKeyCoder extends Coder<SensorParentKey> implements Seri
   private static final boolean DETERMINISTIC = true;
   private static final int VALUE_SIZE = 4;
 
-  private transient Serde<application.SensorParentKey> innerSerde = SensorParentKeySerde.serde();
+  private transient Serde<SensorParentKey> innerSerde = SensorParentKeySerde.serde();
 
   @Override
   public void encode(final SensorParentKey value, final OutputStream outStream)
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeySerde.java b/theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeySerde.java
similarity index 100%
rename from theodolite-benchmarks/uc4-beam-samza/src/main/java/serialization/SensorParentKeySerde.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/serialization/SensorParentKeySerde.java