diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java similarity index 75% rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java index af32575e5433f26e19361fa62ea460a78bb9dd66..db2617bf2cdf725f27faefa1def00367d1c8ecec 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java @@ -3,10 +3,9 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; import java.time.LocalDateTime; /** - * A factory class to build an {@link HourOfDayKey}. - * + * {@link StatsKeyFactory} for {@link HourOfDayKey}. */ -public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { +public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { @Override public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) { diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..75bf673212f76c040dd15659267df03c110de562 --- /dev/null +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java @@ -0,0 +1,29 @@ +package rocks.theodolite.benchmarks.uc3.hazelcastjet; + +import java.time.ZoneId; + +/** + * Stores a configuration consisting of a {@link StatsKeyFactory} and a {@link ZoneId}. + */ +public class MapTimeKeyConfiguration { + + private final StatsKeyFactory<HourOfDayKey> keyFactory; + private final ZoneId zone; + + public MapTimeKeyConfiguration( + final StatsKeyFactory<HourOfDayKey> keyFactory, + final ZoneId zone) { + super(); + this.keyFactory = keyFactory; + this.zone = zone; + } + + public StatsKeyFactory<HourOfDayKey> getKeyFactory() { + return this.keyFactory; + } + + public ZoneId getZone() { + return this.zone; + } + +} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java index d31854ec70ce0ef04e978da67fab35f786003db9..1f3cd7e7754bb89311fbf3b3ef5ffba706d0aeba 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java @@ -4,6 +4,8 @@ import com.hazelcast.jet.aggregate.AggregateOperations; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.ServiceFactories; +import com.hazelcast.jet.pipeline.ServiceFactory; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; @@ -11,10 +13,10 @@ import com.hazelcast.jet.pipeline.WindowDefinition; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; -import java.util.TimeZone; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -101,25 +103,26 @@ public class Uc3PipelineFactory extends PipelineFactory { public StreamStage<Map.Entry<String, String>> extendUc3Topology( final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { + final ServiceFactory<?, MapTimeKeyConfiguration> timeKeyConfigService = + ServiceFactories.nonSharedService( + pctx -> new MapTimeKeyConfiguration( + new HourOfDayKeyFactory(), + ZoneId.of("Europe/Paris") // TODO Make configurable + )); + // Build the pipeline topology. return this.pipe .readFrom(source) // use Timestamps .withNativeTimestamps(0) // .setLocalParallelism(1) - // Map timestamp to hour of day and create new key using sensorID and - // datetime mapped to HourOfDay - .map(record -> { + // Map key to HourOfDayKey + .mapUsingService(timeKeyConfigService, (config, record) -> { final String sensorId = record.getValue().getIdentifier(); - final long timestamp = record.getValue().getTimestamp(); - final LocalDateTime dateTime = LocalDateTime.ofInstant( - Instant.ofEpochMilli(timestamp), - TimeZone.getDefault().toZoneId()); - - final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory(); - final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime); - - return Map.entry(newKey, record.getValue()); + final Instant instant = Instant.ofEpochMilli(record.getValue().getTimestamp()); + final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, config.getZone()); + final HourOfDayKey key = config.getKeyFactory().createKey(sensorId, dateTime); + return Map.entry(key, record.getValue()); }) // group by new keys .groupingKey(Entry::getKey) diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java index 3e2ccff7f6f6ce3b8d79ec71519c7b68aa6bdafb..2da9780af50d9de55a3892a3cf05828895b5f39f 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java @@ -15,10 +15,10 @@ import com.hazelcast.jet.test.SerialTest; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; -import java.util.TimeZone; import java.util.concurrent.CompletionException; import org.junit.After; import org.junit.Assert; @@ -57,7 +57,7 @@ public class Uc3PipelineTest extends JetTestSupport { final Double testValueInW = 10.0; final Duration testHopSize = Duration.ofSeconds(1); final Duration testWindowSize = Duration.ofSeconds(50); - final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emir early results + final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emit early results // Used to check hourOfDay final long mockTimestamp = 1632741651; @@ -115,8 +115,9 @@ public class Uc3PipelineTest extends JetTestSupport { for (final Entry<String, String> entry : collection) { // Build hour of day - final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(mockTimestamp), - TimeZone.getDefault().toZoneId()).getHour(); + final int expectedHour = LocalDateTime + .ofInstant(Instant.ofEpochMilli(mockTimestamp), ZoneId.of("Europe/Paris")) + .getHour(); // Compare expected output with generated output final String expectedKey = testSensorName + ";" + expectedHour; @@ -137,8 +138,7 @@ public class Uc3PipelineTest extends JetTestSupport { } // Assertion - Assert.assertTrue( - "Items do not match expected structure!", allOkay); + Assert.assertTrue("Items do not match expected structure!", allOkay); })); // Run the test!