Skip to content
Snippets Groups Projects
Commit 6a339c78 authored by Sören Henning's avatar Sören Henning
Browse files

Try fixing failing test

parent 71cad78f
No related branches found
No related tags found
No related merge requests found
...@@ -3,10 +3,9 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; ...@@ -3,10 +3,9 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime; import java.time.LocalDateTime;
/** /**
* A factory class to build an {@link HourOfDayKey}. * {@link StatsKeyFactory} for {@link HourOfDayKey}.
*
*/ */
public class HoursOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> { public class HourOfDayKeyFactory implements StatsKeyFactory<HourOfDayKey> {
@Override @Override
public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) { public HourOfDayKey createKey(final String sensorId, final LocalDateTime dateTime) {
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.ZoneId;
/**
* Stores a configuration consisting of a {@link StatsKeyFactory} and a {@link ZoneId}.
*/
public class MapTimeKeyConfiguration {
private final StatsKeyFactory<HourOfDayKey> keyFactory;
private final ZoneId zone;
public MapTimeKeyConfiguration(
final StatsKeyFactory<HourOfDayKey> keyFactory,
final ZoneId zone) {
super();
this.keyFactory = keyFactory;
this.zone = zone;
}
public StatsKeyFactory<HourOfDayKey> getKeyFactory() {
return this.keyFactory;
}
public ZoneId getZone() {
return this.zone;
}
}
...@@ -4,6 +4,8 @@ import com.hazelcast.jet.aggregate.AggregateOperations; ...@@ -4,6 +4,8 @@ import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
...@@ -11,10 +13,10 @@ import com.hazelcast.jet.pipeline.WindowDefinition; ...@@ -11,10 +13,10 @@ import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.TimeZone;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
...@@ -101,25 +103,26 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -101,25 +103,26 @@ public class Uc3PipelineFactory extends PipelineFactory {
public StreamStage<Map.Entry<String, String>> extendUc3Topology( public StreamStage<Map.Entry<String, String>> extendUc3Topology(
final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
final ServiceFactory<?, MapTimeKeyConfiguration> timeKeyConfigService =
ServiceFactories.nonSharedService(
pctx -> new MapTimeKeyConfiguration(
new HourOfDayKeyFactory(),
ZoneId.of("Europe/Paris") // TODO Make configurable
));
// Build the pipeline topology. // Build the pipeline topology.
return this.pipe return this.pipe
.readFrom(source) .readFrom(source)
// use Timestamps // use Timestamps
.withNativeTimestamps(0) .withNativeTimestamps(0)
// .setLocalParallelism(1) // .setLocalParallelism(1)
// Map timestamp to hour of day and create new key using sensorID and // Map key to HourOfDayKey
// datetime mapped to HourOfDay .mapUsingService(timeKeyConfigService, (config, record) -> {
.map(record -> {
final String sensorId = record.getValue().getIdentifier(); final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp(); final Instant instant = Instant.ofEpochMilli(record.getValue().getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant( final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, config.getZone());
Instant.ofEpochMilli(timestamp), final HourOfDayKey key = config.getKeyFactory().createKey(sensorId, dateTime);
TimeZone.getDefault().toZoneId()); return Map.entry(key, record.getValue());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime);
return Map.entry(newKey, record.getValue());
}) })
// group by new keys // group by new keys
.groupingKey(Entry::getKey) .groupingKey(Entry::getKey)
......
...@@ -15,10 +15,10 @@ import com.hazelcast.jet.test.SerialTest; ...@@ -15,10 +15,10 @@ import com.hazelcast.jet.test.SerialTest;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.TimeZone;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
...@@ -57,7 +57,7 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -57,7 +57,7 @@ public class Uc3PipelineTest extends JetTestSupport {
final Double testValueInW = 10.0; final Double testValueInW = 10.0;
final Duration testHopSize = Duration.ofSeconds(1); final Duration testHopSize = Duration.ofSeconds(1);
final Duration testWindowSize = Duration.ofSeconds(50); final Duration testWindowSize = Duration.ofSeconds(50);
final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emir early results final Duration testEmitPeriod = Duration.ofSeconds(0); // Do not emit early results
// Used to check hourOfDay // Used to check hourOfDay
final long mockTimestamp = 1632741651; final long mockTimestamp = 1632741651;
...@@ -115,8 +115,9 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -115,8 +115,9 @@ public class Uc3PipelineTest extends JetTestSupport {
for (final Entry<String, String> entry : collection) { for (final Entry<String, String> entry : collection) {
// Build hour of day // Build hour of day
final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(mockTimestamp), final int expectedHour = LocalDateTime
TimeZone.getDefault().toZoneId()).getHour(); .ofInstant(Instant.ofEpochMilli(mockTimestamp), ZoneId.of("Europe/Paris"))
.getHour();
// Compare expected output with generated output // Compare expected output with generated output
final String expectedKey = testSensorName + ";" + expectedHour; final String expectedKey = testSensorName + ";" + expectedHour;
...@@ -137,8 +138,7 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -137,8 +138,7 @@ public class Uc3PipelineTest extends JetTestSupport {
} }
// Assertion // Assertion
Assert.assertTrue( Assert.assertTrue("Items do not match expected structure!", allOkay);
"Items do not match expected structure!", allOkay);
})); }));
// Run the test! // Run the test!
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment