diff --git a/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index 89bd3147f0d3bb7a5fecc5d8c7d277bd294494ad..fe3cf484a81ee3561ad17b6b25d218cd011f2d5d 100644 --- a/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/theodolite-benchmarks/kstreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -70,18 +70,15 @@ public abstract class KafkaStreamsBuilder { // optional configurations this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, - this.config::getLong, - p -> p >= 0); + this.config::getLong, p -> p >= 0); this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, this.config::getInt, p -> p > 0); this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, - this.config::getInt, - p -> p >= 0); + this.config::getInt, p -> p >= 0); this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.config::getInt, p -> p >= 0); this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, - this.config::getLong, - p -> p >= 0); + this.config::getLong, p -> p >= 0); this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, this.config::getInt, p -> p >= 1); this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, @@ -89,22 +86,28 @@ public abstract class KafkaStreamsBuilder { this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.config::getInt, p -> p > 0); this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG, - this.config::getLong, - p -> p >= 0); + this.config::getLong, p -> p >= 0); this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG, - this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p) - || StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p)); + this.config::getString, this::validateProcessingGuarantee); this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG, this.config::getInt, p -> p >= 0); - if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION) - && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) { - propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG) + && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) { + propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); } return propBuilder.build(); } + @SuppressWarnings("deprecation") + private boolean validateProcessingGuarantee(final String processingGuarantee) { + return StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) + // We continue support EXACTLY_ONCE to allow benchmarking it against v2 + || StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) + || StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee); + } + /** * Method to implement a {@link Topology} for a {@code KafkaStreams} application. * @@ -116,7 +119,7 @@ public abstract class KafkaStreamsBuilder { * Builds the {@link KafkaStreams} instance. */ public KafkaStreams build() { - // Create the Kafka streams instance. + // Create the Kafka Streams instance. final Properties properties = this.buildProperties(); return new KafkaStreams(this.buildTopology(properties), properties); } diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index eda7c495a2cff6d58b62a8a6a74ea8e1b2d89aca..21dcf14a9322ce5a6381f96f22f5fadb85cc78f0 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -53,7 +53,7 @@ public class TopologyBuilder { Consumed.with(Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() - .windowedBy(TimeWindows.of(this.duration)) + .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration)) // .aggregate( // () -> 0.0, // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index 1e976c07158720b3681d89413a5f277b1395f32d..4c63e21f3d9f1af6c9ef0363d7d01939faae9aef 100644 --- a/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -60,17 +60,18 @@ public class TopologyBuilder { final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); this.builder - .stream(this.inputTopic, - Consumed.with(Serdes.String(), - this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) + .stream(this.inputTopic, Consumed.with( + Serdes.String(), + this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .selectKey((key, value) -> { final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); return keyFactory.createKey(value.getIdentifier(), dateTime); }) - .groupByKey( - Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) + .groupByKey(Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) + .windowedBy(TimeWindows + .ofSizeWithNoGrace(this.aggregtionDuration) + .advanceBy(this.aggregationAdvance)) .aggregate( () -> Stats.of(), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), diff --git a/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index 623870313cd341d0594fee38d2fd0ae297abbeae..712b20cb63c2d9f6b77321eaf18eafe4b16854d2 100644 --- a/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc4-kstreams/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -146,7 +146,7 @@ public class TopologyBuilder { .groupByKey(Grouped.with( SensorParentKeySerde.serde(), this.srAvroSerdeFactory.forValues())) - .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) + .windowedBy(TimeWindows.ofSizeAndGrace(this.emitPeriod, this.gracePeriod)) .reduce( // TODO Configurable window aggregation function (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,