From c441517fbe097ae559a82a913a8e7bb70f26babe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de> Date: Sat, 26 Nov 2022 20:22:41 +0100 Subject: [PATCH] Clean up configuration keys classes --- .../commons/beam/ConfigurationKeys.java | 28 +------------ .../hazelcastjet/ConfigurationKeys.java | 26 ++++-------- .../commons/kstreams/ConfigurationKeys.java | 21 ---------- .../uc1/flink/ConfigurationKeys.java | 24 ----------- .../uc1/flink/HistoryServiceFlinkJob.java | 1 + .../benchmarks/uc2/beam/PipelineFactory.java | 4 +- .../uc2/beam/Uc2ConfigurationKeys.java | 15 +++++++ .../uc2/flink/ConfigurationKeys.java | 35 ---------------- .../uc2/flink/HistoryServiceFlinkJob.java | 7 ++-- .../uc2/flink/Uc2ConfigurationKeys.java | 16 +++++++ .../uc2/hazelcastjet/HistoryService.java | 5 +-- .../hazelcastjet/Uc2ConfigurationKeys.java | 12 ++++++ .../uc2/kstreams/HistoryService.java | 5 +-- .../uc2/kstreams/Uc2ConfigurationKeys.java | 14 +++++++ .../benchmarks/uc3/beam/PipelineFactory.java | 8 ++-- .../uc3/beam/Uc3ConfigurationKeys.java | 19 +++++++++ .../uc3/flink/ConfigurationKeys.java | 42 ------------------- .../uc3/flink/HistoryServiceFlinkJob.java | 11 ++--- .../uc3/flink/Uc3ConfigurationKeys.java | 21 ++++++++++ .../uc3/hazelcastjet/HistoryService.java | 9 ++-- .../hazelcastjet/Uc3ConfigurationKeys.java | 17 ++++++++ .../uc3/kstreams/HistoryService.java | 7 ++-- .../uc3/kstreams/Uc3ConfigurationKeys.java | 16 +++++++ .../benchmarks/uc4/beam/PipelineFactory.java | 12 +++--- .../uc4/beam/Uc4ConfigurationKeys.java | 23 ++++++++++ .../uc4/flink/AggregationServiceFlinkJob.java | 14 ++++--- .../uc4/flink/ConfigurationKeys.java | 41 ------------------ .../uc4/flink/Uc4ConfigurationKeys.java | 22 ++++++++++ .../uc4/hazelcastjet/HistoryService.java | 9 ++-- .../hazelcastjet/Uc4ConfigurationKeys.java | 17 ++++++++ .../uc4/kstreams/AggregationService.java | 11 +++-- .../uc4/kstreams/Uc4ConfigurationKeys.java | 21 ++++++++++ 32 files changed, 272 insertions(+), 261 deletions(-) delete mode 100644 theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java delete mode 100644 theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java delete mode 100644 theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java delete mode 100644 theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java create mode 100644 theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java index 9ab378f8e..51105fcec 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java @@ -4,7 +4,7 @@ package rocks.theodolite.benchmarks.commons.beam; * Keys to access configuration parameters. */ public final class ConfigurationKeys { - // Common keys + public static final String APPLICATION_NAME = "application.name"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; @@ -13,27 +13,6 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - // Additional topics - public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; - - // UC2 - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; - - // UC3 - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - // UC4 - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - - public static final String GRACE_PERIOD_MS = "grace.period.ms"; - - // BEAM public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; public static final String MAX_POLL_RECORDS = "max.poll.records"; @@ -42,11 +21,6 @@ public final class ConfigurationKeys { public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; - // Used for UC3 + UC4: - public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; - // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; - - private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java index 5215c23c8..141b5a427 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java @@ -7,32 +7,20 @@ public class ConfigurationKeys { public static final String APPLICATION_NAME = "application.name"; - // Common Keys public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER"; + public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME"; + public static final String PORT = "PORT"; + public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT"; - public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX"; - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - // Additional topics - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX"; - // UC2 - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - // UC3 - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - public static final String AGGREGATION_EMIT_PERIOD_SECONDS = // NOPMD - "aggregation.emit.period.seconds"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - // UC4 - public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; - public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - // public static final String GRACE_PERIOD_MS = "grace.period.ms"; + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; } diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java index 6f1f70e72..61a2df2c4 100644 --- a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java @@ -15,27 +15,6 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - // Additional topics - public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; - - // UC2 - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - - public static final String GRACE_PERIOD_MS = "grace.period.ms"; - - // UC3 - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; - - // UC4 - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java deleted file mode 100644 index d0884f763..000000000 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java +++ /dev/null @@ -1,24 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java index 9d3412c7f..5166e3144 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java @@ -4,6 +4,7 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java index f80856265..d8bf1ba52 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java @@ -40,10 +40,10 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final Pipeline pipeline) { - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Duration downsampleInterval = Duration.standardMinutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java new file mode 100644 index 000000000..c55c7f215 --- /dev/null +++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java @@ -0,0 +1,15 @@ +package rocks.theodolite.benchmarks.uc2.beam; + +/** + * Keys to access configuration parameters. + */ +public final class Uc2ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + + private Uc2ConfigurationKeys() { + } + +} diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java deleted file mode 100644 index 8ea245eba..000000000 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java +++ /dev/null @@ -1,35 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; - - public static final String FLINK_STATE_BACKEND = "flink.state.backend"; - - public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD - "flink.state.backend.memory.size"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index e44eeaea9..d349d4086 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -35,12 +36,12 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { @Override protected void buildPipeline() { - final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + final String kafkaBroker = this.config.getString(Uc2ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Time windowDuration = Time.minutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java new file mode 100644 index 000000000..06cc33908 --- /dev/null +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java @@ -0,0 +1,16 @@ +package rocks.theodolite.benchmarks.uc2.flink; + +/** + * Keys to access configuration parameters. + */ +public final class Uc2ConfigurationKeys { + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + + private Uc2ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index 6104b8a70..635d8789d 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; @@ -37,10 +36,10 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName()); final String kafkaOutputTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + this.config.getProperty(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); final Duration downsampleInterval = Duration.ofMinutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); this.pipelineFactory = new Uc2PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java new file mode 100644 index 000000000..16f1c0c04 --- /dev/null +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java @@ -0,0 +1,12 @@ +package rocks.theodolite.benchmarks.uc2.hazelcastjet; + +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ +public class Uc2ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + +} diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java index 8e4d29053..a6375688d 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java @@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; -import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -32,9 +31,9 @@ public class HistoryService { private void createKafkaStreamsApplication() { final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); uc2KafkaStreamsBuilder - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .outputTopic(this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .windowDuration(Duration.ofMinutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES))); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES))); final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java new file mode 100644 index 000000000..1de4827b4 --- /dev/null +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java @@ -0,0 +1,14 @@ +package rocks.theodolite.benchmarks.uc2.kstreams; + +/** + * Keys to access configuration parameters. + */ +public final class Uc2ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + + private Uc2ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index 3f99277d4..d154b7487 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -41,14 +41,14 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final Pipeline pipeline) { - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Duration duration = - Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + Duration.standardDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS)); final Duration aggregationAdvanceDuration = - Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + Duration.standardDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); final Duration triggerDelay = - Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); + Duration.standardSeconds(this.config.getInt(Uc3ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); // Read from Kafka final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java new file mode 100644 index 000000000..ca59d3d2f --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc3.beam; + +/** + * Keys to access configuration parameters. + */ +public final class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; + // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; + + private Uc3ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java deleted file mode 100644 index a19a7f8b2..000000000 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java +++ /dev/null @@ -1,42 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS = // NOPMD - "aggregation.trigger.interval.seconds"; - - public static final String TIME_ZONE = "time.zone"; - - public static final String FLINK_STATE_BACKEND = "flink.state.backend"; - - public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD - "flink.state.backend.memory.size"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index aaaba5b3b..9f90f45f3 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -16,6 +16,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -50,14 +51,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE)); + final String outputTopic = this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final ZoneId timeZone = ZoneId.of(this.config.getString(Uc3ConfigurationKeys.TIME_ZONE)); final Time aggregationDuration = - Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + Time.days(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS)); final Time aggregationAdvance = - Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + Time.days(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); final Time triggerDuration = - Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS)); + Time.seconds(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS)); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java new file mode 100644 index 000000000..e3e167cbc --- /dev/null +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java @@ -0,0 +1,21 @@ +package rocks.theodolite.benchmarks.uc3.flink; + +/** + * Keys to access configuration parameters. + */ +public final class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS = // NOPMD + "aggregation.trigger.interval.seconds"; + + public static final String TIME_ZONE = "time.zone"; + + private Uc3ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java index df3f85a42..3f573d300 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; /** @@ -35,16 +34,16 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName()); final String kafkaOutputTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + this.config.getProperty(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); final Duration windowSize = Duration.ofDays(Integer.parseInt( - this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); + this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); final Duration hoppingSize = Duration.ofDays(Integer.parseInt( - this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); + this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt( - this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString())); + this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString())); this.pipelineFactory = new Uc3PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java new file mode 100644 index 000000000..86978513f --- /dev/null +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java @@ -0,0 +1,17 @@ +package rocks.theodolite.benchmarks.uc3.hazelcastjet; + +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ +public class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + public static final String AGGREGATION_EMIT_PERIOD_SECONDS = // NOPMD + "aggregation.emit.period.seconds"; + +} diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java index 2151d0be4..637c2e60b 100644 --- a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java @@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; -import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -33,11 +32,11 @@ public class HistoryService { // Use case specific stream configuration final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config); uc3KafkaStreamsBuilder - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .outputTopic(this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregationDuration( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) + Duration.ofDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS))) .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + Duration.ofDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); // Configuration of the stream application final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java new file mode 100644 index 000000000..db0bdaa3e --- /dev/null +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java @@ -0,0 +1,16 @@ +package rocks.theodolite.benchmarks.uc3.kstreams; + +/** + * Keys to access configuration parameters. + */ +public final class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + private Uc3ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index 0974960a4..2bba46d9b 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -67,17 +67,17 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final Pipeline pipeline) { // NOPMD // Additional needed variables - final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); + final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String configurationTopic = - this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); + this.config.getString(Uc4ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); final Duration duration = Duration.millis( - this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS)); + this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration triggerDelay = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); + this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); final Duration gracePeriod = Duration.standardSeconds(// TODO this is wrong - this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS)); + this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); // Read from Kafka final String bootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java new file mode 100644 index 000000000..860e8c9da --- /dev/null +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java @@ -0,0 +1,23 @@ +package rocks.theodolite.benchmarks.uc4.beam; + +/** + * Keys to access configuration parameters. + */ +public final class Uc4ConfigurationKeys { + + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; + // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; + + private Uc4ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index 331065124..abdb9aaed 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.configuration.events.Event; import rocks.theodolite.benchmarks.commons.configuration.events.EventSerde; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.TupleType; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -62,16 +63,17 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { @Override protected void buildPipeline() { // Get configurations - final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); - final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); + final String kafkaBroker = this.config.getString(Uc4ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + final String schemaRegistryUrl = + this.config.getString(Uc4ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Time windowSize = - Time.milliseconds(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)); + Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = - Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)); + Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); final String configurationTopic = - this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); + this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java deleted file mode 100644 index 6016865fb..000000000 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java +++ /dev/null @@ -1,41 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - - public static final String GRACE_PERIOD_MS = "grace.period.ms"; - - public static final String FLINK_STATE_BACKEND = "flink.state.backend"; - - public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD - "flink.state.backend.memory.size"; - - public static final String DEBUG = "debug"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java new file mode 100644 index 000000000..6fd2b0fa0 --- /dev/null +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java @@ -0,0 +1,22 @@ +package rocks.theodolite.benchmarks.uc4.flink; + +/** + * Keys to access configuration parameters. + */ +public final class Uc4ConfigurationKeys { + + public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + private Uc4ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index 678f774a5..97ea33eda 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; import rocks.theodolite.benchmarks.commons.model.sensorregistry.ImmutableSensorRegistry; @@ -47,15 +46,15 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName(), KafkaAvroSerializer.class.getCanonicalName()); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String configurationTopic = - this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); + this.config.getString(Uc4ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); - final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); + final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final Duration windowSize = Duration.ofMillis( - this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS)); + this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); this.pipelineFactory = new Uc4PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java new file mode 100644 index 000000000..6c4c63396 --- /dev/null +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java @@ -0,0 +1,17 @@ +package rocks.theodolite.benchmarks.uc4.hazelcastjet; + +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ +public class Uc4ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; + + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + // public static final String GRACE_PERIOD_MS = "grace.period.ms"; + +} diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java index 26ea02957..4119fbefa 100644 --- a/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java +++ b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java @@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; -import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -37,11 +36,11 @@ public class AggregationService { private void createKafkaStreamsApplication() { final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); uc4KafkaStreamsBuilder - .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) - .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); + .feedbackTopic(this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) + .outputTopic(this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(Uc4ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) + .emitPeriod(Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS))); final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java new file mode 100644 index 000000000..58eb80aed --- /dev/null +++ b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java @@ -0,0 +1,21 @@ +package rocks.theodolite.benchmarks.uc4.kstreams; + +/** + * Keys to access configuration parameters. + */ +public final class Uc4ConfigurationKeys { + + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + private Uc4ConfigurationKeys() { + } + +} -- GitLab