Skip to content
Snippets Groups Projects
Commit 2b53d632 authored by Sören Henning's avatar Sören Henning
Browse files

Align downsample intervals among engines

parent b8378027
No related branches found
No related tags found
1 merge request!306Align all downsample intervals for UC2
Pipeline #10169 passed
Showing
with 23 additions and 33 deletions
......@@ -21,7 +21,7 @@ public final class ConfigurationKeys {
public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
// UC2
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
......
......@@ -21,8 +21,6 @@ spec:
value: "theodolite-kafka-kafka-bootstrap:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://theodolite-kafka-schema-registry:8081"
- name: DOWNSAMPLE_INTERVAL
value: "5000"
#- name: KUBERNETES_DNS_NAME
# value: "titan-ccp-aggregation"
- name: KUBERNETES_NAMESPACE
......
......@@ -50,7 +50,6 @@ services:
BOOTSTRAP_SERVER: benchmark:5701
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_WINDOW_DURATION_MINUTES: 60
load-generator:
image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
......
......@@ -45,7 +45,6 @@ services:
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_WINDOW_DURATION_MINUTES: 60
load-generator:
image: ghcr.io/cau-se/theodolite-uc2-workload-generator:${THEODOLITE_TAG:-latest}
depends_on:
......
......@@ -21,7 +21,7 @@ public class ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
// UC2
public static final String DOWNSAMPLE_INTERVAL = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC3
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
......
......@@ -28,7 +28,7 @@ public final class ConfigurationKeys {
public static final String GRACE_PERIOD_MS = "grace.period.ms";
// UC3
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
// UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
......
......@@ -42,8 +42,8 @@ public class PipelineFactory extends AbstractPipelineFactory {
protected void constructPipeline(final Pipeline pipeline) {
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final Duration duration = Duration.standardMinutes(
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
final Duration downsampleInterval = Duration.standardMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
......@@ -58,7 +58,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
// Apply pipeline transformations
pipeline.apply(kafkaReader)
// Apply a fixed window
.apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration)))
.apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(downsampleInterval)))
// Aggregate per window for every key
.apply(Combine.<String, ActivePowerRecord, Stats>perKey(new StatsAggregation()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Stats.class)))
......
......@@ -4,10 +4,10 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
downsample.interval.minutes=1
specific.avro.reader=true
# Kafka Settings
......
......@@ -17,7 +17,7 @@ public final class ConfigurationKeys {
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String DOWNSAMPLE_INTERVAL_MINUTES = "downsample.interval.minutes";
public static final String FLINK_STATE_BACKEND = "flink.state.backend";
......
......@@ -39,9 +39,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC);
final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
final int windowDurationMinutes =
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES);
final Time windowDuration = Time.minutes(windowDurationMinutes);
final Time windowDuration = Time.minutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
......
......@@ -6,7 +6,7 @@ kafka.input.topic=input
kafka.output.topic=output
schema.registry.url=http://localhost:8081
kafka.window.duration.minutes=1
downsample.interval.minutes=1
# Flink configuration
checkpointing.interval.ms=100
checkpointing.interval.ms=1000
......@@ -2,7 +2,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
......@@ -22,8 +21,8 @@ public class HistoryService extends HazelcastJetService {
/**
* Constructs the use case logic for UC2.
* Retrieves the needed values and instantiates a pipeline factory.
* Constructs the use case logic for UC2. Retrieves the needed values and instantiates a pipeline
* factory.
*/
public HistoryService() {
super(LOGGER);
......@@ -38,11 +37,10 @@ public class HistoryService extends HazelcastJetService {
StringSerializer.class.getCanonicalName());
final String kafkaOutputTopic =
config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
this.config.getProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC).toString();
final Duration downsampleInterval = Duration.ofMinutes(
Integer.parseInt(config.getProperty(
ConfigurationKeys.DOWNSAMPLE_INTERVAL).toString()));
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
this.pipelineFactory = new Uc2PipelineFactory(
kafkaProps,
......
......@@ -4,7 +4,6 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
downsample.interval.minutes=1
package rocks.theodolite.benchmarks.uc2.kstreams;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
......@@ -18,8 +17,6 @@ public class HistoryService {
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
private final int windowDurationMinutes = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
/**
* Start the service.
......@@ -36,7 +33,8 @@ public class HistoryService {
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
uc2KafkaStreamsBuilder
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
.windowDuration(Duration.ofMinutes(
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES)));
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
......
......@@ -4,9 +4,9 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8081
downsample.interval.minutes=1
# Kafka Streams Config
commit.interval.ms=5000
......@@ -73,7 +73,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
final Duration duration = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
this.config.getInt(ConfigurationKeys.DOWNSAMPLE_INTERVAL_MINUTES));
final Duration triggerDelay = Duration.standardSeconds(
this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
final Duration gracePeriod = Duration.standardSeconds(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment