diff --git a/theodolite-benchmarks/uc3-flink/Dockerfile b/theodolite-benchmarks/uc3-flink/Dockerfile index cef05c0296f55f0cf7391dd35dd1806ec0efa287..744ad389e15a093c8eb1e1ce7ae7352f69c30c33 100644 --- a/theodolite-benchmarks/uc3-flink/Dockerfile +++ b/theodolite-benchmarks/uc3-flink/Dockerfile @@ -1,3 +1,3 @@ FROM flink:1.13-java11 -ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar \ No newline at end of file +ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java index 980f07b9b1478bd2c5fa74c89d1aaff4c10f60df..9e42f1ffb41be4f9e40e690b110d0556af67b34a 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java @@ -21,6 +21,9 @@ public final class ConfigurationKeys { public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS = + "aggregation.trigger.interval.seconds"; + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; public static final String TIME_ZONE = "time.zone"; diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index d80f64fafb69d3e0287347a8f90080584d4fcd82..9a85a87a85222d5ee17f6f33bc7fa8425d5f1a03 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -9,6 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; @@ -55,6 +56,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); final Time aggregationAdvance = Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + final Time triggerDuration = + Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS)); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( @@ -80,13 +83,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { return keyFactory.createKey(record.getIdentifier(), dateTime); }) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) + .trigger(ContinuousEventTimeTrigger.of(triggerDuration)) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .map(tuple -> { - final String newKey = keyFactory.getSensorId(tuple.f0); - final String newValue = tuple.f1.toString(); - final int hourOfDay = tuple.f0.getHourOfDay(); - LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue); - return new Tuple2<>(newKey, newValue); + final String sensorId = keyFactory.getSensorId(tuple.f0); + final String stats = tuple.f1.toString(); + // final int hourOfDay = tuple.f0.getHourOfDay(); + // LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue); + return new Tuple2<>(sensorId, stats); }) .name("map") .returns(Types.TUPLE(Types.STRING, Types.STRING)) diff --git a/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties index 6b6874674ce6a0abea73ea6d983c00c15deb8bb1..4423cd38c867a50f758437aa8eac8b12c7f10e62 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc3-flink/src/main/resources/META-INF/application.properties @@ -7,6 +7,7 @@ kafka.output.topic=output schema.registry.url=http://localhost:8081 aggregation.duration.days=30 aggregation.advance.days=1 +aggregation.trigger.interval.seconds=15 num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1