diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java index 9ab378f8e7aa3fe8f67d4062da2f504671a284bc..51105fcec36fcfb28b810a8ee2f37ed856908d76 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/ConfigurationKeys.java @@ -4,7 +4,7 @@ package rocks.theodolite.benchmarks.commons.beam; * Keys to access configuration parameters. */ public final class ConfigurationKeys { - // Common keys + public static final String APPLICATION_NAME = "application.name"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; @@ -13,27 +13,6 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - // Additional topics - public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; - - // UC2 - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; - - // UC3 - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - // UC4 - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - - public static final String GRACE_PERIOD_MS = "grace.period.ms"; - - // BEAM public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit"; public static final String MAX_POLL_RECORDS = "max.poll.records"; @@ -42,11 +21,6 @@ public final class ConfigurationKeys { public static final String SPECIFIC_AVRO_READER = "specific.avro.reader"; - // Used for UC3 + UC4: - public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; - // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; - - private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java index 5215c23c83f9b4e86b6e81d634f7fd6575977c4c..141b5a427d33ad50bb4881d0063c3069ec4ac6ac 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/ConfigurationKeys.java @@ -7,32 +7,20 @@ public class ConfigurationKeys { public static final String APPLICATION_NAME = "application.name"; - // Common Keys public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER"; + public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME"; + public static final String PORT = "PORT"; + public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT"; - public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX"; - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - // Additional topics - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX"; - // UC2 - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - // UC3 - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - public static final String AGGREGATION_EMIT_PERIOD_SECONDS = // NOPMD - "aggregation.emit.period.seconds"; + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - // UC4 - public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; - public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - // public static final String GRACE_PERIOD_MS = "grace.period.ms"; + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; } diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java index 6f1f70e72d5eb2add605b0a38bbabf8cdd2bfd6f..61a2df2c4014af7367020cc326efd0060241ccc8 100644 --- a/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/rocks/theodolite/benchmarks/commons/kstreams/ConfigurationKeys.java @@ -15,27 +15,6 @@ public final class ConfigurationKeys { public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - // Additional topics - public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; - - // UC2 - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - - public static final String GRACE_PERIOD_MS = "grace.period.ms"; - - // UC3 - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; - - // UC4 - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java deleted file mode 100644 index d0884f76374ae6e63895869f3abcc1ec2fe22d6f..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/ConfigurationKeys.java +++ /dev/null @@ -1,24 +0,0 @@ -package rocks.theodolite.benchmarks.uc1.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java index 9d3412c7f7a318b471902f9f2f38e714bf1034ec..5166e314494879799c396fc254582a69cf5d4c62 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java @@ -4,6 +4,7 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java index f8085626541eebbea844e880dc3bb6f12dc37b64..d8bf1ba526693d32c4c15ccdfb112351ce7e1c0e 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/PipelineFactory.java @@ -40,10 +40,10 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final Pipeline pipeline) { - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Duration downsampleInterval = Duration.standardMinutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..c55c7f21522fefb0dbb38e011a0f935c762f20c8 --- /dev/null +++ b/theodolite-benchmarks/uc2-beam/src/main/java/rocks/theodolite/benchmarks/uc2/beam/Uc2ConfigurationKeys.java @@ -0,0 +1,15 @@ +package rocks.theodolite.benchmarks.uc2.beam; + +/** + * Keys to access configuration parameters. + */ +public final class Uc2ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + + private Uc2ConfigurationKeys() { + } + +} diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java deleted file mode 100644 index 8ea245ebaef96b9e9a3f536efebf34ac5041628d..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/ConfigurationKeys.java +++ /dev/null @@ -1,35 +0,0 @@ -package rocks.theodolite.benchmarks.uc2.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; - - public static final String FLINK_STATE_BACKEND = "flink.state.backend"; - - public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD - "flink.state.backend.memory.size"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index e44eeaea9e4d9d6d5ce6408ae337ab15f93baa10..d349d4086c7cf597a18fb681a1755f10a40b551a 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -35,12 +36,12 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { @Override protected void buildPipeline() { - final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + final String kafkaBroker = this.config.getString(Uc2ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Time windowDuration = Time.minutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..06cc33908a501770bd91907923e8a3536ff84af4 --- /dev/null +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/Uc2ConfigurationKeys.java @@ -0,0 +1,16 @@ +package rocks.theodolite.benchmarks.uc2.flink; + +/** + * Keys to access configuration parameters. + */ +public final class Uc2ConfigurationKeys { + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + + private Uc2ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java index 6104b8a7093428bc255effcee2918dac69d05531..635d8789dd7ae210da5c71cd2d30c946463cad0b 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/HistoryService.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; @@ -37,10 +36,10 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName()); final String kafkaOutputTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + this.config.getProperty(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); final Duration downsampleInterval = Duration.ofMinutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)); this.pipelineFactory = new Uc2PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..16f1c0c048bbac1f5e23aaba1f55b6d54edd80fd --- /dev/null +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2ConfigurationKeys.java @@ -0,0 +1,12 @@ +package rocks.theodolite.benchmarks.uc2.hazelcastjet; + +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ +public class Uc2ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + +} diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java index 8e4d290531c1497bc2792b12d2182ba31cc192b9..a6375688d619c9ff65c4fd97bc28702d73284f60 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/HistoryService.java @@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; -import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -32,9 +31,9 @@ public class HistoryService { private void createKafkaStreamsApplication() { final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); uc2KafkaStreamsBuilder - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .outputTopic(this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .windowDuration(Duration.ofMinutes( - this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES))); + this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES))); final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..1de4827b4b1911e1c883e9503d086aa2d5909cfa --- /dev/null +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/Uc2ConfigurationKeys.java @@ -0,0 +1,14 @@ +package rocks.theodolite.benchmarks.uc2.kstreams; + +/** + * Keys to access configuration parameters. + */ +public final class Uc2ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes"; + + private Uc2ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java index 3f99277d4972df15841bf567b80bfcf03f2221a4..d154b7487b398dcd40f13a1322b7f242053c18a7 100644 --- a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/PipelineFactory.java @@ -41,14 +41,14 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final Pipeline pipeline) { - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Duration duration = - Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + Duration.standardDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS)); final Duration aggregationAdvanceDuration = - Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + Duration.standardDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); final Duration triggerDelay = - Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); + Duration.standardSeconds(this.config.getInt(Uc3ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); // Read from Kafka final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..ca59d3d2f5db72bd5378e7616b323f74d926e51e --- /dev/null +++ b/theodolite-benchmarks/uc3-beam/src/main/java/rocks/theodolite/benchmarks/uc3/beam/Uc3ConfigurationKeys.java @@ -0,0 +1,19 @@ +package rocks.theodolite.benchmarks.uc3.beam; + +/** + * Keys to access configuration parameters. + */ +public final class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; + // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; + + private Uc3ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java deleted file mode 100644 index a19a7f8b2660591f82f52d50deb6413e76c5e392..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/ConfigurationKeys.java +++ /dev/null @@ -1,42 +0,0 @@ -package rocks.theodolite.benchmarks.uc3.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; - - public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; - - public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS = // NOPMD - "aggregation.trigger.interval.seconds"; - - public static final String TIME_ZONE = "time.zone"; - - public static final String FLINK_STATE_BACKEND = "flink.state.backend"; - - public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD - "flink.state.backend.memory.size"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index aaaba5b3b4d86ebcdb205cdef16285b6ee47ae4c..9f90f45f3b9ee8353b2e4a320024c18828198059 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -16,6 +16,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -50,14 +51,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE)); + final String outputTopic = this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final ZoneId timeZone = ZoneId.of(this.config.getString(Uc3ConfigurationKeys.TIME_ZONE)); final Time aggregationDuration = - Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)); + Time.days(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS)); final Time aggregationAdvance = - Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); + Time.days(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)); final Time triggerDuration = - Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS)); + Time.seconds(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS)); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..e3e167cbc7ee8e043856ed3c5ce4f882e882c2c8 --- /dev/null +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/Uc3ConfigurationKeys.java @@ -0,0 +1,21 @@ +package rocks.theodolite.benchmarks.uc3.flink; + +/** + * Keys to access configuration parameters. + */ +public final class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS = // NOPMD + "aggregation.trigger.interval.seconds"; + + public static final String TIME_ZONE = "time.zone"; + + private Uc3ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java index df3f85a425e58a84c4962f90b407672ba1460b22..3f573d300117dd167f5e2e9ab6927836ccac4db1 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/HistoryService.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; /** @@ -35,16 +34,16 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName()); final String kafkaOutputTopic = - this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); + this.config.getProperty(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString(); final Duration windowSize = Duration.ofDays(Integer.parseInt( - this.config.getProperty(ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); + this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS).toString())); final Duration hoppingSize = Duration.ofDays(Integer.parseInt( - this.config.getProperty(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); + this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS).toString())); final Duration emitPeriod = Duration.ofSeconds(Integer.parseInt( - this.config.getProperty(ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString())); + this.config.getProperty(Uc3ConfigurationKeys.AGGREGATION_EMIT_PERIOD_SECONDS).toString())); this.pipelineFactory = new Uc3PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..86978513fedbbe64dc5818e16e306f75d7aeb65c --- /dev/null +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3ConfigurationKeys.java @@ -0,0 +1,17 @@ +package rocks.theodolite.benchmarks.uc3.hazelcastjet; + +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ +public class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + public static final String AGGREGATION_EMIT_PERIOD_SECONDS = // NOPMD + "aggregation.emit.period.seconds"; + +} diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java index 2151d0be487d89f23b3bb170a95ff60270948841..637c2e60b81592cf7181aafb7e7ea5840a753128 100644 --- a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/HistoryService.java @@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; -import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -33,11 +32,11 @@ public class HistoryService { // Use case specific stream configuration final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config); uc3KafkaStreamsBuilder - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .outputTopic(this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregationDuration( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) + Duration.ofDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS))) .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + Duration.ofDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); // Configuration of the stream application final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..db0bdaa3e7171302aeaa117b1446937ce467d131 --- /dev/null +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/Uc3ConfigurationKeys.java @@ -0,0 +1,16 @@ +package rocks.theodolite.benchmarks.uc3.kstreams; + +/** + * Keys to access configuration parameters. + */ +public final class Uc3ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; + + private Uc3ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java index 0974960a4c61fafb9bf00aecd23aa02f122e33c1..2bba46d9bd66d32fce3e9937ecae62a32a509e8f 100644 --- a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/PipelineFactory.java @@ -67,17 +67,17 @@ public class PipelineFactory extends AbstractPipelineFactory { @Override protected void constructPipeline(final Pipeline pipeline) { // NOPMD // Additional needed variables - final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); + final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String configurationTopic = - this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); + this.config.getString(Uc4ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); final Duration duration = Duration.millis( - this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS)); + this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration triggerDelay = Duration.standardSeconds( - this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); + this.config.getInt(Uc4ConfigurationKeys.TRIGGER_INTERVAL_SECONDS)); final Duration gracePeriod = Duration.standardSeconds(// TODO this is wrong - this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS)); + this.config.getInt(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); // Read from Kafka final String bootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..860e8c9daa148e3ed245b2d8aec54227d8e91a19 --- /dev/null +++ b/theodolite-benchmarks/uc4-beam/src/main/java/rocks/theodolite/benchmarks/uc4/beam/Uc4ConfigurationKeys.java @@ -0,0 +1,23 @@ +package rocks.theodolite.benchmarks.uc4.beam; + +/** + * Keys to access configuration parameters. + */ +public final class Uc4ConfigurationKeys { + + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval"; + // public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds"; + + private Uc4ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index 3310651241697505508f0b4b536edc19e5de5256..abdb9aaed5d5209c2932d15039d9fecb687327b5 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.configuration.events.Event; import rocks.theodolite.benchmarks.commons.configuration.events.EventSerde; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; +import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.commons.flink.TupleType; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -62,16 +63,17 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { @Override protected void buildPipeline() { // Get configurations - final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); - final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); + final String kafkaBroker = this.config.getString(Uc4ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + final String schemaRegistryUrl = + this.config.getString(Uc4ConfigurationKeys.SCHEMA_REGISTRY_URL); final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final Time windowSize = - Time.milliseconds(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)); + Time.milliseconds(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); final Duration windowGrace = - Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)); + Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS)); final String configurationTopic = - this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); + this.config.getString(Uc4ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory( diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java deleted file mode 100644 index 6016865fb094d628c8703a7300fbfb83f46afc12..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/ConfigurationKeys.java +++ /dev/null @@ -1,41 +0,0 @@ -package rocks.theodolite.benchmarks.uc4.flink; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; - - public static final String EMIT_PERIOD_MS = "emit.period.ms"; - - public static final String GRACE_PERIOD_MS = "grace.period.ms"; - - public static final String FLINK_STATE_BACKEND = "flink.state.backend"; - - public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; - - public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD - "flink.state.backend.memory.size"; - - public static final String DEBUG = "debug"; - - public static final String CHECKPOINTING = "checkpointing"; - - public static final String PARALLELISM = "parallelism"; - - private ConfigurationKeys() {} - -} diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..6fd2b0fa0ec50febd213fb3f7d24463d2bd6f51c --- /dev/null +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/Uc4ConfigurationKeys.java @@ -0,0 +1,22 @@ +package rocks.theodolite.benchmarks.uc4.flink; + +/** + * Keys to access configuration parameters. + */ +public final class Uc4ConfigurationKeys { + + public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + private Uc4ConfigurationKeys() {} + +} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java index 678f774a59166dd97337a584f708b1d4036c2c0b..97ea33eda56f34d5f1a2f8e5def8373c259540d0 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/HistoryService.java @@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys; import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService; import rocks.theodolite.benchmarks.commons.model.sensorregistry.ImmutableSensorRegistry; @@ -47,15 +46,15 @@ public class HistoryService extends HazelcastJetService { StringSerializer.class.getCanonicalName(), KafkaAvroSerializer.class.getCanonicalName()); - final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + final String outputTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String configurationTopic = - this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); + this.config.getString(Uc4ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC); - final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); + final String feedbackTopic = this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC); final Duration windowSize = Duration.ofMillis( - this.config.getInt(ConfigurationKeys.EMIT_PERIOD_MS)); + this.config.getInt(Uc4ConfigurationKeys.EMIT_PERIOD_MS)); this.pipelineFactory = new Uc4PipelineFactory( kafkaProps, diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..6c4c63396208cafc68a83835f29609b8582370ca --- /dev/null +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4ConfigurationKeys.java @@ -0,0 +1,17 @@ +package rocks.theodolite.benchmarks.uc4.hazelcastjet; + +/** + * Configuration Keys used for Hazelcast Jet Benchmark implementations. + */ +public class Uc4ConfigurationKeys { + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; + + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + // public static final String GRACE_PERIOD_MS = "grace.period.ms"; + +} diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java index 26ea02957fb013c61c4ee0c3e2f280b0b9b8c993..4119fbefa3fb98b1573b757285cca688210a5bcc 100644 --- a/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java +++ b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/AggregationService.java @@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations; -import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys; /** * A microservice that manages the history and, therefore, stores and aggregates incoming @@ -37,11 +36,11 @@ public class AggregationService { private void createKafkaStreamsApplication() { final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); uc4KafkaStreamsBuilder - .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) - .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); + .feedbackTopic(this.config.getString(Uc4ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) + .outputTopic(this.config.getString(Uc4ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(Uc4ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) + .emitPeriod(Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.EMIT_PERIOD_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(Uc4ConfigurationKeys.GRACE_PERIOD_MS))); final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..58eb80aedc842f2634362795e5dfba5ac4a43fa7 --- /dev/null +++ b/theodolite-benchmarks/uc4-kstreams/src/main/java/rocks/theodolite/benchmarks/uc4/kstreams/Uc4ConfigurationKeys.java @@ -0,0 +1,21 @@ +package rocks.theodolite.benchmarks.uc4.kstreams; + +/** + * Keys to access configuration parameters. + */ +public final class Uc4ConfigurationKeys { + + public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic"; + + public static final String EMIT_PERIOD_MS = "emit.period.ms"; + + public static final String GRACE_PERIOD_MS = "grace.period.ms"; + + private Uc4ConfigurationKeys() { + } + +}