diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
index 024822080127a114955617502c44d0db46a6e366..46fa53756fb028c2bccf86d544bd8430c9ef12d6 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java
@@ -34,7 +34,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
     cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
 
     // build KafkaConsumerConfig
-    final Map consumerConfig = buildConsumerConfig();
+    final Map<String, Object> consumerConfig = buildConsumerConfig();
 
     // Create Pipeline transformations
     final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
index f518ba8a794d33d7b5569d4a648eeadbc47a1e7b..3b43dc47aaf6e3f9937aa87fca7fc1895c8fef84 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java
@@ -17,7 +17,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
@@ -38,7 +37,7 @@ import titan.ccp.model.records.ActivePowerRecord;
  */
 public final class Uc2BeamPipeline extends AbstractPipeline {
 
-  protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) {
+  protected Uc2BeamPipeline(final PipelineOptions options, final Configuration config) {
     super(options, config);
     // Additional needed variables
     final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
@@ -48,7 +47,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
     final Duration duration = Duration.standardMinutes(windowDurationMinutes);
 
     // Build kafka configuration
-    final Map consumerConfig = buildConsumerConfig();
+    final Map<String, Object> consumerConfig = buildConsumerConfig();
 
     // Set Coders for Classes that will be distributed
     final CoderRegistry cr = this.getCoderRegistry();
@@ -67,8 +66,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
     final StatsToString statsToString = new StatsToString();
 
     // Write to Kafka
-    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
-        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+    final KafkaWriterTransformation<String> kafkaWriter =
+        new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
 
     // Apply pipeline transformations
     this.apply(kafkaActivePowerRecordReader)
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
index eda0d8161750d49998b66640546b0e24ccb6256b..7424a19fecef5a7b86d273a223c6f3f7a2562db9 100644
--- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
@@ -10,14 +10,11 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
@@ -63,7 +60,6 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
     registerCoders(cr);
 
     // Read from Kafka
-    @SuppressWarnings({"rawtypes", "unchecked"})
     final KafkaActivePowerTimestampReader kafka =
         new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
 
@@ -74,9 +70,8 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
     final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
 
     // Write to Kafka
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter =
-        new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
+    final KafkaWriterTransformation<String> kafkaWriter =
+        new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
 
     this.apply(kafka)
         // Map to correct time format