Skip to content
Snippets Groups Projects
Commit ec54912f authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' of git.se.informatik.uni-kiel.de:she/theodolite

parents 8d206f50 43edba38
No related branches found
No related tags found
No related merge requests found
Pipeline #905 passed with warnings
Showing
with 183 additions and 133 deletions
...@@ -22,14 +22,16 @@ public final class ConfigurationKeys { ...@@ -22,14 +22,16 @@ public final class ConfigurationKeys {
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics // Additional topics
public static final String KAFKA_FEEDBACK_TOPIC = "kafka.feedback.topic";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic"; public static final String KAFKA_CONFIGURATION_TOPIC = "kafka.configuration.topic";
// UC2 // UC2
public static final String WINDOW_SIZE_MS = "window.size.ms"; public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms"; public static final String GRACE_PERIOD_MS = "grace.period.ms";
// UC3 // UC3
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
......
...@@ -64,9 +64,10 @@ configure(useCaseApplications) { ...@@ -64,9 +64,10 @@ configure(useCaseApplications) {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers
implementation 'com.google.guava:guava:24.1-jre' implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation project(':application-kafkastreams-commons') implementation project(':application-kafkastreams-commons')
// Use JUnit test framework // Use JUnit test framework
...@@ -82,7 +83,7 @@ configure(useCaseGenerators) { ...@@ -82,7 +83,7 @@ configure(useCaseGenerators) {
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre' implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.7.25'
// These dependencies are used for the workload-generator-commmon // These dependencies are used for the workload-generator-commmon
implementation project(':workload-generator-commons') implementation project(':workload-generator-commons')
...@@ -96,7 +97,7 @@ configure(useCaseGenerators) { ...@@ -96,7 +97,7 @@ configure(useCaseGenerators) {
configure(commonProjects) { configure(commonProjects) {
dependencies { dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
......
...@@ -22,7 +22,7 @@ echo "EXECUTION_MINUTES: $EXECUTION_MINUTES" ...@@ -22,7 +22,7 @@ echo "EXECUTION_MINUTES: $EXECUTION_MINUTES"
#PARTITIONS=40 #PARTITIONS=40
#kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1" #kubectl run temp-kafka --rm --attach --restart=Never --image=solsson/kafka --command -- bash -c "./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; ./bin/kafka-topics.sh --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
PARTITIONS=$PARTITIONS PARTITIONS=$PARTITIONS
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1" kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic input --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic aggregation-feedback --partitions $PARTITIONS --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic configuration --partitions 1 --replication-factor 1; kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --create --topic output --partitions $PARTITIONS --replication-factor 1"
# Start workload generator # Start workload generator
NUM_NESTED_GROUPS=$DIM_VALUE NUM_NESTED_GROUPS=$DIM_VALUE
...@@ -73,9 +73,9 @@ echo "$APPLICATION_YAML" | kubectl delete -f - ...@@ -73,9 +73,9 @@ echo "$APPLICATION_YAML" | kubectl delete -f -
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'" #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input,output,configuration,titan-.*'"
echo "Finished execution, print topics:" echo "Finished execution, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p' #kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0 while test $(kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(theodolite-.*|input|aggregation-feedback|output|configuration)( - marked for deletion)?$/p' | wc -l) -gt 0
do do
kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|output|configuration|theodolite-.*'" kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --delete --topic 'input|aggregation-feedback|output|configuration|theodolite-.*'"
echo "Wait for topic deletion" echo "Wait for topic deletion"
sleep 5s sleep 5s
#echo "Finished waiting, print topics:" #echo "Finished waiting, print topics:"
...@@ -104,7 +104,7 @@ do ...@@ -104,7 +104,7 @@ do
if [ $found -ne 1 ]; then if [ $found -ne 1 ]; then
echo "ZooKeeper reset was successful." echo "ZooKeeper reset was successful."
break break
else else
echo "ZooKeeper reset was not successful. Retrying in 5s." echo "ZooKeeper reset was not successful. Retrying in 5s."
sleep 5s sleep 5s
fi fi
......
...@@ -28,6 +28,8 @@ spec: ...@@ -28,6 +28,8 @@ spec:
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS - name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555" value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: LOG_LEVEL
value: "INFO"
resources: resources:
limits: limits:
memory: "{{MEMORY_LIMIT}}" memory: "{{MEMORY_LIMIT}}"
......
...@@ -40,10 +40,11 @@ public class AggregationService { ...@@ -40,10 +40,11 @@ public class AggregationService {
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder();
uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC))
.windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)))
.gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)));
// Configuration of the stream application // Configuration of the stream application
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
......
package theodolite.uc2.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
private ConfigurationKeys() {}
}
...@@ -5,6 +5,7 @@ import java.util.Optional; ...@@ -5,6 +5,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
...@@ -13,40 +14,27 @@ import titan.ccp.configuration.events.Event; ...@@ -13,40 +14,27 @@ import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.SensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry;
/** /**
* Factory class configuration required by {@link ChildParentsTransformer}. * Supplier class for a {@link ChildParentsTransformer}.
*/ */
public class ChildParentsTransformerFactory { public class ChildParentsTransformerSupplier implements
TransformerSupplier<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> {
private static final String STORE_NAME = "CHILD-PARENTS-TRANSFORM-STATE"; private static final String STORE_NAME = "CHILD-PARENTS-TRANSFORM-STATE";
/** @Override
* Returns a {@link TransformerSupplier} for {@link ChildParentsTransformer}. public Transformer<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> get() { // NOCS
*/ return new ChildParentsTransformer(STORE_NAME);
public TransformerSupplier<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> getTransformerSupplier() { // NOCS
return new TransformerSupplier<>() {
@Override
public ChildParentsTransformer get() {
return new ChildParentsTransformer(STORE_NAME);
}
};
} }
/** @Override
* Returns a {@link StoreBuilder} for {@link ChildParentsTransformer}. public Set<StoreBuilder<?>> stores() {
*/ final StoreBuilder<KeyValueStore<String, Set<String>>> store = Stores.keyValueStoreBuilder(
public StoreBuilder<KeyValueStore<String, Set<String>>> getStoreBuilder() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_NAME), Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(), Serdes.String(),
ParentsSerde.serde()) ParentsSerde.serde())
.withLoggingEnabled(Map.of()); .withLoggingEnabled(Map.of());
}
/** return Set.of(store);
* Returns the store name for {@link ChildParentsTransformer}.
*/
public String getStoreName() {
return STORE_NAME;
} }
} }
...@@ -4,6 +4,7 @@ import java.util.Map; ...@@ -4,6 +4,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
...@@ -11,40 +12,27 @@ import org.apache.kafka.streams.state.Stores; ...@@ -11,40 +12,27 @@ import org.apache.kafka.streams.state.Stores;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Factory class configuration required by {@link JointFlatTransformerFactory}. * Supplier class for {@link JointFlatTransformerSupplier}.
*/ */
public class JointFlatTransformerFactory { public class JointFlatTransformerSupplier implements
TransformerSupplier<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> { // NOCS
private static final String STORE_NAME = "JOINT-FLAT-MAP-TRANSFORM-STATE"; private static final String STORE_NAME = "JOINT-FLAT-MAP-TRANSFORM-STATE";
/** @Override
* Returns a {@link TransformerSupplier} for {@link JointFlatTransformer}. public Transformer<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> get() { // NOCS
*/ return new JointFlatTransformer(STORE_NAME);
public TransformerSupplier<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> getTransformerSupplier() { // NOCS
return new TransformerSupplier<>() {
@Override
public JointFlatTransformer get() {
return new JointFlatTransformer(STORE_NAME);
}
};
} }
/** @Override
* Returns a {@link StoreBuilder} for {@link JointFlatTransformer}. public Set<StoreBuilder<?>> stores() {
*/ final StoreBuilder<KeyValueStore<String, Set<String>>> store = Stores.keyValueStoreBuilder(
public StoreBuilder<KeyValueStore<String, Set<String>>> getStoreBuilder() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_NAME), Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(), Serdes.String(),
ParentsSerde.serde()) ParentsSerde.serde())
.withLoggingEnabled(Map.of()); .withLoggingEnabled(Map.of());
}
/** return Set.of(store);
* Returns the store name for {@link JointFlatTransformer}.
*/
public String getStoreName() {
return STORE_NAME;
} }
} }
package theodolite.uc2.streamprocessing; package theodolite.uc2.streamprocessing;
import com.google.common.math.StatsAccumulator;
import java.time.Duration; import java.time.Duration;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
...@@ -31,40 +30,48 @@ import titan.ccp.model.sensorregistry.SensorRegistry; ...@@ -31,40 +30,48 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
*/ */
public class TopologyBuilder { public class TopologyBuilder {
// Streams Variables
private static final int LATENCY_OUTPOUT_THRESHOLD = 1000;
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic; private final String inputTopic;
private final String feedbackTopic;
private final String outputTopic; private final String outputTopic;
private final String configurationTopic; private final String configurationTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final Duration emitPeriod;
private final Duration windowSize;
private final Duration gracePeriod; private final Duration gracePeriod;
// SERDEs
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
private final RecordAggregator recordAggregator = new RecordAggregator(); private final RecordAggregator recordAggregator = new RecordAggregator();
private StatsAccumulator latencyStats = new StatsAccumulator();
private long lastTime = System.currentTimeMillis();
/** /**
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*
* @param inputTopic The topic where to read sensor measurements from.
* @param configurationTopic The topic where the hierarchy of the sensors is published.
* @param feedbackTopic The topic where aggregation results are written to for feedback.
* @param outputTopic The topic where to publish aggregation results.
* @param emitPeriod The Duration results are emitted with.
* @param gracePeriod The Duration for how long late arriving records are considered.
* @param srAvroSerdeFactory Factory for creating avro SERDEs
*
*/ */
public TopologyBuilder(final String inputTopic, final String outputTopic, public TopologyBuilder(final String inputTopic, final String outputTopic,
final String configurationTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final String feedbackTopic, final String configurationTopic,
final Duration windowSize, final Duration gracePeriod) { final Duration emitPeriod, final Duration gracePeriod,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.outputTopic = outputTopic; this.feedbackTopic = feedbackTopic;
this.configurationTopic = configurationTopic; this.configurationTopic = configurationTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory; this.outputTopic = outputTopic;
this.windowSize = windowSize; this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod; this.gracePeriod = gracePeriod;
this.srAvroSerdeFactory = srAvroSerdeFactory;
} }
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the Aggregation microservice.
*/ */
public Topology build() { public Topology build() {
// 1. Build Parent-Sensor Table // 1. Build Parent-Sensor Table
...@@ -78,9 +85,12 @@ public class TopologyBuilder { ...@@ -78,9 +85,12 @@ public class TopologyBuilder {
this.buildLastValueTable(parentSensorTable, inputTable); this.buildLastValueTable(parentSensorTable, inputTable);
// 4. Build Aggregations Stream // 4. Build Aggregations Stream
final KStream<String, AggregatedActivePowerRecord> aggregations = final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations =
this.buildAggregationStream(lastValueTable); this.buildAggregationStream(lastValueTable);
// 6. Expose Feedback Stream
this.exposeFeedbackStream(aggregations);
// 5. Expose Aggregations Stream // 5. Expose Aggregations Stream
this.exposeOutputStream(aggregations); this.exposeOutputStream(aggregations);
...@@ -92,19 +102,20 @@ public class TopologyBuilder { ...@@ -92,19 +102,20 @@ public class TopologyBuilder {
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
this.srAvroSerdeFactory.forValues())); this.srAvroSerdeFactory.forValues()));
final KStream<String, ActivePowerRecord> aggregationsInput = this.builder final KStream<String, ActivePowerRecord> aggregationsInput = this.builder
.stream(this.outputTopic, Consumed.with( .stream(this.feedbackTopic, Consumed.with(
Serdes.String(), Serdes.String(),
this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues())) this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues()))
.mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW())); .mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()));
final KTable<String, ActivePowerRecord> inputTable = values final KTable<String, ActivePowerRecord> inputTable = values
.merge(aggregationsInput) .merge(aggregationsInput)
.mapValues((k, v) -> new ActivePowerRecord(v.getIdentifier(), System.currentTimeMillis(), .groupByKey(Grouped.with(
v.getValueInW())) Serdes.String(),
.groupByKey(Grouped.with(Serdes.String(),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.reduce((aggr, value) -> value, Materialized.with(Serdes.String(), .reduce((aggr, value) -> value, Materialized.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues())); this.srAvroSerdeFactory.forValues()));
return inputTable; return inputTable;
} }
...@@ -115,15 +126,9 @@ public class TopologyBuilder { ...@@ -115,15 +126,9 @@ public class TopologyBuilder {
.filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED .filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED
|| key == Event.SENSOR_REGISTRY_STATUS); || key == Event.SENSOR_REGISTRY_STATUS);
final ChildParentsTransformerFactory childParentsTransformerFactory =
new ChildParentsTransformerFactory();
this.builder.addStateStore(childParentsTransformerFactory.getStoreBuilder());
return configurationStream return configurationStream
.mapValues(data -> SensorRegistry.fromJson(data)) .mapValues(data -> SensorRegistry.fromJson(data))
.flatTransform( .flatTransform(new ChildParentsTransformerSupplier())
childParentsTransformerFactory.getTransformerSupplier(),
childParentsTransformerFactory.getStoreName())
.groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde())) .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde()))
.aggregate( .aggregate(
() -> Set.<String>of(), () -> Set.<String>of(),
...@@ -131,33 +136,27 @@ public class TopologyBuilder { ...@@ -131,33 +136,27 @@ public class TopologyBuilder {
Materialized.with(Serdes.String(), ParentsSerde.serde())); Materialized.with(Serdes.String(), ParentsSerde.serde()));
} }
private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable( private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable(
final KTable<String, Set<String>> parentSensorTable, final KTable<String, Set<String>> parentSensorTable,
final KTable<String, ActivePowerRecord> inputTable) { final KTable<String, ActivePowerRecord> inputTable) {
final JointFlatTransformerFactory jointFlatMapTransformerFactory =
new JointFlatTransformerFactory();
this.builder.addStateStore(jointFlatMapTransformerFactory.getStoreBuilder());
return inputTable return inputTable
.join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record)) .join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record))
.toStream() .toStream()
.flatTransform( .flatTransform(new JointFlatTransformerSupplier())
jointFlatMapTransformerFactory.getTransformerSupplier(),
jointFlatMapTransformerFactory.getStoreName())
.groupByKey(Grouped.with( .groupByKey(Grouped.with(
SensorParentKeySerde.serde(), SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.windowSize).grace(this.gracePeriod)) .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod))
.reduce( .reduce(
// TODO Configurable window aggregation function // TODO Configurable window aggregation function
(aggValue, newValue) -> newValue, (oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
Materialized.with(SensorParentKeySerde.serde(), Materialized.with(
SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues())); this.srAvroSerdeFactory.forValues()));
} }
private KStream<String, AggregatedActivePowerRecord> buildAggregationStream( private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream(
final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) { final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) {
return lastValueTable return lastValueTable
.groupBy( .groupBy(
...@@ -165,50 +164,42 @@ public class TopologyBuilder { ...@@ -165,50 +164,42 @@ public class TopologyBuilder {
Grouped.with( Grouped.with(
new WindowedSerdes.TimeWindowedSerde<>( new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(), Serdes.String(),
this.windowSize.toMillis()), this.emitPeriod.toMillis()),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.aggregate( .aggregate(
() -> null, this.recordAggregator::add, this.recordAggregator::substract, () -> null,
this.recordAggregator::add,
this.recordAggregator::substract,
Materialized.with( Materialized.with(
new WindowedSerdes.TimeWindowedSerde<>( new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(), Serdes.String(),
this.windowSize.toMillis()), this.emitPeriod.toMillis()),
this.srAvroSerdeFactory.forValues())) this.srAvroSerdeFactory.forValues()))
.suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded()))
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
// TODO timestamp -1 indicates that this record is emitted by an substract event // TODO timestamp -1 indicates that this record is emitted by an substract event
.filter((k, record) -> record.getTimestamp() != -1) .filter((k, record) -> record.getTimestamp() != -1);
.map((k, v) -> KeyValue.pair(k.key(), v)); // TODO compute Timestamp }
private void exposeFeedbackStream(
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
aggregations
.toStream()
.filter((k, record) -> record != null)
.selectKey((k, v) -> k.key())
.to(this.feedbackTopic, Produced.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()));
} }
private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) { private void exposeOutputStream(
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
aggregations aggregations
.peek((k, v) -> { // .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
final long time = System.currentTimeMillis(); .suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded()))
final long latency = time - v.getTimestamp(); .toStream()
this.latencyStats.add(latency); .filter((k, record) -> record != null)
if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) { .selectKey((k, v) -> k.key())
if (LOGGER.isInfoEnabled()) {
LOGGER.info("latency,"
+ time + ','
+ this.latencyStats.mean() + ','
+ (this.latencyStats.count() > 0
? this.latencyStats.populationStandardDeviation()
: Double.NaN)
+ ','
+ (this.latencyStats.count() > 1
? this.latencyStats.sampleStandardDeviation()
: Double.NaN)
+ ','
+ this.latencyStats.min() + ','
+ this.latencyStats.max() + ','
+ this.latencyStats.count());
}
this.latencyStats = new StatsAccumulator();
this.lastTime = time;
}
})
.to(this.outputTopic, Produced.with( .to(this.outputTopic, Produced.with(
Serdes.String(), Serdes.String(),
this.srAvroSerdeFactory.forValues())); this.srAvroSerdeFactory.forValues()));
......
...@@ -11,13 +11,14 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; ...@@ -11,13 +11,14 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
*/ */
public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method
private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1);
private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;
private String inputTopic; // NOPMD private String inputTopic; // NOPMD
private String feedbackTopic; // NOPMD
private String outputTopic; // NOPMD private String outputTopic; // NOPMD
private String configurationTopic; // NOPMD private String configurationTopic; // NOPMD
private Duration windowSize; // NOPMD private Duration emitPeriod; // NOPMD
private Duration gracePeriod; // NOPMD private Duration gracePeriod; // NOPMD
public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) {
...@@ -25,6 +26,11 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -25,6 +26,11 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
return this; return this;
} }
public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) {
this.feedbackTopic = feedbackTopic;
return this;
}
public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
return this; return this;
...@@ -35,8 +41,8 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -35,8 +41,8 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
return this; return this;
} }
public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) { public Uc2KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) {
this.windowSize = Objects.requireNonNull(windowSize); this.emitPeriod = Objects.requireNonNull(emitPeriod);
return this; return this;
} }
...@@ -48,16 +54,18 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build ...@@ -48,16 +54,18 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder( final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic, this.inputTopic,
this.feedbackTopic,
this.outputTopic, this.outputTopic,
this.configurationTopic, this.configurationTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod,
this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
return topologyBuilder.build(); return topologyBuilder.build();
} }
......
application.name=theodolite-uc2-application application.name=theodolite-uc2-application
application.version=0.0.1 application.version=0.0.1
configuration.host=localhost
configuration.port=8082
configuration.kafka.topic=configuration
kafka.bootstrap.servers=localhost:9092 kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.configuration.topic=configuration
kafka.feedback.topic=aggregation-feedback
kafka.output.topic=output kafka.output.topic=output
schema.registry.url=http://localhost:8091 schema.registry.url=http://localhost:8091
window.size.ms=1000 emit.period.ms=5000
window.grace.ms=0 grace.period.ms=0
num.threads=1 num.threads=1
commit.interval.ms=100 commit.interval.ms=100
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment