Skip to content
Snippets Groups Projects
Commit 9bb59d99 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'code-cleanup' into main

parents 744d55b5 2012da38
Branches
Tags
No related merge requests found
Pipeline #10217 passed
Showing
with 129 additions and 119 deletions
......@@ -4,7 +4,7 @@ package rocks.theodolite.benchmarks.commons.beam;
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
// Common keys
public static final String APPLICATION_NAME = "application.name";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
......@@ -13,27 +13,6 @@ public final class ConfigurationKeys {
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
// UC2
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// UC4
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// BEAM
public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
public static final String MAX_POLL_RECORDS = "max.poll.records";
......@@ -42,11 +21,6 @@ public final class ConfigurationKeys {
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
// Used for UC3 + UC4:
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval";
// public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
private ConfigurationKeys() {}
}
......@@ -20,8 +20,8 @@ spec:
image: ghcr.io/cau-se/theodolite-uc1-beam-flink:latest
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc1.beam.flink.Uc1BeamFlink",
"--parallelism=$(PARALLELISM)",
"--disableMetrics=true",
"--fasterCopy"]
"--disableMetrics=$(DISABLE_METRICS)",
"--fasterCopy=$(FASTER_COPY)"]
# optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
env:
- name: KAFKA_BOOTSTRAP_SERVERS
......@@ -32,6 +32,10 @@ spec:
value: "false"
- name: PARALLELISM
value: "1"
- name: DISABLE_METRICS
value: "true"
- name: FASTER_COPY
value: "true"
- name: "FLINK_STATE_BACKEND"
value: "rocksdb"
- name: JOB_MANAGER_RPC_ADDRESS
......
......@@ -20,8 +20,8 @@ spec:
image: ghcr.io/cau-se/theodolite-uc2-beam-flink:latest
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc2.beam.flink.Uc2BeamFlink",
"--parallelism=$(PARALLELISM)",
"--disableMetrics=true",
"--fasterCopy"]
"--disableMetrics=$(DISABLE_METRICS)",
"--fasterCopy=$(FASTER_COPY)"]
# optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
env:
- name: KAFKA_BOOTSTRAP_SERVERS
......@@ -32,6 +32,10 @@ spec:
value: "false"
- name: PARALLELISM
value: "1"
- name: DISABLE_METRICS
value: "true"
- name: FASTER_COPY
value: "true"
- name: "FLINK_STATE_BACKEND"
value: "rocksdb"
- name: JOB_MANAGER_RPC_ADDRESS
......
......@@ -20,8 +20,8 @@ spec:
image: ghcr.io/cau-se/theodolite-uc3-beam-flink:latest
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc3.beam.flink.Uc3BeamFlink",
"--parallelism=$(PARALLELISM)",
"--disableMetrics=true",
"--fasterCopy"]
"--disableMetrics=$(DISABLE_METRICS)",
"--fasterCopy=$(FASTER_COPY)"]
# optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
env:
- name: KAFKA_BOOTSTRAP_SERVERS
......@@ -32,6 +32,10 @@ spec:
value: "false"
- name: PARALLELISM
value: "1"
- name: DISABLE_METRICS
value: "true"
- name: FASTER_COPY
value: "true"
- name: "FLINK_STATE_BACKEND"
value: "rocksdb"
- name: JOB_MANAGER_RPC_ADDRESS
......
......@@ -20,8 +20,8 @@ spec:
image: ghcr.io/cau-se/theodolite-uc4-beam-flink:latest
args: ["standalone-job", "--job-classname", "rocks.theodolite.benchmarks.uc4.beam.flink.Uc4BeamFlink",
"--parallelism=$(PARALLELISM)",
"--disableMetrics=true",
"--fasterCopy"]
"--disableMetrics=$(DISABLE_METRICS)",
"--fasterCopy=$(FASTER_COPY)"]
# optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
env:
- name: KAFKA_BOOTSTRAP_SERVERS
......@@ -32,6 +32,10 @@ spec:
value: "false"
- name: PARALLELISM
value: "1"
- name: DISABLE_METRICS
value: "true"
- name: FASTER_COPY
value: "true"
- name: "FLINK_STATE_BACKEND"
value: "rocksdb"
- name: JOB_MANAGER_RPC_ADDRESS
......
......@@ -7,32 +7,20 @@ public class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
// Common Keys
public static final String BOOTSTRAP_SERVER = "BOOTSTRAP_SERVER";
public static final String KUBERNETES_DNS_NAME = "KUBERNETES_DNS_NAME";
public static final String PORT = "PORT";
public static final String PORT_AUTO_INCREMENT = "PORT_AUTO_INCREMENT";
public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String CLUSTER_NAME_PREFIX = "CLUSTER_NAME_PREFIX";
// UC2
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String AGGREGATION_EMIT_PERIOD_SECONDS = // NOPMD
"aggregation.emit.period.seconds";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
// UC4
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String EMIT_PERIOD_MS = "emit.period.ms";
// public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
}
......@@ -15,27 +15,6 @@ public final class ConfigurationKeys {
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
// UC2
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// UC3
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
private ConfigurationKeys() {}
}
package rocks.theodolite.benchmarks.uc1.flink;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {}
}
......@@ -4,6 +4,7 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
......
......@@ -40,10 +40,10 @@ public class PipelineFactory extends AbstractPipelineFactory {
@Override
protected void constructPipeline(final Pipeline pipeline) {
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String outputTopic = this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration downsampleInterval = Duration.standardMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
......
package rocks.theodolite.benchmarks.uc2.beam;
/**
* Keys to access configuration parameters.
*/
public final class Uc2ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
private Uc2ConfigurationKeys() {}
}
......@@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
......@@ -35,12 +36,12 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
@Override
protected void buildPipeline() {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String kafkaBroker = this.config.getString(Uc2ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String outputTopic = this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Time windowDuration = Time.minutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
......
......@@ -3,33 +3,14 @@ package rocks.theodolite.benchmarks.uc2.flink;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public final class Uc2ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
"flink.state.backend.memory.size";
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {}
private Uc2ConfigurationKeys() {}
}
......@@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.hazelcastjet.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.hazelcastjet.HazelcastJetService;
......@@ -35,10 +34,11 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName(),
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String kafkaOutputTopic =
this.config.getProperty(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final Duration downsampleInterval = Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.pipelineFactory = new Uc2PipelineFactory(
kafkaProps,
......
package rocks.theodolite.benchmarks.uc2.hazelcastjet;
/**
* Configuration Keys used for Hazelcast Jet Benchmark implementations.
*/
public class Uc2ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
}
......@@ -5,7 +5,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import rocks.theodolite.benchmarks.commons.commons.configuration.ServiceConfigurations;
import rocks.theodolite.benchmarks.commons.kstreams.ConfigurationKeys;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
......@@ -32,9 +31,9 @@ public class HistoryService {
private void createKafkaStreamsApplication() {
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
uc2KafkaStreamsBuilder
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.outputTopic(this.config.getString(Uc2ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)));
this.config.getInt(Uc2ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)));
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
......
package rocks.theodolite.benchmarks.uc2.kstreams;
/**
* Keys to access configuration parameters.
*/
public final class Uc2ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
private Uc2ConfigurationKeys() {}
}
......@@ -41,14 +41,14 @@ public class PipelineFactory extends AbstractPipelineFactory {
@Override
protected void constructPipeline(final Pipeline pipeline) {
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final String outputTopic = this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration duration =
Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
Duration.standardDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Duration aggregationAdvanceDuration =
Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
Duration.standardDays(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration triggerDelay =
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
Duration.standardSeconds(this.config.getInt(Uc3ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
// Read from Kafka
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
......
package rocks.theodolite.benchmarks.uc3.beam;
/**
* Keys to access configuration parameters.
*/
public final class Uc3ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval";
// public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
private Uc3ConfigurationKeys() {}
}
......@@ -16,6 +16,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.ConfigurationKeys;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.commons.flink.serialization.StatsSerializer;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
......@@ -50,14 +51,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final ZoneId timeZone = ZoneId.of(this.config.getString(ConfigurationKeys.TIME_ZONE));
final String outputTopic = this.config.getString(Uc3ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final ZoneId timeZone = ZoneId.of(this.config.getString(Uc3ConfigurationKeys.TIME_ZONE));
final Time aggregationDuration =
Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
Time.days(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Time aggregationAdvance =
Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
Time.days(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Time triggerDuration =
Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS));
Time.seconds(this.config.getInt(Uc3ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS));
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment