Skip to content
Snippets Groups Projects
Commit 5a452d10 authored by Sören Henning's avatar Sören Henning
Browse files

Merge remote-tracking branch 'origin/master' into stu203404/spesb-feature/auto_build_theodolite

parents 40ccbe7b 75346e27
No related branches found
No related tags found
No related merge requests found
Showing
with 29 additions and 20 deletions
...@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder { ...@@ -110,14 +110,15 @@ public abstract class KafkaStreamsBuilder {
* *
* @return A {@code Topology} for a {@code KafkaStreams} application. * @return A {@code Topology} for a {@code KafkaStreams} application.
*/ */
protected abstract Topology buildTopology(); protected abstract Topology buildTopology(Properties properties);
/** /**
* Builds the {@link KafkaStreams} instance. * Builds the {@link KafkaStreams} instance.
*/ */
public KafkaStreams build() { public KafkaStreams build() {
// Create the Kafka streams instance. // Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties()); final Properties properties = this.buildProperties();
return new KafkaStreams(this.buildTopology(properties), properties);
} }
} }
package theodolite.uc1.streamprocessing; package theodolite.uc1.streamprocessing;
import com.google.gson.Gson; import com.google.gson.Gson;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
...@@ -36,7 +37,7 @@ public class TopologyBuilder { ...@@ -36,7 +37,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
...@@ -44,6 +45,6 @@ public class TopologyBuilder { ...@@ -44,6 +45,6 @@ public class TopologyBuilder {
.mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
return this.builder.build(); return this.builder.build(properties);
} }
} }
package theodolite.uc1.streamprocessing; package theodolite.uc1.streamprocessing;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -16,9 +17,9 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic, return new TopologyBuilder(this.inputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build(properties);
} }
} }
package theodolite.uc2.streamprocessing; package theodolite.uc2.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
...@@ -59,9 +60,9 @@ public class TopologyBuilder { ...@@ -59,9 +60,9 @@ public class TopologyBuilder {
final Duration emitPeriod, final Duration gracePeriod, final Duration emitPeriod, final Duration gracePeriod,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) { final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.feedbackTopic = feedbackTopic; this.feedbackTopic = feedbackTopic;
this.configurationTopic = configurationTopic; this.configurationTopic = configurationTopic;
this.outputTopic = outputTopic;
this.emitPeriod = emitPeriod; this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod; this.gracePeriod = gracePeriod;
...@@ -71,7 +72,7 @@ public class TopologyBuilder { ...@@ -71,7 +72,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the Aggregation microservice. * Build the {@link Topology} for the Aggregation microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
// 1. Build Parent-Sensor Table // 1. Build Parent-Sensor Table
final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable(); final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
...@@ -92,7 +93,7 @@ public class TopologyBuilder { ...@@ -92,7 +93,7 @@ public class TopologyBuilder {
// 5. Expose Aggregations Stream // 5. Expose Aggregations Stream
this.exposeOutputStream(aggregations); this.exposeOutputStream(aggregations);
return this.builder.build(); return this.builder.build(properties);
} }
private KTable<String, ActivePowerRecord> buildInputTable() { private KTable<String, ActivePowerRecord> buildInputTable() {
......
...@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -51,7 +52,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set."); Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
...@@ -59,14 +60,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -59,14 +60,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
final TopologyBuilder topologyBuilder = new TopologyBuilder( final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic, this.inputTopic,
this.feedbackTopic,
this.outputTopic, this.outputTopic,
this.feedbackTopic,
this.configurationTopic, this.configurationTopic,
this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod, this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod, this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
return topologyBuilder.build(); return topologyBuilder.build(properties);
} }
} }
...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
import com.google.common.math.Stats; import com.google.common.math.Stats;
import java.time.Duration; import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
...@@ -46,7 +47,7 @@ public class TopologyBuilder { ...@@ -46,7 +47,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
...@@ -68,6 +69,6 @@ public class TopologyBuilder { ...@@ -68,6 +69,6 @@ public class TopologyBuilder {
.peek((k, v) -> LOGGER.info(k + ": " + v)) .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(); return this.builder.build(properties);
} }
} }
...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -30,14 +31,14 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
return topologyBuilder.build(); return topologyBuilder.build(properties);
} }
} }
...@@ -5,6 +5,7 @@ import java.time.Duration; ...@@ -5,6 +5,7 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
...@@ -54,7 +55,7 @@ public class TopologyBuilder { ...@@ -54,7 +55,7 @@ public class TopologyBuilder {
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the History microservice.
*/ */
public Topology build() { public Topology build(final Properties properties) {
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create();
...@@ -89,6 +90,6 @@ public class TopologyBuilder { ...@@ -89,6 +90,6 @@ public class TopologyBuilder {
Serdes.String())); Serdes.String()));
// this.serdes.avroValues())); // this.serdes.avroValues()));
return this.builder.build(); return this.builder.build(properties);
} }
} }
...@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; ...@@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing;
import java.time.Duration; import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.Properties;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
...@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -36,7 +37,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
} }
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set.");
...@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -49,7 +50,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
this.aggregtionDuration, this.aggregtionDuration,
this.aggregationAdvance); this.aggregationAdvance);
return topologyBuilder.build(); return topologyBuilder.build(properties);
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment