Commit bba206bf authored by Sören Henning

Merge branch 'main' into feature/374-improve-hazelcastjet-structure

Parents: 766f89ab, 092cc3c0
Merge request !275: Refactor hazelcast jet benchmarks
Pipeline #10126 failed
 # Docker Compose Files for Testing

 This directory contains Docker Compose files, which help testing Benchmark implementations.
-For each stream processing engine (Kafka Streams and Flink) and Benchmark (UC1-4), a Docker Compose file is provided
-in the corresponding subdirectory.
+For each stream processing engine (Kafka Streams, Flink, Hazelcast Jet, Beam/Flink and Beam/Samza) and Benchmark
+(UC1-4), a Docker Compose file is provided in the corresponding subdirectory.

 ## Full Dockerized Testing
......
@@ -21,6 +21,9 @@ public final class ConfigurationKeys {
   public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";

+  public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS =
+      "aggregation.trigger.interval.seconds";
+
   public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

   public static final String TIME_ZONE = "time.zone";
......
@@ -9,6 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -55,6 +56,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Time aggregationAdvance =
         Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+    final Time triggerDuration =
+        Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS));
     final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);

     final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
@@ -80,13 +83,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
           return keyFactory.createKey(record.getIdentifier(), dateTime);
         })
         .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
+        .trigger(ContinuousEventTimeTrigger.of(triggerDuration))
         .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
         .map(tuple -> {
-          final String newKey = keyFactory.getSensorId(tuple.f0);
-          final String newValue = tuple.f1.toString();
-          final int hourOfDay = tuple.f0.getHourOfDay();
-          LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
-          return new Tuple2<>(newKey, newValue);
+          final String sensorId = keyFactory.getSensorId(tuple.f0);
+          final String stats = tuple.f1.toString();
+          // final int hourOfDay = tuple.f0.getHourOfDay();
+          // LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
+          return new Tuple2<>(sensorId, stats);
         })
         .name("map")
         .returns(Types.TUPLE(Types.STRING, Types.STRING))
......
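The net effect of this hunk: the 30-day sliding windows no longer emit a result only once the watermark passes a window's end. With the added trigger, every open window also fires its current partial aggregate each 15 seconds of event time; the per-record logging is commented out, presumably because continuous firing would make it very noisy. Below is a minimal, self-contained sketch of the trigger's behavior. It is not the benchmark's actual job; the sensor names, values, and timestamp assignment are made up for illustration.

// Sketch: same window shape as the benchmark (long duration, small advance),
// plus ContinuousEventTimeTrigger so partial aggregates are emitted early.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public final class ContinuousTriggerSketch {

  public static void main(final String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(
            Tuple2.of("sensor1", 1.0),
            Tuple2.of("sensor1", 2.0),
            Tuple2.of("sensor2", 5.0))
        // Hypothetical timestamps: treat arrival time as monotonous event time.
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Tuple2<String, Double>>forMonotonousTimestamps()
                .withTimestampAssigner((event, ts) -> System.currentTimeMillis()))
        .keyBy(event -> event.f0)
        .window(SlidingEventTimeWindows.of(Time.days(30), Time.days(1)))
        // Without this line, a result appears only when a window closes; with it,
        // every open window also emits its current aggregate each 15 seconds of
        // event time.
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
        .sum(1)
        .print();

    env.execute("continuous-trigger-sketch");
  }
}

Note that ContinuousEventTimeTrigger fires on event time: if the watermark stalls, no intermediate results appear, so a 15-second interval presumes a steadily advancing event-time clock, which a continuous input stream provides.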
@@ -7,6 +7,7 @@ kafka.output.topic=output
 schema.registry.url=http://localhost:8081
 aggregation.duration.days=30
 aggregation.advance.days=1
+aggregation.trigger.interval.seconds=15
 num.threads=1
 commit.interval.ms=100
 cache.max.bytes.buffering=-1
......
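The new property sets the trigger interval to 15 seconds by default. For completeness, a hedged sketch of how such a property is typically resolved, assuming Apache Commons Configuration 2 (suggested by the config.getInt(...) calls above); the file name and the fallback value are illustrative, not taken from the commit.

// Sketch (assumptions: Apache Commons Configuration 2 on the classpath and a
// properties file named "application.properties"; the fallback value 15 is
// illustrative only, not part of this commit).
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;

public final class TriggerIntervalConfigSketch {

  public static void main(final String[] args) throws Exception {
    final Configuration config = new Configurations().properties("application.properties");
    // Same key as the new ConfigurationKeys constant above.
    final int triggerIntervalSeconds =
        config.getInt("aggregation.trigger.interval.seconds", 15);
    System.out.println("Aggregation trigger interval: " + triggerIntervalSeconds + "s");
  }
}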