Commit 62dc3ce4 authored by Sören Henning

Merge branch 'align-downsample-intervals' into 'main'

Align all downsample intervals for UC2

See merge request !306
parents 6d3d8e45 a0f9a16d
1 merge request: !306 Align all downsample intervals for UC2
Pipeline #10192 passed
Showing 31 additions and 38 deletions
@@ -21,7 +21,7 @@ public final class ConfigurationKeys {
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
// UC2
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
@@ -29,8 +29,10 @@ public final class ConfigurationKeys {
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// UC4
public static final String GRACE_PERIOD_MS = "grace.period.ms";
public static final String EMIT_PERIOD_SECONDS = "kafka.window.duration.minutes";
// public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// BEAM
public static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
@@ -41,7 +43,9 @@ public final class ConfigurationKeys {
public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
public static final String TRIGGER_INTERVAL = "trigger.interval";
// Used for UC3 + UC4:
public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval";
// public static final String TRIGGER_INTERVAL_SECONDS = "trigger.interval.seconds";
private ConfigurationKeys() {}
......
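As a quick orientation on how the renamed keys are consumed, here is a minimal sketch (not part of the commit) that resolves them to typed durations with Apache Commons Configuration, which the benchmarks already use elsewhere. It assumes a properties file like the ones touched further down, e.g. containing downsample.interval.minutes=1; the class name IntervalConfigSketch is made up for the example.

import java.time.Duration;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.builder.fluent.Configurations;

public final class IntervalConfigSketch {

  public static void main(final String[] args) throws Exception {
    // Assumes a properties file like the ones in this merge request,
    // e.g. containing "downsample.interval.minutes=1".
    final Configuration config = new Configurations().properties("application.properties");

    // UC2: the aligned downsample interval, given in minutes.
    final Duration downsampleInterval =
        Duration.ofMinutes(config.getInt("downsample.interval.minutes"));

    // UC3/UC4: the trigger interval, given in seconds (property name kept as "trigger.interval");
    // the default here is only for the sketch, in case the key is not set.
    final Duration triggerInterval =
        Duration.ofSeconds(config.getInt("trigger.interval", 15));

    System.out.println(downsampleInterval + " / " + triggerInterval);
  }
}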
@@ -21,8 +21,6 @@ spec:
value: "theodolite-kafka-kafka-bootstrap:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://theodolite-kafka-schema-registry:8081"
- name: DOWNSAMPLE_INTERVAL
value: "5000"
#- name: KUBERNETES_DNS_NAME
# value: "titan-ccp-aggregation"
- name: KUBERNETES_NAMESPACE
......
@@ -50,7 +50,6 @@ services:
BOOTSTRAP_SERVER: benchmark:5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_WINDOW_DURATION_MINUTES: 60
load-generator:
image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
......
@@ -45,7 +45,6 @@ services:
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_WINDOW_DURATION_MINUTES: 60
load-generator:
image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
......
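The manifests above drop the now-unused environment variables DOWNSAMPLE_INTERVAL and KAFKA_WINDOW_DURATION_MINUTES. Those names follow the usual convention of upper-casing a property key and replacing dots with underscores; the sketch below only illustrates that naming correspondence and assumes nothing about the benchmarks' actual environment lookup helpers.

import java.util.Locale;

public final class EnvVarNameSketch {

  // e.g. "downsample.interval.minutes" -> "DOWNSAMPLE_INTERVAL_MINUTES"
  static String toEnvironmentVariable(final String propertyKey) {
    return propertyKey.toUpperCase(Locale.ROOT).replace('.', '_');
  }

  public static void main(final String[] args) {
    System.out.println(toEnvironmentVariable("downsample.interval.minutes"));
    System.out.println(toEnvironmentVariable("kafka.window.duration.minutes"));
  }
}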
@@ -21,7 +21,7 @@ public class ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
// UC2
public static final String DOWNSAMPLE_INTERVAL = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
......
@@ -28,7 +28,7 @@ public final class ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// UC3
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
......
@@ -42,8 +42,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
protected void constructPipeline(final Pipeline pipeline) {
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration duration = Duration.standardMinutes(
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
final Duration downsampleInterval = Duration.standardMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
@@ -58,7 +58,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
// Apply pipeline transformations
pipeline.apply(kafkaReader)
// Apply a fixed window
.apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
.apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(downsampleInterval)))
// Aggregate per window for every key
.apply(Combine.<String, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
......
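For readers unfamiliar with the pattern above, a self-contained sketch of a fixed window sized by the downsample interval followed by a per-key combine. It substitutes Beam's built-in Mean over plain doubles for the repository's StatsAggregation over ActivePowerRecord, hard-codes the interval instead of reading it from the configuration, and assumes the DirectRunner is on the classpath.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public final class DownsampleWindowSketch {

  public static void main(final String[] args) {
    // Corresponds to downsample.interval.minutes in the benchmark configuration.
    final Duration downsampleInterval = Duration.standardMinutes(1);

    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(Create.of(KV.of("sensor-1", 10.0), KV.of("sensor-1", 20.0))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())))
        // One fixed window per downsample interval, as in the UC2 pipeline above.
        .apply(Window.<KV<String, Double>>into(FixedWindows.of(downsampleInterval)))
        // Per-key aggregate per window; the benchmark uses a Stats CombineFn instead.
        .apply(Mean.perKey());
    pipeline.run().waitUntilFinish();
  }
}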
@@ -4,10 +4,10 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
downsample.interval.minutes=1
specific.avro.reader=true
# Kafka Settings
......
@@ -17,7 +17,7 @@ public final class ConfigurationKeys {
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
......
@@ -39,9 +39,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final int windowDurationMinutes =
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
final Time windowDuration = Time.minutes(windowDurationMinutes);
final Time windowDuration = Time.minutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
......
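The same interval drives the Flink variant. Below is a rough sketch (not the repository's job) of a keyed tumbling window whose length comes from downsample.interval.minutes; the in-memory source, the sum aggregation, and the hard-coded one-minute value are placeholders for the Kafka source, the stats aggregation, and the configured value.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public final class FlinkDownsampleSketch {

  public static void main(final String[] args) throws Exception {
    // In the real job: Time.minutes(config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES))
    final Time windowDuration = Time.minutes(1);

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(Tuple2.of("sensor-1", 10.0), Tuple2.of("sensor-1", 20.0))
        .keyBy(new KeySelector<Tuple2<String, Double>, String>() {
          @Override
          public String getKey(final Tuple2<String, Double> record) {
            return record.f0;
          }
        })
        // One tumbling window per downsample interval; with this toy bounded source the
        // window may not fire before the job ends, which is fine for a sketch.
        .window(TumblingProcessingTimeWindows.of(windowDuration))
        .sum(1)
        .print();

    env.execute("uc2-downsample-sketch");
  }
}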
@@ -6,7 +6,7 @@ kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
kafka.window.duration.minutes=1
downsample.interval.minutes=1
# Flink configuration
checkpointing.interval.ms=100
checkpointing.interval.ms=1000
@@ -2,7 +2,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -22,8 +21,8 @@ public class HistoryService extends HazelcastJetService {
/**
* Constructs the use case logic for UC2.
* Retrieves the needed values and instantiates a pipeline factory.
* Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
* factory.
*/
public HistoryService() {
super(LOGGER);
@@ -38,11 +37,10 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final Duration downsampleInterval = Duration.ofMinutes(
Integer.parseInt(config.getProperty(
ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()));
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.pipelineFactory = new Uc2PipelineFactory(
kafkaProps,
......
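The change above replaces a manual getProperty(...).toString() plus Integer.parseInt round-trip with Commons Configuration's typed getter. A minimal, self-contained illustration of the difference, using an in-memory BaseConfiguration instead of the service's real configuration:

import java.time.Duration;
import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.commons.configuration2.Configuration;

public final class TypedConfigReadSketch {

  public static void main(final String[] args) {
    final Configuration config = new BaseConfiguration();
    config.setProperty("downsample.interval.minutes", "1");

    // Old style: fetch as Object, stringify, parse by hand.
    final Duration parsed = Duration.ofMinutes(
        Integer.parseInt(config.getProperty("downsample.interval.minutes").toString()));

    // New style, as in this commit: let the configuration perform the conversion.
    final Duration typed = Duration.ofMinutes(config.getInt("downsample.interval.minutes"));

    System.out.println(parsed.equals(typed)); // true
  }
}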
@@ -4,7 +4,6 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
downsample.interval.minutes=1
package rocks.theodolite.benchmarks.uc2.kstreams;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
@@ -18,8 +17,6 @@ public class HistoryService {
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
private final int windowDurationMinutes = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
/**
* Start the service.
@@ -36,7 +33,8 @@ public class HistoryService {
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
uc2KafkaStreamsBuilder
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
.windowDuration(Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)));
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
......
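In the Kafka Streams variant the interval ends up as the size of a tumbling window. Below is a sketch of that wiring in the plain Streams DSL, assuming a recent Kafka Streams release (for TimeWindows.ofSizeWithNoGrace); topic names, serdes, and the sum reducer are placeholders, and the topology is only built and described, not started.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.TimeWindows;

public final class TumblingWindowSketch {

  public static void main(final String[] args) {
    // Corresponds to downsample.interval.minutes in the benchmark configuration.
    final Duration downsampleInterval = Duration.ofMinutes(1);

    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.Double()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        // One aggregate per key and per downsample interval.
        .windowedBy(TimeWindows.ofSizeWithNoGrace(downsampleInterval))
        .reduce(Double::sum);

    // Only build and print the topology; running it would require a Kafka cluster.
    final Topology topology = builder.build();
    System.out.println(topology.describe());
  }
}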
@@ -4,9 +4,9 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
downsample.interval.minutes=1
# Kafka Streams Config
commit.interval.ms=5000
@@ -48,7 +48,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
final Duration aggregationAdvanceDuration =
Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Duration triggerDelay =
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
// Read from Kafka
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
......
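The renamed TRIGGER_INTERVAL_SECONDS constant (used here and in the UC4 factory further down) ends up as a processing-time trigger delay. Below is a rough, self-contained sketch of how such a delay is commonly attached to a Beam window; the one-day window, the plain-double elements, the Sum combine, and the DirectRunner are assumptions for the example and not the benchmark's actual UC3 windowing.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public final class TriggerDelaySketch {

  public static void main(final String[] args) {
    // Corresponds to trigger.interval(.seconds) in the benchmark configuration.
    final Duration triggerDelay = Duration.standardSeconds(15);

    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline
        .apply(Create.of(KV.of("sensor-1", 10.0), KV.of("sensor-1", 20.0))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of())))
        .apply(Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardDays(1)))
            // Emit speculative per-window results every triggerDelay of processing time.
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
        .apply(Sum.doublesPerKey());
    pipeline.run().waitUntilFinish();
  }
}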
@@ -4,7 +4,6 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
......
@@ -73,9 +73,9 @@ public class PipelineFactory extends AbstractPipelineFactory {
this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
final Duration duration = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
this.config.getInt(ConfigurationKeys.EMIT_PERIOD_SECONDS));
final Duration triggerDelay = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL_SECONDS));
final Duration gracePeriod = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
......