diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java index db2617bf2cdf725f27faefa1def00367d1c8ecec..dda447d3e4c741d474e77299f38f8cc94f49209e 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java @@ -1,11 +1,14 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; +import java.io.Serializable; import java.time.LocalDateTime; /** * {@link StatsKeyFactory} for {@link HourOfDayKey}. */ -public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { +public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey>, Serializable { + + private static final long serialVersionUID = 9047643205410220184L; @Override public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) { diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java index e3dcb037d81b4fa42ed170a861a8a4af354fa6b4..d5dae2bde75856b44b73ee11bc2808de644a27f0 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java @@ -3,8 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.ServiceFactories; -import com.hazelcast.jet.pipeline.ServiceFactory; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; @@ -14,7 +12,6 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -25,6 +22,8 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; */ public class Uc3PipelineFactory extends PipelineFactory { + private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter + private final Duration hoppingSize; private final Duration windowSize; private final Duration emitPeriod; @@ -102,12 +101,8 @@ public class Uc3PipelineFactory extends PipelineFactory { public StreamStage<Map.Entry<String, String>> extendUc3Topology( final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { - final ServiceFactory<?, MapTimeKeyConfiguration> timeKeyConfigService = - ServiceFactories.nonSharedService( - pctx -> new MapTimeKeyConfiguration( - new HourOfDayKeyFactory(), - ZoneId.of("Europe/Paris") // TODO Make configurable - )); + final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); + final ZoneId localZone = this.zone; // Make serializable in lambdas // Build the pipeline topology. return this.pipe @@ -115,16 +110,13 @@ public class Uc3PipelineFactory extends PipelineFactory { // use Timestamps .withNativeTimestamps(0) // .setLocalParallelism(1) - // Map key to HourOfDayKey - .mapUsingService(timeKeyConfigService, (config, record) -> { + // Group by HourOfDayKey + .groupingKey(record -> { final String sensorId = record.getValue().getIdentifier(); final Instant instant = Instant.ofEpochMilli(record.getValue().getTimestamp()); - final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, config.getZone()); - final HourOfDayKey key = config.getKeyFactory().createKey(sensorId, dateTime); - return Map.entry(key, record.getValue()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, localZone); + return keyFactory.createKey(sensorId, dateTime); }) - // group by new keys - .groupingKey(Entry::getKey) // Sliding/Hopping Window .window(WindowDefinition .sliding(this.windowSize.toMillis(), this.hoppingSize.toMillis())