Skip to content
Snippets Groups Projects
Commit e19409f3 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'rearrange-use-case-enumeration' into 'master'

Rearrange use case enumeration

Closes #72

See merge request !88
parents aeb21cfa 3aefa699
No related branches found
No related tags found
No related merge requests found
Showing
with 72 additions and 288 deletions
......@@ -67,7 +67,6 @@ configure(useCaseApplications) {
implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers
implementation 'com.google.code.gson:gson:2.8.2'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation project(':application-kafkastreams-commons')
......@@ -82,8 +81,6 @@ configure(useCaseGenerators) {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
// These dependencies are used for the workload-generator-commmon
......
......
......@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
......
......
File changed. Contains only whitespace changes. Show whitespace changes.
mainClassName = "theodolite.uc2.application.AggregationService"
mainClassName = "theodolite.uc2.application.HistoryService"
package theodolite.uc4.application;
package theodolite.uc2.application;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
......@@ -18,6 +19,8 @@ public class HistoryService {
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
private final int windowDurationMinutes = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
/**
* Start the service.
......@@ -31,17 +34,12 @@ public class HistoryService {
*
*/
private void createKafkaStreamsApplication() {
// Use case specific stream configuration
final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config);
uc4KafkaStreamsBuilder
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config);
uc2KafkaStreamsBuilder
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.aggregtionDuration(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
.aggregationAdvance(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)));
.windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
// Configuration of the stream application
final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build();
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build();
this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start();
......
......
package theodolite.uc2.streamprocessing;
import com.google.common.math.Stats;
import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.uc2.streamprocessing.util.StatsFactory;
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
* Builds Kafka Stream Topology for the History microservice.
*/
public class TopologyBuilder {
// Streams Variables
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final String feedbackTopic;
private final String outputTopic;
private final String configurationTopic;
private final Duration emitPeriod;
private final Duration gracePeriod;
// Serdes
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Duration duration;
private final StreamsBuilder builder = new StreamsBuilder();
private final RecordAggregator recordAggregator = new RecordAggregator();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*
* @param inputTopic The topic where to read sensor measurements from.
* @param configurationTopic The topic where the hierarchy of the sensors is published.
* @param feedbackTopic The topic where aggregation results are written to for feedback.
* @param outputTopic The topic where to publish aggregation results.
* @param emitPeriod The Duration results are emitted with.
* @param gracePeriod The Duration for how long late arriving records are considered.
* @param srAvroSerdeFactory Factory for creating avro SERDEs
*
*/
public TopologyBuilder(final String inputTopic, final String outputTopic,
final String feedbackTopic, final String configurationTopic,
final Duration emitPeriod, final Duration gracePeriod,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
final Duration duration) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.feedbackTopic = feedbackTopic;
this.configurationTopic = configurationTopic;
this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod;
this.srAvroSerdeFactory = srAvroSerdeFactory;
this.duration = duration;
}
/**
* Build the {@link Topology} for the Aggregation microservice.
* Build the {@link Topology} for the History microservice.
*/
public Topology build(final Properties properties) {
// 1. Build Parent-Sensor Table
final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
// 2. Build Input Table
final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable();
// 3. Build Last Value Table from Input and Parent-Sensor Table
final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable =
this.buildLastValueTable(parentSensorTable, inputTable);
// 4. Build Aggregations Stream
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations =
this.buildAggregationStream(lastValueTable);
// 6. Expose Feedback Stream
this.exposeFeedbackStream(aggregations);
// 5. Expose Aggregations Stream
this.exposeOutputStream(aggregations);
return this.builder.build(properties);
}
private KTable<String, ActivePowerRecord> buildInputTable() {
final KStream<String, ActivePowerRecord> values = this.builder
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()));
final KStream<String, ActivePowerRecord> aggregationsInput = this.builder
.stream(this.feedbackTopic, Consumed.with(
Serdes.String(),
this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues()))
.mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()));
final KTable<String, ActivePowerRecord> inputTable = values
.merge(aggregationsInput)
.groupByKey(Grouped.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()))
.reduce((aggr, value) -> value, Materialized.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()));
return inputTable;
}
private KTable<String, Set<String>> buildParentSensorTable() {
final KStream<Event, String> configurationStream = this.builder
.stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String()))
.filter((key, value) -> key == Event.SENSOR_REGISTRY_CHANGED
|| key == Event.SENSOR_REGISTRY_STATUS);
return configurationStream
.mapValues(data -> SensorRegistry.fromJson(data))
.flatTransform(new ChildParentsTransformerSupplier())
.groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde()))
.aggregate(
() -> Set.<String>of(),
(key, newValue, oldValue) -> newValue.orElse(null),
Materialized.with(Serdes.String(), ParentsSerde.serde()));
}
private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable(
final KTable<String, Set<String>> parentSensorTable,
final KTable<String, ActivePowerRecord> inputTable) {
return inputTable
.join(parentSensorTable, (record, parents) -> new JointRecordParents(parents, record))
.toStream()
.flatTransform(new JointFlatTransformerSupplier())
.groupByKey(Grouped.with(
SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod))
.reduce(
// TODO Configurable window aggregation function
(oldVal, newVal) -> newVal.getTimestamp() >= oldVal.getTimestamp() ? newVal : oldVal,
Materialized.with(
SensorParentKeySerde.serde(),
this.srAvroSerdeFactory.forValues()));
}
private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream(
final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) {
return lastValueTable
.groupBy(
(k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v),
Grouped.with(
new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(),
this.emitPeriod.toMillis()),
this.srAvroSerdeFactory.forValues()))
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey()
.windowedBy(TimeWindows.of(this.duration))
// .aggregate(
// () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
// Materialized.with(Serdes.String(), Serdes.Double()))
.aggregate(
() -> null,
this.recordAggregator::add,
this.recordAggregator::substract,
() -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
Materialized.with(
new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(),
this.emitPeriod.toMillis()),
this.srAvroSerdeFactory.forValues()))
// TODO timestamp -1 indicates that this record is emitted by an substract event
.filter((k, record) -> record.getTimestamp() != -1);
}
private void exposeFeedbackStream(
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
aggregations
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream()
.filter((k, record) -> record != null)
.selectKey((k, v) -> k.key())
.to(this.feedbackTopic, Produced.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()));
}
private void exposeOutputStream(
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
.map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
aggregations
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded()))
.toStream()
.filter((k, record) -> record != null)
.selectKey((k, v) -> k.key())
.to(this.outputTopic, Produced.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()));
return this.builder.build(properties);
}
}
......@@ -11,62 +11,33 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Builder for the Kafka Streams configuration.
*/
public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method
public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder {
private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1);
private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;
private String feedbackTopic; // NOPMD
private String outputTopic; // NOPMD
private String configurationTopic; // NOPMD
private Duration emitPeriod; // NOPMD
private Duration gracePeriod; // NOPMD
private Duration windowDuration; // NOPMD
public Uc2KafkaStreamsBuilder(final Configuration config) {
super(config);
}
public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) {
this.feedbackTopic = feedbackTopic;
return this;
}
public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) {
this.configurationTopic = configurationTopic;
return this;
}
public Uc2KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) {
this.emitPeriod = Objects.requireNonNull(emitPeriod);
return this;
}
public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) {
this.gracePeriod = Objects.requireNonNull(gracePeriod);
public Uc2KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
this.windowDuration = windowDuration;
return this;
}
@Override
protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic,
this.outputTopic,
this.feedbackTopic,
this.configurationTopic,
this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
return topologyBuilder.build(properties);
}
......
......
package theodolite.uc4.streamprocessing.util;
package theodolite.uc2.streamprocessing.util;
import com.google.common.math.Stats;
import com.google.common.math.StatsAccumulator;
......
......
......@@ -3,11 +3,7 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.configuration.topic=configuration
kafka.feedback.topic=aggregation-feedback
kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8091
emit.period.ms=5000
grace.period.ms=0
\ No newline at end of file
......@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
......
......
File changed. Contains only whitespace changes. Show whitespace changes.
File changed. Contains only whitespace changes. Show whitespace changes.
package theodolite.uc2.workloadgenerator;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.SensorRegistry;
/**
* Load generator for Theodolite use case UC2.
*/
public final class LoadGenerator {
private static final int SLEEP_PERIOD = 30_000;
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private LoadGenerator() {}
/**
* Start load generator.
*/
public static void main(final String[] args) {
final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv("SEND_REGISTRY"),
"true"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv("NUM_SENSORS"),
"1"));
final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse(
System.getenv("NUM_NESTED_GROUPS"),
"1"));
// Build sensor hierarchy
final SensorRegistry sensorRegistry =
new SensorRegistryBuilder(numNestedGroups, numSensors).build();
LOGGER.info("Start workload generator for use case UC2");
theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment()
.withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size()))
.withBeforeAction(() -> {
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
LOGGER.info("Configuration sent.");
LOGGER.info("Now wait 30 seconds...");
try {
Thread.sleep(SLEEP_PERIOD);
} catch (final InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("...and start generating load.");
theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run();
}
})
.run();
}
}
......@@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
......
......
FROM openjdk:11-slim
ADD build/distributions/uc3-application.tar /
CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
/uc3-application/bin/uc3-application
package theodolite.uc3.application;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
......@@ -19,8 +18,6 @@ public class HistoryService {
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
private final int windowDurationMinutes = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
/**
* Start the service.
......@@ -34,11 +31,16 @@ public class HistoryService {
*
*/
private void createKafkaStreamsApplication() {
// Use case specific stream configuration
final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config);
uc3KafkaStreamsBuilder
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.windowDurationMinutes));
.aggregtionDuration(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS)))
.aggregationAdvance(
Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS)));
// Configuration of the stream application
final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build();
this.stopEvent.thenRun(kafkaStreams::close);
......
......
package theodolite.uc4.streamprocessing;
package theodolite.uc3.streamprocessing;
import java.util.Objects;
......
......
package theodolite.uc4.streamprocessing;
package theodolite.uc3.streamprocessing;
import java.time.LocalDateTime;
......
......
package theodolite.uc4.streamprocessing;
package theodolite.uc3.streamprocessing;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde;
......
......
package theodolite.uc4.streamprocessing;
package theodolite.uc3.streamprocessing;
import com.google.common.math.Stats;
import org.apache.kafka.streams.kstream.Windowed;
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment