diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java index c22c164f62ad22d3c18add75ad5115fd15fb8f14..a98aa89ff17d4013255dad1bf2f47729116be230 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java @@ -21,7 +21,7 @@ public final class ConfigurationKeys { public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; // UC2 - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; // UC3 public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml index ae28c0709cf11507ff4d67512b29e92b6292c7cc..0d9f7372416049afd57bb35056ac39d4d7bdd75e 100644 --- a/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml +++ b/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml @@ -21,8 +21,6 @@ spec: value: "theodolite-kafka-kafka-bootstrap:9092" - name: SCHEMA_REGISTRY_URL value: "http://theodolite-kafka-schema-registry:8081" - - name: DOWNSAMPLE_INTERVAL - value: "5000" #- name: KUBERNETES_DNS_NAME # value: "titan-ccp-aggregation" - name: KUBERNETES_NAMESPACE diff --git a/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml index 1c6d352dc5e8bca18d97cb7649e3d55d10e1d5c8..55b0c457e5eabc2d58687d60cc1f95fd2d0e6023 100644 --- a/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-hazelcastjet/docker-compose.yml @@ -50,7 +50,6 @@ services: BOOTSTRAP_SERVER: benchmark:5701 KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 - KAFKA_WINDOW_DURATION_MINUTES: 60 load-generator: image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest} depends_on: diff --git a/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml b/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml index efdba90bef634bab76012316f67b0f9be9f79c77..f378fe86a183c370b9e11ab141d105531d1ff40e 100755 --- a/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml +++ b/theodolite-benchmarks/docker-test/uc2-kstreams/docker-compose.yml @@ -45,7 +45,6 @@ services: environment: KAFKA_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_URL: http://schema-registry:8081 - KAFKA_WINDOW_DURATION_MINUTES: 60 load-generator: image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest} depends_on: diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java index c1f2e646f00a4831dd964cbb76452b1b77ca6b6a..44e82126d365e3d373194160cca3b63cd1c46c98 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java @@ -21,7 +21,7 @@ public class ConfigurationKeys { public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; // UC2 - public static final String DOWNSAMPLE_INTERVAL = "kafka.window.duration.minutes"; + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; // UC3 public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java index ca74aa7d9fd88a7d3c20589438e0c9454062d2f0..6f1f70e72d5eb2add605b0a38bbabf8cdd2bfd6f 100644 --- a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java @@ -28,7 +28,7 @@ public final class ConfigurationKeys { public static final String GRACE_PERIOD_MS = "grace.period.ms"; // UC3 - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; // UC4 public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java index decbcae1c4b524f9f39295ecd49275a3c1b09951..f8085626541eebbea844e880dc3bb6f12dc37b64 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java @@ -42,8 +42,8 @@ public class PipelineFactory extends AbstractPipelineFactory { protected void constructPipeline(final Pipeline pipeline) { final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final Duration duration = Duration.standardMinutes( - this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); + final Duration downsampleInterval = Duration.standardMinutes( + this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); @@ -58,7 +58,7 @@ public class PipelineFactory extends AbstractPipelineFactory { // Apply pipeline transformations pipeline.apply(kafkaReader) // Apply a fixed window - .apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration))) + .apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(downsampleInterval))) // Aggregate per window for every key .apply(Combine.<String, ActivePowerRecord, Stats>perKey(new StatsAggregation())) .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class))) diff --git a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties index 557689f074683353550363d628382d5f4acf096a..3f81e6005be8e83893f4c7b51f91554493505758 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-beam/src/main/resources/META-INF/application.properties @@ -4,10 +4,10 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 - schema.registry.url=http://localhost:8081 +downsample.interval.minutes=1 + specific.avro.reader=true # Kafka Settings diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java index a2cdf8f847886c8a1a41f14363f492d40271496b..8ea245ebaef96b9e9a3f536efebf34ac5041628d 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java @@ -17,7 +17,7 @@ public final class ConfigurationKeys { public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; public static final String FLINK_STATE_BACKEND = "flink.state.backend"; diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index eda3c1a98af7b4e2a360da5a7e84a27166e1e7d0..3ef6ddbd56632d4e034604ff01aa3f8118233338 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -39,9 +39,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final int windowDurationMinutes = - this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES); - final Time windowDuration = Time.minutes(windowDurationMinutes); + final Time windowDuration = Time.minutes( + this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties index 3e8b8e08eacb61a92bc40d16d0c30a65bd1161f8..f12c875e230a3c2871097e7256948dc90d75edf4 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-flink/src/main/resources/META-INF/application.properties @@ -6,7 +6,7 @@ kafka.input.topic=input kafka.output.topic=output schema.registry.url=http://localhost:8081 -kafka.window.duration.minutes=1 +downsample.interval.minutes=1 # Flink configuration -checkpointing.interval.ms=100 +checkpointing.interval.ms=1000 diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index ce95d1a8cdd19b7c925c8b5b71e9ae534b085673..6104b8a7093428bc255effcee2918dac69d05531 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -2,7 +2,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; import com.google.common.math.StatsAccumulator; import io.confluent.kafka.serializers.KafkaAvroDeserializer; - import java.time.Duration; import java.util.Properties; import org.apache.kafka.common.serialization.StringDeserializer; @@ -22,8 +21,8 @@ public class HistoryService extends HazelcastJetService { /** - * Constructs the use case logic for UC2. - * Retrieves the needed values and instantiates a pipeline factory. + * Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline + * factory. */ public HistoryService() { super(LOGGER); @@ -38,11 +37,10 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName()); final String kafkaOutputTopic = - config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); final Duration downsampleInterval = Duration.ofMinutes( - Integer.parseInt(config.getProperty( - ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString())); + this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); this.pipelineFactory = new Uc2PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties index 32db468dc27c48ae7345a1ad69c212b942fe00e1..636584ce9c6c1b8b22a8e63252aeda0fae04f1f9 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/resources/META-INF/application.properties @@ -4,7 +4,6 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 - schema.registry.url=http://localhost:8081 +downsample.interval.minutes=1 diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java index 4afc2d91eaaf98226f262f072cfd7e5aed6f847e..8e4d290531c1497bc2792b12d2182ba31cc192b9 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java @@ -1,7 +1,6 @@ package rocks.theodolite.benchmarks.uc2.kstreams; import java.time.Duration; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; @@ -18,8 +17,6 @@ public class HistoryService { private final Configuration config = ServiceConfigurations.createWithDefaults(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - private final int windowDurationMinutes = Integer - .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); /** * Start the service. @@ -36,7 +33,8 @@ public class HistoryService { final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); uc2KafkaStreamsBuilder .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + .windowDuration(Duration.ofMinutes( + this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES))); final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties index 7765cbff37fd8e72029d933c22c7891b94a7b87c..afba990c187e16e9fd98b310b55e345e9ea78864 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc2-kstreams/src/main/resources/META-INF/application.properties @@ -4,9 +4,9 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output -kafka.window.duration.minutes=1 - schema.registry.url=http://localhost:8081 +downsample.interval.minutes=1 + # Kafka Streams Config commit.interval.ms=5000 diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index 955f7101515c9467edc2e4900aa5464437f0e904..6627235dc29a1fb59da876a0e3dd7f7d36865d1c 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -73,7 +73,7 @@ public class PipelineFactory extends AbstractPipelineFactory { this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); final Duration duration = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); + this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final Duration triggerDelay = Duration.standardSeconds( this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL)); final Duration gracePeriod = Duration.standardSeconds(