Skip to content
Snippets Groups Projects
Commit 7abfdeb5 authored by Sören Henning's avatar Sören Henning
Browse files

Unify naming of UC4 options

parent 1082b292
Branches
Tags
1 merge request!307Align UC4 implementations among engines
Pipeline #10183 canceled
Showing with 13 additions and 17 deletions
......@@ -19,10 +19,6 @@ public final class ConfigurationKeys {
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String CHECKPOINTING = "checkpointing";
public static final String CHECKPOINTING_INTERVAL_MS = "checkpointing.interval.ms";
......
......@@ -32,7 +32,7 @@ public class ConfigurationKeys {
// UC4
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String EMIT_PERIOD_MS = "window.size";
// public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String EMIT_PERIOD_MS = "emit.period.ms";
// public static final String GRACE_PERIOD_MS = "grace.period.ms";
}
......@@ -67,9 +67,9 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Time windowSize =
Time.milliseconds(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS));
Time.milliseconds(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS));
final Duration windowGrace =
Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS));
Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS));
final String configurationTopic =
this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
......
......@@ -19,9 +19,9 @@ public final class ConfigurationKeys {
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
......
......@@ -9,8 +9,9 @@ kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
window.size.ms=1000
window.grace.ms=0
emit.period.ms=5000
grace.period.ms=0
# Flink configuration
checkpointing.interval.ms=1000
......@@ -43,7 +43,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
private final String kafkaConfigurationTopic;
private final String kafkaFeedbackTopic;
private final Duration windowSize;
private final Duration emitPeriod;
/**
......@@ -79,7 +79,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
this.kafkaFeedbackPropsForPipeline = kafkaFeedbackPropsForPipeline;
this.kafkaConfigurationTopic = kafkaConfigurationTopic;
this.kafkaFeedbackTopic = kafkaFeedbackTopic;
this.windowSize = windowSize;
this.emitPeriod = windowSize;
}
/**
......@@ -231,7 +231,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
// (5) UC4 Last Value Map
// Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time
final StageWithWindow<Entry<SensorGroupKey, ActivePowerRecord>> windowedLastValues =
dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.windowSize.toMillis()));
dupliAsFlatmappedStage.window(WindowDefinition.tumbling(this.emitPeriod.toMillis()));
final AggregateOperation1<Entry<SensorGroupKey, ActivePowerRecord>, AggregatedActivePowerRecordAccumulator, AggregatedActivePowerRecord> aggrOp = // NOCS
AggregateOperation
......
......@@ -9,6 +9,5 @@ kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081
window.size=5000
#emit.period.ms=5000
emit.period.ms=5000
#grace.period.ms=0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment