Skip to content
Snippets Groups Projects
Commit cb278698 authored by Sören Henning's avatar Sören Henning
Browse files

Align time unit config for UC4

parent d66a8cbe
No related branches found
No related tags found
No related merge requests found
Pipeline #10479 passed
Showing
with 11 additions and 17 deletions
...@@ -67,8 +67,6 @@ services: ...@@ -67,8 +67,6 @@ services:
parallelism.default: 1 parallelism.default: 1
state.backend: rocksdb state.backend: rocksdb
state.checkpoints.dir: file:///data/flink/checkpoints state.checkpoints.dir: file:///data/flink/checkpoints
- GRACE_PERIOD_MS=5000
- TRIGGER_INTERVAL_SECONDS=1
depends_on: depends_on:
- schema-registry - schema-registry
- kafka - kafka
......
...@@ -52,8 +52,6 @@ services: ...@@ -52,8 +52,6 @@ services:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092 KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081 SCHEMA_REGISTRY_URL: http://schema-registry:8081
ENABLE_METRICS: "false" ENABLE_METRICS: "false"
GRACE_PERIOD_MS: 5000
TRIGGER_INTERVAL_SECONDS: 1
load-generator: load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on: depends_on:
......
...@@ -45,8 +45,6 @@ services: ...@@ -45,8 +45,6 @@ services:
environment: environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092 KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081 SCHEMA_REGISTRY_URL: http://schema-registry:8081
COMMIT_INTERVAL_MS: 1000
GRACE_PERIOD_MS: 5000
load-generator: load-generator:
image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest} image: ghcr.io/cau-se/theodolite-uc4-workload-generator:${THEODOLITE_TAG:-latest}
depends_on: depends_on:
......
...@@ -75,8 +75,8 @@ public class PipelineFactory extends AbstractPipelineFactory { ...@@ -75,8 +75,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Duration duration = Duration.millis( final Duration duration = Duration.millis(
this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS));
// final boolean enableTrigger = this.config.getBoolean(Uc4ConfigurationKeys.TRIGGER_ENABLE); // final boolean enableTrigger = this.config.getBoolean(Uc4ConfigurationKeys.TRIGGER_ENABLE);
final Duration triggerDelay = Duration.standardSeconds( final Duration triggerDelay = Duration.millis(
this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_MS));
final Duration gracePeriod = Duration.millis( final Duration gracePeriod = Duration.millis(
this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
......
...@@ -15,7 +15,7 @@ public final class Uc4ConfigurationKeys { ...@@ -15,7 +15,7 @@ public final class Uc4ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms"; public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; public static final String TRIGGER_INTERVAL_MS = "trigger.interval.ms";
private Uc4ConfigurationKeys() {} private Uc4ConfigurationKeys() {}
......
...@@ -10,7 +10,7 @@ kafka.feedback.topic=aggregation-feedback ...@@ -10,7 +10,7 @@ kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
emit.period.ms=5000 emit.period.ms=5000
trigger.interval.seconds=1 trigger.interval.ms=1000
grace.period.ms=5000 grace.period.ms=5000
specific.avro.reader=true specific.avro.reader=true
......
...@@ -75,7 +75,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { ...@@ -75,7 +75,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
final Duration windowGrace = final Duration windowGrace =
Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
final Time triggerDuration = final Time triggerDuration =
Time.seconds(this.config.getLong(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.TRIGGER_INTERVAL_MS));
final String configurationTopic = final String configurationTopic =
this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
......
...@@ -19,7 +19,7 @@ public final class Uc4ConfigurationKeys { ...@@ -19,7 +19,7 @@ public final class Uc4ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms"; public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; public static final String TRIGGER_INTERVAL_MS = "trigger.interval.ms";
private Uc4ConfigurationKeys() {} private Uc4ConfigurationKeys() {}
......
...@@ -10,7 +10,7 @@ kafka.feedback.topic=aggregation-feedback ...@@ -10,7 +10,7 @@ kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
emit.period.ms=5000 emit.period.ms=5000
trigger.interval.seconds=1 trigger.interval.ms=1000
grace.period.ms=5000 grace.period.ms=5000
# Flink configuration # Flink configuration
......
...@@ -59,8 +59,8 @@ public class HistoryService extends HazelcastJetService { ...@@ -59,8 +59,8 @@ public class HistoryService extends HazelcastJetService {
final Duration gracePeriod = Duration.ofMillis( final Duration gracePeriod = Duration.ofMillis(
this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS));
final Duration triggerPeriod = Duration.ofSeconds( final Duration triggerPeriod = Duration.ofMillis(
this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_MS));
this.pipelineFactory = new Uc4PipelineFactory( this.pipelineFactory = new Uc4PipelineFactory(
kafkaProps, kafkaProps,
......
...@@ -15,6 +15,6 @@ public class Uc4ConfigurationKeys { ...@@ -15,6 +15,6 @@ public class Uc4ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms"; public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; public static final String TRIGGER_INTERVAL_MS = "trigger.interval.ms";
} }
...@@ -10,5 +10,5 @@ kafka.feedback.topic=aggregation-feedback ...@@ -10,5 +10,5 @@ kafka.feedback.topic=aggregation-feedback
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
emit.period.ms=5000 emit.period.ms=5000
trigger.interval.seconds=1 trigger.interval.ms=1000
grace.period.ms=5000 grace.period.ms=5000
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment