Skip to content
Snippets Groups Projects
Commit 37397183 authored by Sören Henning's avatar Sören Henning
Browse files

Fix (and suppress) deprecation warnings

parent 32e65a54
No related branches found
No related tags found
2 merge requests!236Upgrade Kafka Streams Benchmarks to Kafka Streams 3.1,!230Upgrade Kafka Streams Benchmarks to Kafka Streams 3.1
Pipeline #6346 failed
......@@ -70,18 +70,15 @@ public abstract class KafkaStreamsBuilder {
// optional configurations
this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
this.config::getLong,
p -> p >= 0);
this.config::getLong, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG,
this.config::getInt, p -> p > 0);
this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
this.config::getInt,
p -> p >= 0);
this.config::getInt, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
this.config::getInt, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
this.config::getLong,
p -> p >= 0);
this.config::getLong, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG,
this.config::getInt, p -> p >= 1);
this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
......@@ -89,22 +86,28 @@ public abstract class KafkaStreamsBuilder {
this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG,
this.config::getInt, p -> p > 0);
this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG,
this.config::getLong,
p -> p >= 0);
this.config::getLong, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p)
|| StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p));
this.config::getString, this::validateProcessingGuarantee);
this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG,
this.config::getInt, p -> p >= 0);
if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION)
&& this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) {
propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)
&& this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
}
return propBuilder.build();
}
@SuppressWarnings("deprecation")
private boolean validateProcessingGuarantee(final String processingGuarantee) {
return StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
// We continue support EXACTLY_ONCE to allow benchmarking it against v2
|| StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)
|| StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee);
}
/**
* Method to implement a {@link Topology} for a {@code KafkaStreams} application.
*
......@@ -116,7 +119,7 @@ public abstract class KafkaStreamsBuilder {
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
// Create the Kafka streams instance.
// Create the Kafka Streams instance.
final Properties properties = this.buildProperties();
return new KafkaStreams(this.buildTopology(properties), properties);
}
......
......@@ -53,7 +53,7 @@ public class TopologyBuilder {
Consumed.with(Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey()
.windowedBy(TimeWindows.of(this.duration))
.windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
// .aggregate(
// () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
......
......@@ -60,17 +60,18 @@ public class TopologyBuilder {
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.selectKey((key, value) -> {
final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return keyFactory.createKey(value.getIdentifier(), dateTime);
})
.groupByKey(
Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance))
.groupByKey(Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows
.ofSizeWithNoGrace(this.aggregtionDuration)
.advanceBy(this.aggregationAdvance))
.aggregate(
() -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
......
......@@ -146,7 +146,7 @@ public class TopologyBuilder {
.groupByKey(Grouped.with(
SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod))
.windowedBy(TimeWindows.ofSizeAndGrace(this.emitPeriod, this.gracePeriod))
.reduce(
// TODO Configurable window aggregation function
(oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment