diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java
new file mode 100644
index 0000000000000000000000000000000000000000..572329c5d783259e4d6658ab2df42de545ef0ced
--- /dev/null
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/NewHistoryService.java
@@ -0,0 +1,64 @@
+package rocks.theodolite.benchmarks.uc2.hazelcastjet;
+
+import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
+import rocks.theodolite.benchmarks.uc2.hazelcastjet.uc2specifics.StatsAccumulatorSerializer;
+
+
+/**
+ * A microservice that aggregates incoming messages in a tumbling window.
+ */
+public class NewHistoryService extends HazelcastJetService {
+
+  // Use this class for the logger so log output is attributed to NewHistoryService.
+  private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);
+
+  /**
+   * Constructs the use case logic for UC2.
+   * Retrieves the needed values and instantiates a pipeline factory.
+   */
+  public NewHistoryService() {
+    super(LOGGER);
+    final Properties kafkaProps =
+        this.propsBuilder.buildReadProperties(
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+
+    final Properties kafkaWriteProps =
+        this.propsBuilder.buildWriteProperties(
+            StringSerializer.class.getCanonicalName(),
+            StringSerializer.class.getCanonicalName());
+
+    final String kafkaOutputTopic =
+        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+
+    // Transform minutes to milliseconds
+    final int downsampleInterval = Integer.parseInt(
+        config.getProperty(ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString());
+    final int downsampleIntervalMs = downsampleInterval * 60_000;
+
+    this.pipelineFactory = new Uc2PipelineFactory(
+        kafkaProps,
+        this.kafkaInputTopic,
+        kafkaWriteProps,
+        kafkaOutputTopic,
+        downsampleIntervalMs);
+  }
+
+  @Override
+  protected void registerSerializer() {
+    // StatsAccumulator is used as windowed aggregation state and needs a custom serializer.
+    this.jobConfig.registerSerializer(StatsAccumulator.class, StatsAccumulatorSerializer.class);
+  }
+
+  public static void main(final String[] args) {
+    new NewHistoryService().run();
+  }
+}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java
new file mode 100644
index 0000000000000000000000000000000000000000..0da94bafebeecaa1ce3ac46a49e27549750d5d83
--- /dev/null
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/NewHistoryService.java
@@ -0,0 +1,64 @@
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
+import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
+import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
+
+/**
+ * A microservice that aggregates incoming messages in a sliding window.
+ */
+public class NewHistoryService extends HazelcastJetService {
+
+  // Use this class for the logger so log output is attributed to NewHistoryService.
+  private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);
+
+  /**
+   * Constructs the use case logic for UC3.
+   * Retrieves the needed values and instantiates a pipeline factory.
+   */
+  public NewHistoryService() {
+    super(LOGGER);
+    final Properties kafkaProps =
+        this.propsBuilder.buildReadProperties(
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+
+    final Properties kafkaWriteProps =
+        this.propsBuilder.buildWriteProperties(
+            StringSerializer.class.getCanonicalName(),
+            StringSerializer.class.getCanonicalName());
+
+    final String kafkaOutputTopic =
+        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+    // NOTE(review): both config keys are expressed in days, but the variable names
+    // say "seconds" — confirm which unit Uc3PipelineFactory actually expects.
+    final int windowSizeInSecondsNumber = Integer.parseInt(
+        config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString());
+    final int hoppingSizeInSecondsNumber = Integer.parseInt(
+        config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString());
+
+    this.pipelineFactory = new Uc3PipelineFactory(
+        kafkaProps,
+        this.kafkaInputTopic,
+        kafkaWriteProps,
+        kafkaOutputTopic,
+        windowSizeInSecondsNumber,
+        hoppingSizeInSecondsNumber);
+  }
+
+  @Override
+  protected void registerSerializer() {
+    this.jobConfig.registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class);
+  }
+
+  public static void main(final String[] args) {
+    new NewHistoryService().run();
+  }
+}
diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java
new file mode 100644
index 0000000000000000000000000000000000000000..345f8be8bdbacd38fdaabf8f9c6868d4ffc47225
--- /dev/null
+++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/NewHistoryService.java
@@ -0,0 +1,89 @@
+package rocks.theodolite.benchmarks.uc4.hazelcastjet;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
+import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.EventDeserializer;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ImmutableSensorRegistryUc4Serializer;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKey;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.SensorGroupKeySerializer;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroup;
+import rocks.theodolite.benchmarks.uc4.hazelcastjet.uc4specifics.ValueGroupSerializer;
+import titan.ccp.model.sensorregistry.ImmutableSensorRegistry;
+
+
+/**
+ * A microservice that manages the history and, therefore, stores and aggregates incoming
+ * measurements.
+ */
+public class NewHistoryService extends HazelcastJetService {
+
+  // Use this class for the logger so log output is attributed to NewHistoryService.
+  private static final Logger LOGGER = LoggerFactory.getLogger(NewHistoryService.class);
+
+  /**
+   * Constructs the use case logic for UC4.
+   * Retrieves the needed values and instantiates a pipeline factory.
+   */
+  public NewHistoryService() {
+    super(LOGGER);
+    final Properties kafkaProps =
+        this.propsBuilder.buildReadProperties(
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+
+    final Properties kafkaConfigReadProps =
+        this.propsBuilder.buildReadProperties(
+            EventDeserializer.class.getCanonicalName(),
+            StringDeserializer.class.getCanonicalName());
+
+    final Properties kafkaAggregationReadProps =
+        this.propsBuilder.buildReadProperties(
+            StringDeserializer.class.getCanonicalName(),
+            KafkaAvroDeserializer.class.getCanonicalName());
+
+    final Properties kafkaWriteProps =
+        this.propsBuilder.buildWriteProperties(
+            StringSerializer.class.getCanonicalName(),
+            KafkaAvroSerializer.class.getCanonicalName());
+
+    final String kafkaOutputTopic =
+        config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
+
+    final String kafkaConfigurationTopic =
+        config.getProperty(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC).toString();
+
+    final String kafkaFeedbackTopic =
+        config.getProperty(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC).toString();
+
+    final int windowSize = Integer.parseInt(
+        config.getProperty(ConfigurationKeys.WINDOW_SIZE_UC4).toString());
+
+    this.pipelineFactory = new Uc4PipelineFactory(
+        kafkaProps,
+        kafkaConfigReadProps,
+        kafkaAggregationReadProps,
+        kafkaWriteProps,
+        this.kafkaInputTopic, kafkaOutputTopic, kafkaConfigurationTopic, kafkaFeedbackTopic,
+        windowSize);
+  }
+
+
+  @Override
+  protected void registerSerializer() {
+    this.jobConfig.registerSerializer(ValueGroup.class, ValueGroupSerializer.class)
+        .registerSerializer(SensorGroupKey.class, SensorGroupKeySerializer.class)
+        .registerSerializer(ImmutableSensorRegistry.class,
+            ImmutableSensorRegistryUc4Serializer.class);
+  }
+
+  public static void main(final String[] args) {
+    new NewHistoryService().run();
+  }
+}