Skip to content
Snippets Groups Projects
Commit 9b5ae78d authored by Lorenz Boguhn's avatar Lorenz Boguhn
Browse files

Uc3 hazelcastjet: change window time to Duration

parent 4601f11c
No related branches found
No related tags found
1 merge request!275Refactor hazelcast jet benchmarks:
Pipeline #8658 failed
This commit is part of merge request !275. Comments created here will be created in the context of that merge request.
package rocks.theodolite.benchmarks.uc3.hazelcastjet; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
...@@ -37,19 +39,19 @@ public class HistoryService extends HazelcastJetService { ...@@ -37,19 +39,19 @@ public class HistoryService extends HazelcastJetService {
final String kafkaOutputTopic = final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final int windowSizeInDaysNumber = Integer.parseInt( final Duration windowSize = Duration.ofDays(Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()); config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
final int hoppingSizeInDaysNumber = Integer.parseInt( final Duration hoppingSize = Duration.ofDays(Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()); config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
this.pipelineFactory = new Uc3PipelineFactory( this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps, kafkaProps,
kafkaInputTopic, kafkaInputTopic,
kafkaWriteProps, kafkaWriteProps,
kafkaOutputTopic, kafkaOutputTopic,
windowSizeInDaysNumber, windowSize,
hoppingSizeInDaysNumber); hoppingSize);
} }
@Override @Override
......
...@@ -8,15 +8,16 @@ import com.hazelcast.jet.pipeline.Sinks; ...@@ -8,15 +8,16 @@ import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory; import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
...@@ -24,8 +25,8 @@ import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory ...@@ -24,8 +25,8 @@ import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory
public class Uc3PipelineFactory extends PipelineFactory { public class Uc3PipelineFactory extends PipelineFactory {
private final int hoppingSizeInDays; private final Duration hoppingSize;
private final int windowSizeInDays; private final Duration windowSize;
/** /**
* Build a new Pipeline. * Build a new Pipeline.
...@@ -35,21 +36,21 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -35,21 +36,21 @@ public class Uc3PipelineFactory extends PipelineFactory {
* attributes. * attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline. * @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param hoppingSizeInDays The hop length of the sliding window used in the aggregation of * @param hoppingSize The hop length of the sliding window used in the aggregation of
* this pipeline. * this pipeline.
* @param windowSizeInDays The window length of the sliding window used in the aggregation of * @param windowSize The window length of the sliding window used in the aggregation of
* this pipeline. * this pipeline.
*/ */
public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline, public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline,
final String kafkaInputTopic, final String kafkaInputTopic,
final Properties kafkaWritePropsForPipeline, final Properties kafkaWritePropsForPipeline,
final String kafkaOutputTopic, final String kafkaOutputTopic,
final int windowSizeInDays, final Duration windowSize,
final int hoppingSizeInDays) { final Duration hoppingSize) {
super(kafkaReadPropsForPipeline, kafkaInputTopic, super(kafkaReadPropsForPipeline, kafkaInputTopic,
kafkaWritePropsForPipeline,kafkaOutputTopic); kafkaWritePropsForPipeline,kafkaOutputTopic);
this.windowSizeInDays = windowSizeInDays; this.windowSize = windowSize;
this.hoppingSizeInDays = hoppingSizeInDays; this.hoppingSize = hoppingSize;
} }
...@@ -117,8 +118,7 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -117,8 +118,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
// group by new keys // group by new keys
.groupingKey(Entry::getKey) .groupingKey(Entry::getKey)
// Sliding/Hopping Window // Sliding/Hopping Window
.window(WindowDefinition.sliding(TimeUnit.DAYS.toMillis(windowSizeInDays), .window(WindowDefinition.sliding(windowSize.toMillis(), hoppingSize.toMillis()))
TimeUnit.DAYS.toMillis(hoppingSizeInDays)))
// get average value of group (sensoreId,hourOfDay) // get average value of group (sensoreId,hourOfDay)
.aggregate( .aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
......
...@@ -12,13 +12,14 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException; ...@@ -12,13 +12,14 @@ import com.hazelcast.jet.pipeline.test.AssertionCompletedException;
import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.Assertions;
import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.jet.test.SerialTest; import com.hazelcast.jet.test.SerialTest;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.TimeZone;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
...@@ -57,8 +58,8 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -57,8 +58,8 @@ public class Uc3PipelineTest extends JetTestSupport {
final int testItemsPerSecond = 1; final int testItemsPerSecond = 1;
final String testSensorName = "TEST-SENSOR"; final String testSensorName = "TEST-SENSOR";
final Double testValueInW = 10.0; final Double testValueInW = 10.0;
final int testHopSizeInSec = 1; final Duration testHopSize = Duration.ofSeconds(1);
final int testWindowSizeInSec = 50; final Duration testWindowSize = Duration.ofSeconds(50);
// Used to check hourOfDay // Used to check hourOfDay
final long mockTimestamp = 1632741651; final long mockTimestamp = 1632741651;
...@@ -82,7 +83,7 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -82,7 +83,7 @@ public class Uc3PipelineTest extends JetTestSupport {
// Create pipeline to test // Create pipeline to test
final Properties properties = new Properties(); final Properties properties = new Properties();
final Uc3PipelineFactory factory = new Uc3PipelineFactory( final Uc3PipelineFactory factory = new Uc3PipelineFactory(
properties,"", properties,"", testWindowSizeInSec, testHopSizeInSec); properties,"", properties,"", testWindowSize, testHopSize);
this.uc3Topology = factory.extendUc3Topology(testSource); this.uc3Topology = factory.extendUc3Topology(testSource);
...@@ -98,7 +99,7 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -98,7 +99,7 @@ public class Uc3PipelineTest extends JetTestSupport {
// Assertion Configuration // Assertion Configuration
final int timeout = 10; final int timeout = 10;
final String testSensorName = "TEST-SENSOR"; final String testSensorName = "TEST-SENSOR";
final Double testValueInW = 10.0; final double testValueInW = 10.0;
// Used to check hourOfDay // Used to check hourOfDay
final long mockTimestamp = 1632741651; final long mockTimestamp = 1632741651;
...@@ -116,13 +117,12 @@ public class Uc3PipelineTest extends JetTestSupport { ...@@ -116,13 +117,12 @@ public class Uc3PipelineTest extends JetTestSupport {
for (final Entry<String, String> entry : collection) { for (final Entry<String, String> entry : collection) {
// Build hour of day // Build hour of day
long timestamp = mockTimestamp; final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(mockTimestamp),
final int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
TimeZone.getDefault().toZoneId()).getHour(); TimeZone.getDefault().toZoneId()).getHour();
// Compare expected output with generated output // Compare expected output with generated output
final String expectedKey = testSensorName + ";" + expectedHour; final String expectedKey = testSensorName + ";" + expectedHour;
final String expectedValue = testValueInW.toString(); final String expectedValue = Double.toString(testValueInW);
// DEBUG // DEBUG
LOGGER.info( LOGGER.info(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment