Skip to content
Snippets Groups Projects
Commit 70332b10 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'try-fixing-broken-test-uc3-hazelcast' into main

parents a528aa31 b1b311af
No related branches found
No related tags found
No related merge requests found
Pipeline #10138 canceled
...@@ -3,10 +3,9 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; ...@@ -3,10 +3,9 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* A factory class to build an {@link HourOfDayKey}. * {@link StatsKeyFactory} for {@link HourOfDayKey}.
*
*/ */
public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> {
@Override @Override
public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) { public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.ZoneId;
/**
* Stores a configuration consisting of a {@link StatsKeyFactory} and a {@link ZoneId}.
*/
public class MapTimeKeyConfiguration {
private final StatsKeyFactory<HourOfDayKey> keyFactory;
private final ZoneId zone;
public MapTimeKeyConfiguration(
final StatsKeyFactory<HourOfDayKey> keyFactory,
final ZoneId zone) {
super();
this.keyFactory = keyFactory;
this.zone = zone;
}
public StatsKeyFactory<HourOfDayKey> getKeyFactory() {
return this.keyFactory;
}
public ZoneId getZone() {
return this.zone;
}
}
...@@ -4,6 +4,8 @@ import com.hazelcast.jet.aggregate.AggregateOperations; ...@@ -4,6 +4,8 @@ import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
...@@ -11,10 +13,10 @@ import com.hazelcast.jet.pipeline.WindowDefinition; ...@@ -11,10 +13,10 @@ import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.TimeZone;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
...@@ -101,25 +103,26 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -101,25 +103,26 @@ public class Uc3PipelineFactory extends PipelineFactory {
public StreamStage<Map.Entry<String, String>> extendUc3Topology( public StreamStage<Map.Entry<String, String>> extendUc3Topology(
final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
final ServiceFactory<?, MapTimeKeyConfiguration> timeKeyConfigService =
ServiceFactories.nonSharedService(
pctx -> new MapTimeKeyConfiguration(
new HourOfDayKeyFactory(),
ZoneId.of("Europe/Paris") // TODO Make configurable
));
// Build the pipeline topology. // Build the pipeline topology.
return this.pipe return this.pipe
.readFrom(source) .readFrom(source)
// use Timestamps // use Timestamps
.withNativeTimestamps(0) .withNativeTimestamps(0)
// .setLocalParallelism(1) // .setLocalParallelism(1)
// Map timestamp to hour of day and create new key using sensorID and // Map key to HourOfDayKey
// datetime mapped to HourOfDay .mapUsingService(timeKeyConfigService, (config, record) -> {
.map(record -> {
final String sensorId = record.getValue().getIdentifier(); final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp(); final Instant instant = Instant.ofEpochMilli(record.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant( final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, config.getZone());
Instant.ofEpochMilli(timestamp), final HourOfDayKey key = config.getKeyFactory().createKey(sensorId, dateTime);
TimeZone.getDefault().toZoneId()); return Map.entry(key, record.getValue());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
return Map.entry(newKey, record.getValue());
}) })
// group by new keys // group by new keys
.groupingKey(Entry::getKey) .groupingKey(Entry::getKey)
......
...@@ -15,10 +15,10 @@ import com.hazelcast.jet.test.SerialTest; ...@@ -15,10 +15,10 @@ import com.hazelcast.jet.test.SerialTest;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
...@@ -57,7 +57,7 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -57,7 +57,7 @@ public class Uc3PipelineTest extends JetTestSupport {
final Double testValueInW = 10.0; final Double testValueInW = 10.0;
final Duration testHopSize = Duration.ofSeconds(1); final Duration testHopSize = Duration.ofSeconds(1);
final Duration testWindowSize = Duration.ofSeconds(50); final Duration testWindowSize = Duration.ofSeconds(50);
final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emir early results final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emit early results
// Used to check hourOfDay // Used to check hourOfDay
final long mockTimestamp = 1632741651; final long mockTimestamp = 1632741651;
...@@ -115,11 +115,12 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -115,11 +115,12 @@ public class Uc3PipelineTest extends JetTestSupport {
for (final Entry<String, String> entry : collection) { for (final Entry<String, String> entry : collection) {
// Build hour of day // Build hour of day
final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(mockTimestamp), final int expectedHour = LocalDateTime
TimeZone.getDefault().toZoneId()).getHour(); .ofInstant(Instant.ofEpochMilli(mockTimestamp), ZoneId.of("Europe/Paris"))
.getHour();
// Compare expected output with generated output // Compare expected output with generated output
final String expectedKey = testSensorName + ";" + expectedHour; final String expectedKey = testSensorName;
final String expectedValue = Double.toString(testValueInW); final String expectedValue = Double.toString(testValueInW);
// DEBUG // DEBUG
...@@ -137,8 +138,7 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -137,8 +138,7 @@ public class Uc3PipelineTest extends JetTestSupport {
} }
// Assertion // Assertion
Assert.assertTrue( Assert.assertTrue("Items do not match expected structure!", allOkay);
"Items do not match expected structure!", allOkay);
})); }));
// Run the test! // Run the test!
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment