Skip to content
Snippets Groups Projects
Commit 2d87de45 authored by Sören Henning
Browse files

Merge branch 'upgrade-flink' into 'master'

Upgrade Kafka Streams Benchmarks to Kafka Streams 3.1

Closes #303

See merge request !236
parents 809402a5 3f0e61e8
No related branches found
No related tags found
1 merge request: !236 Upgrade Kafka Streams Benchmarks to Kafka Streams 3.1
Pipeline #6455 passed
...@@ -22,7 +22,7 @@ dependencies { ...@@ -22,7 +22,7 @@ dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers implementation 'org.apache.kafka:kafka-streams:3.1.0'
implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre' implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.slf4j:slf4j-simple:1.7.25' implementation 'org.slf4j:slf4j-simple:1.7.25'
......
...@@ -17,7 +17,7 @@ dependencies { ...@@ -17,7 +17,7 @@ dependencies {
// implementation 'org.slf4j:slf4j-simple:1.7.25' // implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' implementation 'org.apache.kafka:kafka-streams:3.1.0'
// Use JUnit test framework // Use JUnit test framework
testImplementation 'junit:junit:4.12' testImplementation 'junit:junit:4.12'
......
...@@ -70,18 +70,15 @@ public abstract class KafkaStreamsBuilder { ...@@ -70,18 +70,15 @@ public abstract class KafkaStreamsBuilder {
// optional configurations // optional configurations
this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
this.config::getLong, this.config::getLong, p -> p >= 0);
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG,
this.config::getInt, p -> p > 0); this.config::getInt, p -> p > 0);
this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
this.config::getInt, this.config::getInt, p -> p >= 0);
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
this.config::getInt, p -> p >= 0); this.config::getInt, p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
this.config::getLong, this.config::getLong, p -> p >= 0);
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG,
this.config::getInt, p -> p >= 1); this.config::getInt, p -> p >= 1);
this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
...@@ -89,22 +86,28 @@ public abstract class KafkaStreamsBuilder { ...@@ -89,22 +86,28 @@ public abstract class KafkaStreamsBuilder {
this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG,
this.config::getInt, p -> p > 0); this.config::getInt, p -> p > 0);
this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG,
this.config::getLong, this.config::getLong, p -> p >= 0);
p -> p >= 0);
this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p) this.config::getString, this::validateProcessingGuarantee);
|| StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p));
this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG, this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG,
this.config::getInt, p -> p >= 0); this.config::getInt, p -> p >= 0);
if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION) if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)
&& this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) { && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
} }
return propBuilder.build(); return propBuilder.build();
} }
/**
 * Checks whether the supplied value is one of the processing guarantees accepted by this
 * builder.
 *
 * @param processingGuarantee the configured processing guarantee value
 * @return {@code true} if the value equals {@link StreamsConfig#AT_LEAST_ONCE},
 *         {@link StreamsConfig#EXACTLY_ONCE} or {@link StreamsConfig#EXACTLY_ONCE_V2};
 *         {@code false} otherwise
 */
@SuppressWarnings("deprecation")
private boolean validateProcessingGuarantee(final String processingGuarantee) {
  if (StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)) {
    return true;
  }
  // We continue to support EXACTLY_ONCE so that it can be benchmarked against v2.
  if (StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)) {
    return true;
  }
  return StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee);
}
/** /**
* Method to implement a {@link Topology} for a {@code KafkaStreams} application. * Method to implement a {@link Topology} for a {@code KafkaStreams} application.
* *
...@@ -116,7 +119,7 @@ public abstract class KafkaStreamsBuilder { ...@@ -116,7 +119,7 @@ public abstract class KafkaStreamsBuilder {
* Builds the {@link KafkaStreams} instance. * Builds the {@link KafkaStreams} instance.
*/ */
public KafkaStreams build() { public KafkaStreams build() {
// Create the Kafka streams instance. // Create the Kafka Streams instance.
final Properties properties = this.buildProperties(); final Properties properties = this.buildProperties();
return new KafkaStreams(this.buildTopology(properties), properties); return new KafkaStreams(this.buildTopology(properties), properties);
} }
......
...@@ -53,7 +53,7 @@ public class TopologyBuilder { ...@@ -53,7 +53,7 @@ public class TopologyBuilder {
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey() .groupByKey()
.windowedBy(TimeWindows.of(this.duration)) .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
// .aggregate( // .aggregate(
// () -> 0.0, // () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
......
...@@ -60,17 +60,18 @@ public class TopologyBuilder { ...@@ -60,17 +60,18 @@ public class TopologyBuilder {
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic, Consumed.with(
Consumed.with(Serdes.String(), Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.selectKey((key, value) -> { .selectKey((key, value) -> {
final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return keyFactory.createKey(value.getIdentifier(), dateTime); return keyFactory.createKey(value.getIdentifier(), dateTime);
}) })
.groupByKey( .groupByKey(Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
Grouped.with(keySerde, this.srAvroSerdeFactory.forValues())) .windowedBy(TimeWindows
.windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .ofSizeWithNoGrace(this.aggregtionDuration)
.advanceBy(this.aggregationAdvance))
.aggregate( .aggregate(
() -> Stats.of(), () -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
......
...@@ -146,7 +146,7 @@ public class TopologyBuilder { ...@@ -146,7 +146,7 @@ public class TopologyBuilder {
.groupByKey(Grouped.with( .groupByKey(Grouped.with(
SensorParentKeySerde.serde(), SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod)) .windowedBy(TimeWindows.ofSizeAndGrace(this.emitPeriod, this.gracePeriod))
.reduce( .reduce(
// TODO Configurable window aggregation function // TODO Configurable window aggregation function
(oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal, (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment