diff --git a/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java b/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java index b7468ed090b18a934cdcaa7822f774ac9645e4d2..7cbdcaa82f3e32c88edec52329672ad5930fdde5 100644 --- a/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java +++ b/uc4-application/src/main/java/uc4/application/ConfigurationKeys.java @@ -7,9 +7,13 @@ public final class ConfigurationKeys { public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String AGGREGATION_DURATION_DAYS = "aggregtion.duration.days"; + + public static final String AGGREGATION_ADVANCE_DAYS = "aggregtion.advance.days"; public static final String NUM_THREADS = "num.threads"; diff --git a/uc4-application/src/main/java/uc4/application/HistoryService.java b/uc4-application/src/main/java/uc4/application/HistoryService.java index 2e08a92d428a543eba76f8a55dbbbf6863e0480a..b6f9c13e018aead3dcb41ad93b88de0dc8b96743 100644 --- a/uc4-application/src/main/java/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/uc4/application/HistoryService.java @@ -1,6 +1,6 @@ package uc4.application; -import java.util.Objects; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; @@ -18,12 +18,8 @@ public class HistoryService { private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - final String schemaRegistry = Objects.requireNonNull(System.getenv("SCHEMA_REGISTRY_URL")); - /** * Start the service. - * - * @return {@link CompletableFuture} which is completed when the service is successfully started. */ public void run() { this.createKafkaStreamsApplication(); @@ -38,6 +34,10 @@ public class HistoryService { .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .aggregtionDuration( + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) + .aggregationAdvance( + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java index a66150462ed5aa40bda371bab565b272fa333f99..5cef5ffd446665faebb456ea9f836938ea2a5557 100644 --- a/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/uc4/streamprocessing/KafkaStreamsBuilder.java @@ -21,18 +21,19 @@ public class KafkaStreamsBuilder { private String bootstrapServers; // NOPMD private String inputTopic; // NOPMD private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD private int numThreads = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; + public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { + this.bootstrapServers = bootstrapServers; return this; } - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; return this; } @@ -41,6 +42,16 @@ public class KafkaStreamsBuilder { return this; } + public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; + return this; + } + /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. @@ -84,9 +95,15 @@ public class KafkaStreamsBuilder { */ public KafkaStreams build() { Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.aggregtionDuration, + this.aggregationAdvance); final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) diff --git a/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java index 59cbfc3e2ab4f495f40a35ce543732223430a4fc..4a76ed883d80449d25ace7bb243846e009e8894c 100644 --- a/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/uc4/streamprocessing/TopologyBuilder.java @@ -15,11 +15,8 @@ import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.model.records.HourOfDayActivePowerRecord; import titan.ccp.models.records.ActivePowerRecordFactory; import uc4.streamprocessing.util.StatsFactory; @@ -28,13 +25,15 @@ import uc4.streamprocessing.util.StatsFactory; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter private final String inputTopic; private final String outputTopic; + private final Duration aggregtionDuration; + private final Duration aggregationAdvance; private final StreamsBuilder builder = new StreamsBuilder(); @@ -42,9 +41,11 @@ public class TopologyBuilder { * Create a new {@link TopologyBuilder} using the given topics. */ public TopologyBuilder(final String inputTopic, final String outputTopic, - final Duration duration) { + final Duration aggregtionDuration, final Duration aggregationAdvance) { this.inputTopic = inputTopic; this.outputTopic = outputTopic; + this.aggregtionDuration = aggregtionDuration; + this.aggregationAdvance = aggregationAdvance; } /** @@ -54,19 +55,13 @@ public class TopologyBuilder { final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = - new HourOfDayRecordFactory(); - final TimeWindows timeWindows = - TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1)); + // final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new + // HourOfDayRecordFactory(); this.builder .stream(this.inputTopic, Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) - // .mapValues(kieker -> new ActivePowerRecord( - // kieker.getIdentifier(), - // kieker.getTimestamp(), - // kieker.getValueInW())) .selectKey((key, value) -> { final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); @@ -74,7 +69,7 @@ public class TopologyBuilder { }) .groupByKey( Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) - .windowedBy(timeWindows) + .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), diff --git a/uc4-application/src/main/resources/META-INF/application.properties b/uc4-application/src/main/resources/META-INF/application.properties index d2002fd1c8841368d47017b2ce7939bfc42877aa..ca6bdf3c81d2fdb689cdcc5af81fe9a3830d0552 100644 --- a/uc4-application/src/main/resources/META-INF/application.properties +++ b/uc4-application/src/main/resources/META-INF/application.properties @@ -1,6 +1,8 @@ kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output +aggregtion.duration.days=30 +aggregtion.advance.days=1 num.threads=1 commit.interval.ms=10 cache.max.bytes.buffering=-1