Skip to content
Snippets Groups Projects
Commit 23a13526 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'feature/topologyOptimization' into 'master'

enable topic optimization

See merge request !76
parents 0f8ed021 1dc59867
No related branches found
No related tags found
1 merge request!76enable topic optimization
Pipeline #1439 passed
Pipeline: theodolite

#1440

    Showing
    with 27 additions and 18 deletions
    ...@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder { ...@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder {
    * *
    * @return A {@code Topology} for a {@code KafkaStreams} application. * @return A {@code Topology} for a {@code KafkaStreams} application.
    */ */
    protected abstract Topology buildTopology(); protected abstract Topology buildTopology(Properties properties);
    /** /**
    * Builds the {@link KafkaStreams} instance. * Builds the {@link KafkaStreams} instance.
    */ */
    public KafkaStreams build() { public KafkaStreams build() {
    // Create the Kafka streams instance. // Create the Kafka streams instance.
    return new KafkaStreams(this.buildTopology(), this.buildProperties()); final Properties properties = this.buildProperties();
    return new KafkaStreams(this.buildTopology(properties), properties);
    } }
    } }
    package theodolite.uc1.streamprocessing; package theodolite.uc1.streamprocessing;
    import com.google.gson.Gson; import com.google.gson.Gson;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
    ...@@ -36,7 +37,7 @@ public class TopologyBuilder { ...@@ -36,7 +37,7 @@ public class TopologyBuilder {
    /** /**
    * Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
    */ */
    public Topology build() { public Topology build(final Properties properties) {
    this.builder this.builder
    .stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
    Serdes.String(), Serdes.String(),
    ...@@ -44,6 +45,6 @@ public class TopologyBuilder { ...@@ -44,6 +45,6 @@ public class TopologyBuilder {
    .mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
    .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
    return this.builder.build(); return this.builder.build(properties);
    } }
    } }
    package theodolite.uc1.streamprocessing; package theodolite.uc1.streamprocessing;
    import java.util.Objects; import java.util.Objects;
    import java.util.Properties;
    import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
    import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
    import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
    ...@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
    } }
    @Override @Override
    protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
    Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
    return new TopologyBuilder(this.inputTopic, return new TopologyBuilder(this.inputTopic,
    new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties);
    } }
    } }
    package theodolite.uc2.streamprocessing; package theodolite.uc2.streamprocessing;
    import java.time.Duration; import java.time.Duration;
    import java.util.Properties;
    import java.util.Set; import java.util.Set;
    import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
    ...@@ -71,7 +72,7 @@ public class TopologyBuilder { ...@@ -71,7 +72,7 @@ public class TopologyBuilder {
    /** /**
    * Build the {@link Topology} for the Aggregation microservice. * Build the {@link Topology} for the Aggregation microservice.
    */ */
    public Topology build() { public Topology build(final Properties properties) {
    // 1. Build Parent-Sensor Table // 1. Build Parent-Sensor Table
    final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
    ...@@ -92,7 +93,7 @@ public class TopologyBuilder { ...@@ -92,7 +93,7 @@ public class TopologyBuilder {
    // 5. Expose Aggregations Stream // 5. Expose Aggregations Stream
    this.exposeOutputStream(aggregations); this.exposeOutputStream(aggregations);
    return this.builder.build(); return this.builder.build(properties);
    } }
    private KTable<String, ActivePowerRecord> buildInputTable() { private KTable<String, ActivePowerRecord> buildInputTable() {
    ......
    ...@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing;
    import java.time.Duration; import java.time.Duration;
    import java.util.Objects; import java.util.Objects;
    import java.util.Properties;
    import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
    import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
    import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
    ...@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
    } }
    @Override @Override
    protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
    Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
    Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
    Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
    ...@@ -66,7 +67,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -66,7 +67,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
    this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
    new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
    return topologyBuilder.build(); return topologyBuilder.build(properties);
    } }
    } }
    ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
    import com.google.common.math.Stats; import com.google.common.math.Stats;
    import java.time.Duration; import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
    ...@@ -46,7 +47,7 @@ public class TopologyBuilder { ...@@ -46,7 +47,7 @@ public class TopologyBuilder {
    /** /**
    * Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
    */ */
    public Topology build() { public Topology build(final Properties properties) {
    this.builder this.builder
    .stream(this.inputTopic, .stream(this.inputTopic,
    Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
    ...@@ -68,6 +69,6 @@ public class TopologyBuilder { ...@@ -68,6 +69,6 @@ public class TopologyBuilder {
    .peek((k, v) -> LOGGER.info(k + ": " + v)) .peek((k, v) -> LOGGER.info(k + ": " + v))
    .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
    return this.builder.build(); return this.builder.build(properties);
    } }
    } }
    ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
    import java.time.Duration; import java.time.Duration;
    import java.util.Objects; import java.util.Objects;
    import java.util.Properties;
    import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
    import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
    import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
    ...@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
    } }
    @Override @Override
    protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
    Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
    Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
    Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
    final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
    new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
    return topologyBuilder.build(); return topologyBuilder.build(properties);
    } }
    } }
    ...@@ -5,6 +5,7 @@ import java.time.Duration; ...@@ -5,6 +5,7 @@ import java.time.Duration;
    import java.time.Instant; import java.time.Instant;
    import java.time.LocalDateTime; import java.time.LocalDateTime;
    import java.time.ZoneId; import java.time.ZoneId;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
    ...@@ -54,7 +55,7 @@ public class TopologyBuilder { ...@@ -54,7 +55,7 @@ public class TopologyBuilder {
    /** /**
    * Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
    */ */
    public Topology build() { public Topology build(final Properties properties) {
    final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
    final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
    ...@@ -89,6 +90,6 @@ public class TopologyBuilder { ...@@ -89,6 +90,6 @@ public class TopologyBuilder {
    Serdes.String())); Serdes.String()));
    // this.serdes.avroValues())); // this.serdes.avroValues()));
    return this.builder.build(); return this.builder.build(properties);
    } }
    } }
    ...@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing;
    import java.time.Duration; import java.time.Duration;
    import java.util.Objects; import java.util.Objects;
    import java.util.Properties;
    import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
    import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
    import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
    ...@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
    } }
    @Override @Override
    protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
    Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
    Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
    Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
    ...@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
    this.aggregtionDuration, this.aggregtionDuration,
    this.aggregationAdvance); this.aggregationAdvance);
    return topologyBuilder.build(); return topologyBuilder.build(properties);
    } }
    } }
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment