Skip to content
Snippets Groups Projects
Commit c0a1dba3 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

enable topic optimization

parent cac915db
No related branches found
No related tags found
No related merge requests found
Showing
with 27 additions and 18 deletions
...@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder { ...@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder {
* *
* @return A {@code Topology} for a {@code KafkaStreams} application. * @return A {@code Topology} for a {@code KafkaStreams} application.
*/ */
protected abstract Topology buildTopology(); protected abstract Topology buildTopology(Properties properties);
/** /**
* Builds the {@link KafkaStreams} instance. * Builds the {@link KafkaStreams} instance.
*/ */
public KafkaStreams build() { public KafkaStreams build() {
// Create the Kafka streams instance. // Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties()); final Properties properties = this.buildProperties();
return new KafkaStreams(this.buildTopology(properties), properties);
} }
} }
package theodolite.uc1.streamprocessing; package theodolite.uc1.streamprocessing;
import com.google.gson.Gson; import com.google.gson.Gson;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
...@@ -36,7 +37,7 @@ public class TopologyBuilder { ...@@ -36,7 +37,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
...@@ -44,6 +45,6 @@ public class TopologyBuilder { ...@@ -44,6 +45,6 @@ public class TopologyBuilder {
.mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
return this.builder.build(); return this.builder.build(properties);
} }
} }
package theodolite.uc1.streamprocessing; package theodolite.uc1.streamprocessing;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic, return new TopologyBuilder(this.inputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties);
} }
} }
package theodolite.uc2.streamprocessing; package theodolite.uc2.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
...@@ -71,7 +72,7 @@ public class TopologyBuilder { ...@@ -71,7 +72,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the Aggregation microservice. * Build the {@link Topology} for the Aggregation microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
// 1. Build Parent-Sensor Table // 1. Build Parent-Sensor Table
final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
...@@ -92,7 +93,7 @@ public class TopologyBuilder { ...@@ -92,7 +93,7 @@ public class TopologyBuilder {
// 5. Expose Aggregations Stream // 5. Expose Aggregations Stream
this.exposeOutputStream(aggregations); this.exposeOutputStream(aggregations);
return this.builder.build(); return this.builder.build(properties);
} }
private KTable<String, ActivePowerRecord> buildInputTable() { private KTable<String, ActivePowerRecord> buildInputTable() {
......
...@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
...@@ -66,7 +67,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -66,7 +67,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
return topologyBuilder.build(); return topologyBuilder.build(properties);
} }
} }
...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
import com.google.common.math.Stats; import com.google.common.math.Stats;
import java.time.Duration; import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
...@@ -46,7 +47,7 @@ public class TopologyBuilder { ...@@ -46,7 +47,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
...@@ -68,6 +69,6 @@ public class TopologyBuilder { ...@@ -68,6 +69,6 @@ public class TopologyBuilder {
.peek((k, v) -> LOGGER.info(k + ": " + v)) .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(); return this.builder.build(properties);
} }
} }
...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
return topologyBuilder.build(); return topologyBuilder.build(properties);
} }
} }
...@@ -5,6 +5,7 @@ import java.time.Duration; ...@@ -5,6 +5,7 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
...@@ -54,7 +55,7 @@ public class TopologyBuilder { ...@@ -54,7 +55,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
...@@ -89,6 +90,6 @@ public class TopologyBuilder { ...@@ -89,6 +90,6 @@ public class TopologyBuilder {
Serdes.String())); Serdes.String()));
// this.serdes.avroValues())); // this.serdes.avroValues()));
return this.builder.build(); return this.builder.build(properties);
} }
} }
...@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
...@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
this.aggregtionDuration, this.aggregtionDuration,
this.aggregationAdvance); this.aggregationAdvance);
return topologyBuilder.build(); return topologyBuilder.build(properties);
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment