Skip to content
Snippets Groups Projects

Align UC4 implementations among engines

Merged Sören Henning requested to merge align-uc4-among-engines into main
1 file
+ 1
1
Compare changes
  • Side-by-side
  • Inline
@@ -72,11 +72,11 @@ public class PipelineFactory extends AbstractPipelineFactory {
final String configurationTopic =
this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
final Duration duration = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_SECONDS));
final Duration duration = Duration.millis(
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS));
final Duration triggerDelay = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
final Duration gracePeriod = Duration.standardSeconds(
final Duration gracePeriod = Duration.standardSeconds(// TODO this is wrong
this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
// Read from Kafka
@@ -115,9 +115,11 @@ public class PipelineFactory extends AbstractPipelineFactory {
.apply("Read Windows", Window.into(FixedWindows.of(duration)))
.apply("Set trigger for input", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.triggering(Repeatedly
.forever(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes());
@@ -204,7 +206,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
.apply("Reset trigger for aggregations", Window
.<KV<String, ActivePowerRecord>>configure()
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(triggerDelay)))
.withAllowedLateness(gracePeriod)
.discardingFiredPanes())
Loading