Skip to content
Snippets Groups Projects
Commit 409d4ab5 authored by Sören Henning's avatar Sören Henning
Browse files

Keep Beam UC4 working

parent 2b53d632
No related branches found
No related tags found
1 merge request!306Align all downsample intervals for UC2
Pipeline #10171 passed
......@@ -29,8 +29,10 @@ public final class ConfigurationKeys {
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// UC4
public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String EMIT_PERIOD_SECONDS = "kafka.window.duration.minutes";
// public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// BEAM
public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
......@@ -41,7 +43,9 @@ public final class ConfigurationKeys {
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
public static final String TRIGGER_INTERVAL = "trigger.interval";
// Used for UC3 + UC4:
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval";
// public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
private ConfigurationKeys() {}
......
......@@ -48,7 +48,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Duration aggregationAdvanceDuration =
Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration triggerDelay =
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
// Read from Kafka
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
......
......@@ -4,7 +4,6 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
......
......@@ -73,9 +73,9 @@ public class PipelineFactory extends AbstractPipelineFactory {
this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
final Duration duration = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_SECONDS));
final Duration triggerDelay = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
final Duration gracePeriod = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment