From 6a339c78b6e0aa16a940faff55578b1e60386444 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Fri, 25 Nov 2022 18:56:53 +0100
Subject: [PATCH] Try fixing failing test
---
...yFactory.java => HourOfDayKeyFactory.java} | 5 ++--
.../hazelcastjet/MapTimeKeyConfiguration.java | 29 +++++++++++++++++++
.../uc3/hazelcastjet/Uc3PipelineFactory.java | 29 ++++++++++---------
.../uc3/hazelcastjet/Uc3PipelineTest.java | 12 ++++----
4 files changed, 53 insertions(+), 22 deletions(-)
rename theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/{HoursOfDayKeyFactory.java => HourOfDayKeyFactory.java} (75%)
create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java
similarity index 75%
rename from theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
rename to theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java
index af32575e5..db2617bf2 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HoursOfDayKeyFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HourOfDayKeyFactory.java
@@ -3,10 +3,9 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime;
/**
- * A factory class to build an {@link HourOfDayKey}.
- *
+ * {@link StatsKeyFactory} for {@link HourOfDayKey}.
*/
-public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> {
+public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> {
@Override
public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java
new file mode 100644
index 000000000..75bf67321
--- /dev/null
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/MapTimeKeyConfiguration.java
@@ -0,0 +1,29 @@
+package rocks.theodolite.benchmarks.uc3.hazelcastjet;
+
+import java.time.ZoneId;
+
+/**
+ * Stores a configuration consisting of a {@link StatsKeyFactory} and a {@link ZoneId}.
+ */
+public class MapTimeKeyConfiguration {
+
+ private final StatsKeyFactory<HourOfDayKey> keyFactory;
+ private final ZoneId zone;
+
+ public MapTimeKeyConfiguration(
+ final StatsKeyFactory<HourOfDayKey> keyFactory,
+ final ZoneId zone) {
+ super();
+ this.keyFactory = keyFactory;
+ this.zone = zone;
+ }
+
+ public StatsKeyFactory<HourOfDayKey> getKeyFactory() {
+ return this.keyFactory;
+ }
+
+ public ZoneId getZone() {
+ return this.zone;
+ }
+
+}
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
index d31854ec7..1f3cd7e77 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java
@@ -4,6 +4,8 @@ import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.ServiceFactories;
+import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
@@ -11,10 +13,10 @@ import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.TimeZone;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
@@ -101,25 +103,26 @@ public class Uc3PipelineFactory extends PipelineFactory {
public StreamStage<Map.Entry<String, String>> extendUc3Topology(
final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
+ final ServiceFactory<?, MapTimeKeyConfiguration> timeKeyConfigService =
+ ServiceFactories.nonSharedService(
+ pctx -> new MapTimeKeyConfiguration(
+ new HourOfDayKeyFactory(),
+ ZoneId.of("Europe/Paris") // TODO Make configurable
+ ));
+
// Build the pipeline topology.
return this.pipe
.readFrom(source)
// use Timestamps
.withNativeTimestamps(0)
// .setLocalParallelism(1)
- // Map timestamp to hour of day and create new key using sensorID and
- // datetime mapped to HourOfDay
- .map(record -> {
+ // Map key to HourOfDayKey
+ .mapUsingService(timeKeyConfigService, (config, record) -> {
final String sensorId = record.getValue().getIdentifier();
- final long timestamp = record.getValue().getTimestamp();
- final LocalDateTime dateTime = LocalDateTime.ofInstant(
- Instant.ofEpochMilli(timestamp),
- TimeZone.getDefault().toZoneId());
-
- final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
- final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
-
- return Map.entry(newKey, record.getValue());
+ final Instant instant = Instant.ofEpochMilli(record.getValue().getTimestamp());
+ final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, config.getZone());
+ final HourOfDayKey key = config.getKeyFactory().createKey(sensorId, dateTime);
+ return Map.entry(key, record.getValue());
})
// group by new keys
.groupingKey(Entry::getKey)
diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java
index 3e2ccff7f..2da9780af 100644
--- a/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java
+++ b/theodolite-benchmarks/uc3-hazelcastjet/src/test/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineTest.java
@@ -15,10 +15,10 @@ import com.hazelcast.jet.test.SerialTest;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.TimeZone;
import java.util.concurrent.CompletionException;
import org.junit.After;
import org.junit.Assert;
@@ -57,7 +57,7 @@ public class Uc3PipelineTest extends JetTestSupport {
final Double testValueInW = 10.0;
final Duration testHopSize = Duration.ofSeconds(1);
final Duration testWindowSize = Duration.ofSeconds(50);
- final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emir early results
+ final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emit early results
// Used to check hourOfDay
final long mockTimestamp = 1632741651;
@@ -115,8 +115,9 @@ public class Uc3PipelineTest extends JetTestSupport {
for (final Entry<String, String> entry : collection) {
// Build hour of day
- final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(mockTimestamp),
- TimeZone.getDefault().toZoneId()).getHour();
+ final int expectedHour = LocalDateTime
+ .ofInstant(Instant.ofEpochMilli(mockTimestamp), ZoneId.of("Europe/Paris"))
+ .getHour();
// Compare expected output with generated output
final String expectedKey = testSensorName + ";" + expectedHour;
@@ -137,8 +138,7 @@ public class Uc3PipelineTest extends JetTestSupport {
}
// Assertion
- Assert.assertTrue(
- "Items do not match expected structure!", allOkay);
+ Assert.assertTrue("Items do not match expected structure!", allOkay);
}));
// Run the test!
--
GitLab