Skip to content
Snippets Groups Projects
Commit 8b80086c authored by Sören Henning's avatar Sören Henning
Browse files

Make window size configurable

parent 8bd84de4
No related branches found
No related tags found
No related merge requests found
Pipeline #375 passed
......@@ -7,9 +7,13 @@ public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String AGGREGATION_DURATION_DAYS = "aggregtion.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregtion.advance.days";
public static final String NUM_THREADS = "num.threads";
......
package uc4.application;
import java.util.Objects;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
......@@ -18,12 +18,8 @@ public class HistoryService {
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
final String schemaRegistry = Objects.requireNonNull(System.getenv("SCHEMA_REGISTRY_URL"));
/**
* Start the service.
*
* @return {@link CompletableFuture} which is completed when the service is successfully started.
*/
public void run() {
this.createKafkaStreamsApplication();
......@@ -38,6 +34,10 @@ public class HistoryService {
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.aggregtionDuration(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
.aggregationAdvance(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
......@@ -21,18 +21,19 @@ public class KafkaStreamsBuilder {
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private Duration windowDuration; // NOPMD
private Duration aggregtionDuration; // NOPMD
private Duration aggregationAdvance; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
......@@ -41,6 +42,16 @@ public class KafkaStreamsBuilder {
return this;
}
public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) {
this.aggregtionDuration = aggregtionDuration;
return this;
}
public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) {
this.aggregationAdvance = aggregationAdvance;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
......@@ -84,9 +95,15 @@ public class KafkaStreamsBuilder {
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration);
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
this.aggregtionDuration,
this.aggregationAdvance);
final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
......
......@@ -15,11 +15,8 @@ import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.model.records.HourOfDayActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory;
import uc4.streamprocessing.util.StatsFactory;
......@@ -28,13 +25,15 @@ import uc4.streamprocessing.util.StatsFactory;
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
// private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter
private final String inputTopic;
private final String outputTopic;
private final Duration aggregtionDuration;
private final Duration aggregationAdvance;
private final StreamsBuilder builder = new StreamsBuilder();
......@@ -42,9 +41,11 @@ public class TopologyBuilder {
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic, final String outputTopic,
final Duration duration) {
final Duration aggregtionDuration, final Duration aggregationAdvance) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.aggregtionDuration = aggregtionDuration;
this.aggregationAdvance = aggregationAdvance;
}
/**
......@@ -54,19 +55,13 @@ public class TopologyBuilder {
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory =
new HourOfDayRecordFactory();
final TimeWindows timeWindows =
TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1));
// final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new
// HourOfDayRecordFactory();
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
// .mapValues(kieker -> new ActivePowerRecord(
// kieker.getIdentifier(),
// kieker.getTimestamp(),
// kieker.getValueInW()))
.selectKey((key, value) -> {
final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
......@@ -74,7 +69,7 @@ public class TopologyBuilder {
})
.groupByKey(
Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.windowedBy(timeWindows)
.windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance))
.aggregate(
() -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
......
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
aggregtion.duration.days=30
aggregtion.advance.days=1
num.threads=1
commit.interval.ms=10
cache.max.bytes.buffering=-1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment