Skip to content
Snippets Groups Projects
Commit 6ecc7eea authored by Sören Henning's avatar Sören Henning
Browse files

Set trigger interval in Flink UC4

parent c06b308a
No related branches found
No related tags found
No related merge requests found
Pipeline #10435 passed
......@@ -15,8 +15,6 @@ public final class Uc4ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// public static final String TRIGGER_ENABLE = "trigger.enable";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
private Uc4ConfigurationKeys() {}
......
......@@ -10,6 +10,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
......@@ -72,6 +73,8 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS));
final Duration windowGrace =
Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
final Time triggerDuration =
Time.seconds(this.config.getLong(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
final String configurationTopic =
this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
......@@ -147,6 +150,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(windowGrace))
.keyBy(t -> t.f0.getParent())
.window(TumblingEventTimeWindows.of(windowSize))
.trigger(ContinuousProcessingTimeTrigger.of(triggerDuration))
.process(new RecordAggregationProcessWindowFunction())
.name("[Aggregate] ((Sensor, Group), ActivePowerRecord) -> AggregatedActivePowerRecord");
......
......@@ -17,6 +17,8 @@ public final class Uc4ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
private Uc4ConfigurationKeys() {}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment