Skip to content
Snippets Groups Projects
Commit cda97d49 authored by Sören Henning's avatar Sören Henning
Browse files

Add early firing to Hazelcast Jet UC3

parent 4d8e0b49
No related branches found
No related tags found
No related merge requests found
Pipeline #10134 failed
Showing
with 54 additions and 48 deletions
...@@ -26,10 +26,11 @@ public class ConfigurationKeys { ...@@ -26,10 +26,11 @@ public class ConfigurationKeys {
// UC3 // UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String AGGREGATION_EMIT_PERIOD_SECONDS = "aggregation.emit.period.seconds";
// UC4 // UC4
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String WINDOW_SIZE_UC4 = "window.size"; public static final String WINDOW_SIZE_UC4 = "window.size";
} }
package rocks.theodolite.benchmarks.uc3.hazelcastjet; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration; import java.time.Duration;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
...@@ -10,8 +9,6 @@ import org.slf4j.Logger; ...@@ -10,8 +9,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKeySerializer;
/** /**
* A microservice that aggregate incoming messages in a sliding window. * A microservice that aggregate incoming messages in a sliding window.
...@@ -21,8 +18,8 @@ public class HistoryService extends HazelcastJetService { ...@@ -21,8 +18,8 @@ public class HistoryService extends HazelcastJetService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class); private static final Logger LOGGER = LoggerFactory.getLogger(HistoryService.class);
/** /**
* Constructs the use case logic for UC3. * Constructs the use case logic for UC3. Retrieves the needed values and instantiates a pipeline
* Retrieves the needed values and instantiates a pipeline factory. * factory.
*/ */
public HistoryService() { public HistoryService() {
super(LOGGER); super(LOGGER);
...@@ -37,21 +34,25 @@ public class HistoryService extends HazelcastJetService { ...@@ -37,21 +34,25 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName()); StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic = final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final Duration windowSize = Duration.ofDays(Integer.parseInt( final Duration windowSize = Duration.ofDays(Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString()));
final Duration hoppingSize = Duration.ofDays(Integer.parseInt( final Duration hoppingSize = Duration.ofDays(Integer.parseInt(
config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString()));
final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt(
this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString()));
this.pipelineFactory = new Uc3PipelineFactory( this.pipelineFactory = new Uc3PipelineFactory(
kafkaProps, kafkaProps,
kafkaInputTopic, this.kafkaInputTopic,
kafkaWriteProps, kafkaWriteProps,
kafkaOutputTopic, kafkaOutputTopic,
windowSize, windowSize,
hoppingSize); hoppingSize,
emitPeriod);
} }
@Override @Override
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.util.Objects; import java.util.Objects;
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.ObjectDataOutput;
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime; import java.time.LocalDateTime;
......
package rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics; package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import java.time.LocalDateTime; import java.time.LocalDateTime;
......
...@@ -8,7 +8,6 @@ import com.hazelcast.jet.pipeline.Sinks; ...@@ -8,7 +8,6 @@ import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
...@@ -18,51 +17,52 @@ import java.util.Properties; ...@@ -18,51 +17,52 @@ import java.util.Properties;
import java.util.TimeZone; import java.util.TimeZone;
import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory; import rocks.theodolite.benchmarks.commons.hazelcastjet.PipelineFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HourOfDayKey;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.HoursOfDayKeyFactory;
import rocks.theodolite.benchmarks.uc3.hazelcastjet.uc3specifics.StatsKeyFactory;
/** /**
* PipelineFactory for use case 3. * PipelineFactory for use case 3. Allows to build and extend pipelines.
* Allows to build and extend pipelines.
*/ */
public class Uc3PipelineFactory extends PipelineFactory { public class Uc3PipelineFactory extends PipelineFactory {
private final Duration hoppingSize; private final Duration hoppingSize;
private final Duration windowSize; private final Duration windowSize;
private final Duration emitPeriod;
/** /**
* Build a new Pipeline. * Build a new Pipeline.
*
* @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads * @param kafkaReadPropsForPipeline Properties Object containing the necessary kafka reads
* attributes. * attributes.
* @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write
* attributes. * attributes.
* @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaInputTopic The name of the input topic used for the pipeline.
* @param kafkaOutputTopic The name of the output topic used for the pipeline. * @param kafkaOutputTopic The name of the output topic used for the pipeline.
* @param hoppingSize The hop length of the sliding window used in the aggregation of * @param hoppingSize The hop length of the sliding window used in the aggregation of this
* this pipeline. * pipeline.
* @param windowSize The window length of the sliding window used in the aggregation of * @param windowSize The window length of the sliding window used in the aggregation of this
* this pipeline. * pipeline.
*/ */
public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline, public Uc3PipelineFactory(final Properties kafkaReadPropsForPipeline,
final String kafkaInputTopic, final String kafkaInputTopic,
final Properties kafkaWritePropsForPipeline, final Properties kafkaWritePropsForPipeline,
final String kafkaOutputTopic, final String kafkaOutputTopic,
final Duration windowSize, final Duration windowSize,
final Duration hoppingSize) { final Duration hoppingSize,
super(kafkaReadPropsForPipeline, kafkaInputTopic, final Duration emitPeriod) {
kafkaWritePropsForPipeline,kafkaOutputTopic); super(
kafkaReadPropsForPipeline,
kafkaInputTopic,
kafkaWritePropsForPipeline,
kafkaOutputTopic);
this.windowSize = windowSize; this.windowSize = windowSize;
this.hoppingSize = hoppingSize; this.hoppingSize = hoppingSize;
this.emitPeriod = emitPeriod;
} }
/** /**
* Builds a pipeline which can be used for stream processing using Hazelcast Jet. * Builds a pipeline which can be used for stream processing using Hazelcast Jet.
* @return a pipeline used which can be used in a Hazelcast Jet Instance to process data *
* for UC3. * @return a pipeline used which can be used in a Hazelcast Jet Instance to process data for UC3.
*/ */
@Override @Override
public Pipeline buildPipeline() { public Pipeline buildPipeline() {
...@@ -70,7 +70,7 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -70,7 +70,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
// Define the source // Define the source
final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources final StreamSource<Map.Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources
.<String, ActivePowerRecord>kafka( .<String, ActivePowerRecord>kafka(
kafkaReadPropsForPipeline, kafkaInputTopic); this.kafkaReadPropsForPipeline, this.kafkaInputTopic);
// Extend topology for UC3 // Extend topology for UC3
final StreamStage<Map.Entry<String, String>> uc3Product = final StreamStage<Map.Entry<String, String>> uc3Product =
...@@ -80,9 +80,9 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -80,9 +80,9 @@ public class Uc3PipelineFactory extends PipelineFactory {
uc3Product.writeTo(Sinks.logger()); uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark // Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka( uc3Product.writeTo(KafkaSinks.<String, String>kafka(
kafkaWritePropsForPipeline, kafkaOutputTopic)); this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
return pipe; return this.pipe;
} }
/** /**
...@@ -98,11 +98,11 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -98,11 +98,11 @@ public class Uc3PipelineFactory extends PipelineFactory {
* and value of the Entry object. It can be used to be further modified or directly be * and value of the Entry object. It can be used to be further modified or directly be
* written into a sink. * written into a sink.
*/ */
public StreamStage<Map.Entry<String, String>> public StreamStage<Map.Entry<String, String>> extendUc3Topology(
extendUc3Topology(final StreamSource<Map.Entry<String, ActivePowerRecord>> source) { final StreamSource<Map.Entry<String, ActivePowerRecord>> source) {
// Build the pipeline topology. // Build the pipeline topology.
return pipe return this.pipe
.readFrom(source) .readFrom(source)
// use Timestamps // use Timestamps
.withNativeTimestamps(0) .withNativeTimestamps(0)
...@@ -112,7 +112,8 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -112,7 +112,8 @@ public class Uc3PipelineFactory extends PipelineFactory {
.map(record -> { .map(record -> {
final String sensorId = record.getValue().getIdentifier(); final String sensorId = record.getValue().getIdentifier();
final long timestamp = record.getValue().getTimestamp(); final long timestamp = record.getValue().getTimestamp();
final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), final LocalDateTime dateTime = LocalDateTime.ofInstant(
Instant.ofEpochMilli(timestamp),
TimeZone.getDefault().toZoneId()); TimeZone.getDefault().toZoneId());
final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory();
...@@ -123,15 +124,17 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -123,15 +124,17 @@ public class Uc3PipelineFactory extends PipelineFactory {
// group by new keys // group by new keys
.groupingKey(Entry::getKey) .groupingKey(Entry::getKey)
// Sliding/Hopping Window // Sliding/Hopping Window
.window(WindowDefinition.sliding(windowSize.toMillis(), hoppingSize.toMillis())) .window(WindowDefinition
.sliding(this.windowSize.toMillis(), this.hoppingSize.toMillis())
.setEarlyResultsPeriod(this.emitPeriod.toMillis()))
// get average value of group (sensoreId,hourOfDay) // get average value of group (sensoreId,hourOfDay)
.aggregate( .aggregate(
AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) AggregateOperations.averagingDouble(record -> record.getValue().getValueInW()))
// map to return pair (sensorID,hourOfDay) -> (averaged what value) // map to return pair sensorID -> stats
.map(agg -> { .map(agg -> {
final String theValue = agg.getValue().toString(); final String sensorId = agg.getKey().getSensorId();
final String theKey = agg.getKey().toString(); final String stats = agg.getValue().toString(); // TODO just double, not stats
return Map.entry(theKey, theValue); return Map.entry(sensorId, stats);
}); });
} }
} }
...@@ -6,6 +6,7 @@ kafka.input.topic=input ...@@ -6,6 +6,7 @@ kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
aggregation.duration.days=30 aggregation.duration.days=30
aggregation.advance.days=1 aggregation.advance.days=1
aggregation.emit.period.seconds=15
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment