Skip to content
Snippets Groups Projects
Commit 993cf117 authored by Sören Henning's avatar Sören Henning
Browse files

Rearrange use case enumeration in sources

parent d4e8c2f2
No related branches found
No related tags found
2 merge requests: !88 "Rearrange use case enumeration", !84 "Gitlab CI for Theodolite-Kotlin-Quarkus"
Pipeline #2098 passed
Showing
with 255 additions and 81 deletions
package theodolite.uc2.streamprocessing; package theodolite.uc4.streamprocessing;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.simpleserdes.BufferSerde; import titan.ccp.common.kafka.simpleserdes.BufferSerde;
......
package theodolite.uc4.streamprocessing; package theodolite.uc4.streamprocessing;
import com.google.common.math.Stats;
import java.time.Duration; import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Properties; import java.util.Properties;
import org.apache.kafka.common.serialization.Serde; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import theodolite.uc4.streamprocessing.util.StatsFactory; import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.common.kafka.GenericSerde; import org.apache.kafka.streams.kstream.WindowedSerdes;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
*/ */
public class TopologyBuilder { public class TopologyBuilder {
// Streams Variables
// private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final ZoneId zone = ZoneId.of("Europe/Paris"); // TODO as parameter
private final String inputTopic; private final String inputTopic;
private final String feedbackTopic;
private final String outputTopic; private final String outputTopic;
private final String configurationTopic;
private final Duration emitPeriod;
private final Duration gracePeriod;
// Serdes
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Duration aggregtionDuration;
private final Duration aggregationAdvance;
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
private final RecordAggregator recordAggregator = new RecordAggregator();
/** /**
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*
* @param inputTopic The topic where to read sensor measurements from.
* @param configurationTopic The topic where the hierarchy of the sensors is published.
* @param feedbackTopic The topic where aggregation results are written to for feedback.
* @param outputTopic The topic where to publish aggregation results.
* @param emitPeriod The Duration results are emitted with.
* @param gracePeriod The Duration for how long late arriving records are considered.
* @param srAvroSerdeFactory Factory for creating avro SERDEs
*
*/ */
public TopologyBuilder(final String inputTopic, final String outputTopic, public TopologyBuilder(final String inputTopic, final String outputTopic,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory, final String feedbackTopic, final String configurationTopic,
final Duration aggregtionDuration, final Duration aggregationAdvance) { final Duration emitPeriod, final Duration gracePeriod,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
this.feedbackTopic = feedbackTopic;
this.configurationTopic = configurationTopic;
this.emitPeriod = emitPeriod;
this.gracePeriod = gracePeriod;
this.srAvroSerdeFactory = srAvroSerdeFactory; this.srAvroSerdeFactory = srAvroSerdeFactory;
this.aggregtionDuration = aggregtionDuration;
this.aggregationAdvance = aggregationAdvance;
} }
/** /**
* Build the {@link Topology} for the History microservice. * Build the {@link Topology} for the Aggregation microservice.
*/ */
public Topology build(final Properties properties) { public Topology build(final Properties properties) {
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); // 1. Build Parent-Sensor Table
final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); final KTable<String, Set<String>> parentSensorTable = this.buildParentSensorTable();
this.builder // 2. Build Input Table
.stream(this.inputTopic, final KTable<String, ActivePowerRecord> inputTable = this.buildInputTable();
Consumed.with(Serdes.String(),
this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) // 3. Build Last Value Table from Input and Parent-Sensor Table
.selectKey((key, value) -> { final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable =
final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); this.buildLastValueTable(parentSensorTable, inputTable);
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return keyFactory.createKey(value.getIdentifier(), dateTime); // 4. Build Aggregations Stream
}) final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations =
.groupByKey( this.buildAggregationStream(lastValueTable);
Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) // 6. Expose Feedback Stream
this.exposeFeedbackStream(aggregations);
// 5. Expose Aggregations Stream
this.exposeOutputStream(aggregations);
return this.builder.build(properties);
}
/**
 * Builds the input table that holds, per key, the most recently received record.
 *
 * <p>Records come from two sources that are merged into one stream: the measurements read from
 * the input topic and the aggregation results read from the feedback topic. The latter are
 * converted to {@link ActivePowerRecord}s using their identifier, timestamp and sum.
 *
 * @return a table in which each key maps to the last record seen for it
 */
private KTable<String, ActivePowerRecord> buildInputTable() {
  // Measurements as they arrive on the input topic.
  final KStream<String, ActivePowerRecord> measurements = this.builder.stream(
      this.inputTopic,
      Consumed.with(Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues()));

  // Aggregation results from the feedback topic, mapped to plain power records.
  final KStream<String, AggregatedActivePowerRecord> feedback = this.builder.stream(
      this.feedbackTopic,
      Consumed.with(
          Serdes.String(),
          this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues()));
  final KStream<String, ActivePowerRecord> feedbackAsMeasurements = feedback.mapValues(
      aggregated -> new ActivePowerRecord(
          aggregated.getIdentifier(),
          aggregated.getTimestamp(),
          aggregated.getSumInW()));

  // The reducer always keeps the newly arrived record, so the table holds the last value per key.
  return measurements
      .merge(feedbackAsMeasurements)
      .groupByKey(
          Grouped.with(Serdes.String(), this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
      .reduce(
          (previous, latest) -> latest,
          Materialized.with(
              Serdes.String(),
              this.srAvroSerdeFactory.<ActivePowerRecord>forValues()));
}
/**
 * Builds the table of parents per key from the events on the configuration topic.
 *
 * <p>Only {@code SENSOR_REGISTRY_CHANGED} and {@code SENSOR_REGISTRY_STATUS} events are
 * considered. Their payload is parsed as a {@link SensorRegistry}, expanded by
 * {@code ChildParentsTransformerSupplier} and then aggregated per key such that the stored value
 * is always replaced by the most recent one.
 *
 * @return a table mapping each key to its current set of parents
 */
private KTable<String, Set<String>> buildParentSensorTable() {
  return this.builder
      .stream(this.configurationTopic, Consumed.with(EventSerde.serde(), Serdes.String()))
      // Drop every event that does not describe the sensor registry.
      .filter((event, payload) -> event == Event.SENSOR_REGISTRY_CHANGED
          || event == Event.SENSOR_REGISTRY_STATUS)
      .mapValues(json -> SensorRegistry.fromJson(json))
      .flatTransform(new ChildParentsTransformerSupplier())
      .groupByKey(Grouped.with(Serdes.String(), OptionalParentsSerde.serde()))
      .aggregate(
          () -> Set.<String>of(),
          // The incoming value replaces the stored one; an empty incoming value yields null
          // (presumably removing the entry from the table -- TODO confirm).
          (child, incoming, current) -> incoming.orElse(null),
          Materialized.with(Serdes.String(), ParentsSerde.serde()));
}
/**
 * Builds the windowed table holding the latest record per sensor-parent key.
 *
 * <p>The input table is joined with the parent-sensor table, the joined entries are expanded by
 * {@code JointFlatTransformerSupplier}, and the results are grouped by key and windowed with the
 * emit period as window size and the grace period as grace. Within a window, the record with the
 * greatest timestamp is kept; on equal timestamps the newer arrival wins.
 *
 * @param parentSensorTable table mapping a key to its set of parents
 * @param inputTable table mapping a key to its last record
 * @return the windowed last-value table
 */
private KTable<Windowed<SensorParentKey>, ActivePowerRecord> buildLastValueTable(
    final KTable<String, Set<String>> parentSensorTable,
    final KTable<String, ActivePowerRecord> inputTable) {
  // Attach the set of parents to every record of the input table.
  final KStream<String, JointRecordParents> recordsWithParents = inputTable
      .join(
          parentSensorTable,
          (measurement, parents) -> new JointRecordParents(parents, measurement))
      .toStream();

  return recordsWithParents
      .flatTransform(new JointFlatTransformerSupplier())
      .groupByKey(
          Grouped.with(
              SensorParentKeySerde.serde(),
              this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
      .windowedBy(TimeWindows.of(this.emitPeriod).grace(this.gracePeriod))
      .reduce(
          // TODO Configurable window aggregation function
          (current, candidate) ->
              candidate.getTimestamp() < current.getTimestamp() ? current : candidate,
          Materialized.with(
              SensorParentKeySerde.serde(),
              this.srAvroSerdeFactory.<ActivePowerRecord>forValues()));
}
/**
 * Derives the aggregation table from the windowed last-value table.
 *
 * <p>Every entry is re-keyed from its sensor-parent key to the parent (taken from
 * {@code k.key().getParent()}) within the same window, grouped with a time-windowed String serde
 * whose window size is the emit period in milliseconds, and then aggregated. Aggregates whose
 * timestamp equals -1 are filtered out of the result.
 *
 * <p>NOTE(review): the lines of the {@code aggregate(...)} call below contain fragments of two
 * versions of this code on the same line; it looks like the current version aggregates via
 * {@code recordAggregator::add} and {@code recordAggregator::substract} -- confirm against the
 * actual source file.
 *
 * @param lastValueTable windowed table holding the last record per sensor-parent key
 * @return table of aggregated records keyed by the windowed parent identifier
 */
private KTable<Windowed<String>, AggregatedActivePowerRecord> buildAggregationStream(
final KTable<Windowed<SensorParentKey>, ActivePowerRecord> lastValueTable) {
return lastValueTable
.groupBy(
// Re-key by parent while keeping the window of the original key.
(k, v) -> KeyValue.pair(new Windowed<>(k.key().getParent(), k.window()), v),
Grouped.with(
new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(),
this.emitPeriod.toMillis()),
this.srAvroSerdeFactory.forValues()))
.aggregate( .aggregate(
() -> Stats.of(), () -> null,
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), this.recordAggregator::add,
Materialized.with(keySerde, this.recordAggregator::substract,
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) Materialized.with(
new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(),
this.emitPeriod.toMillis()),
this.srAvroSerdeFactory.forValues()))
// TODO timestamp -1 indicates that this record is emitted by an substract event
.filter((k, record) -> record.getTimestamp() != -1);
}
private void exposeFeedbackStream(
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
aggregations
.toStream() .toStream()
.map((key, stats) -> KeyValue.pair( .filter((k, record) -> record != null)
keyFactory.getSensorId(key.key()), .selectKey((k, v) -> k.key())
stats.toString())) .to(this.feedbackTopic, Produced.with(
// TODO
// statsRecordFactory.create(key, value)))
// .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
.to(
this.outputTopic,
Produced.with(
Serdes.String(), Serdes.String(),
Serdes.String())); this.srAvroSerdeFactory.forValues()));
// this.serdes.avroValues())); }
return this.builder.build(properties); private void exposeOutputStream(
final KTable<Windowed<String>, AggregatedActivePowerRecord> aggregations) {
aggregations
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.suppress(Suppressed.untilTimeLimit(this.emitPeriod, BufferConfig.unbounded()))
.toStream()
.filter((k, record) -> record != null)
.selectKey((k, v) -> k.key())
.to(this.outputTopic, Produced.with(
Serdes.String(),
this.srAvroSerdeFactory.forValues()));
} }
} }
...@@ -11,44 +11,61 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; ...@@ -11,44 +11,61 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Builder for the Kafka Streams configuration. * Builder for the Kafka Streams configuration.
*/ */
public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method
private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1);
private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO;
private String feedbackTopic; // NOPMD
private String outputTopic; // NOPMD private String outputTopic; // NOPMD
private Duration aggregtionDuration; // NOPMD private String configurationTopic; // NOPMD
private Duration aggregationAdvance; // NOPMD private Duration emitPeriod; // NOPMD
private Duration gracePeriod; // NOPMD
public Uc4KafkaStreamsBuilder(final Configuration config) { public Uc4KafkaStreamsBuilder(final Configuration config) {
super(config); super(config);
} }
public Uc4KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) {
this.feedbackTopic = feedbackTopic;
return this;
}
public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
return this; return this;
} }
public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { public Uc4KafkaStreamsBuilder configurationTopic(final String configurationTopic) {
this.aggregtionDuration = aggregtionDuration; this.configurationTopic = configurationTopic;
return this;
}
public Uc4KafkaStreamsBuilder emitPeriod(final Duration emitPeriod) {
this.emitPeriod = Objects.requireNonNull(emitPeriod);
return this; return this;
} }
public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { public Uc4KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) {
this.aggregationAdvance = aggregationAdvance; this.gracePeriod = Objects.requireNonNull(gracePeriod);
return this; return this;
} }
@Override @Override
protected Topology buildTopology(final Properties properties) { protected Topology buildTopology(final Properties properties) {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.feedbackTopic, "Feedback topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set.");
Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder( final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic, this.inputTopic,
this.outputTopic, this.outputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.feedbackTopic,
this.aggregtionDuration, this.configurationTopic,
this.aggregationAdvance); this.emitPeriod == null ? EMIT_PERIOD_DEFAULT : this.emitPeriod,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl));
return topologyBuilder.build(properties); return topologyBuilder.build(properties);
} }
......
...@@ -3,8 +3,11 @@ application.version=0.0.1 ...@@ -3,8 +3,11 @@ application.version=0.0.1
kafka.bootstrap.servers=localhost:9092 kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.configuration.topic=configuration
kafka.feedback.topic=aggregation-feedback
kafka.output.topic=output kafka.output.topic=output
aggregation.duration.days=30
aggregation.advance.days=1
schema.registry.url=http://localhost:8091 schema.registry.url=http://localhost:8091
emit.period.ms=5000
grace.period.ms=0
\ No newline at end of file
package theodolite.uc2.streamprocessing; package theodolite.uc4.streamprocessing;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
......
package theodolite.uc2.streamprocessing; package theodolite.uc4.streamprocessing;
import java.util.Set; import java.util.Set;
import org.junit.Test; import org.junit.Test;
......
package theodolite.uc2.streamprocessing; package theodolite.uc4.streamprocessing;
import org.junit.Test; import org.junit.Test;
......
package theodolite.uc2.streamprocessing; package theodolite.uc4.streamprocessing;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.function.Function; import java.util.function.Function;
......
package theodolite.uc2.streamprocessing; package theodolite.uc4.streamprocessing;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
......
package theodolite.uc2.workloadgenerator; package theodolite.uc4.workloadgenerator;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
......
package theodolite.uc4.workloadgenerator; package theodolite.uc4.workloadgenerator;
import java.util.Objects;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.KeySpace;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.SensorRegistry;
/** /**
* Load generator for Theodolite use case UC4. * Load generator for Theodolite use case UC4.
*/ */
public final class LoadGenerator { public final class LoadGenerator {
private static final int SLEEP_PERIOD = 30_000;
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private LoadGenerator() { private LoadGenerator() {}
throw new UnsupportedOperationException();
}
/**
* Start load generator.
*/
public static void main(final String[] args) { public static void main(final String[] args) {
final boolean sendRegistry = Boolean.parseBoolean(Objects.requireNonNullElse(
System.getenv("SEND_REGISTRY"),
"true"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(
System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
final int numSensors = Integer.parseInt(Objects.requireNonNullElse(
System.getenv("NUM_SENSORS"),
"1"));
final int numNestedGroups = Integer.parseInt(Objects.requireNonNullElse(
System.getenv("NUM_NESTED_GROUPS"),
"1"));
// Build sensor hierarchy
final SensorRegistry sensorRegistry =
new SensorRegistryBuilder(numNestedGroups, numSensors).build();
LOGGER.info("Start workload generator for use case UC4"); LOGGER.info("Start workload generator for use case UC4");
theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment().run(); theodolite.commons.workloadgeneration.LoadGenerator.fromEnvironment()
.withKeySpace(new KeySpace("s_", sensorRegistry.getMachineSensors().size()))
.withBeforeAction(() -> {
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
LOGGER.info("Configuration sent.");
LOGGER.info("Now wait 30 seconds...");
try {
Thread.sleep(SLEEP_PERIOD);
} catch (final InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("...and start generating load.");
}
})
.run();
} }
} }
package theodolite.uc2.workloadgenerator; package theodolite.uc4.workloadgenerator;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry; import titan.ccp.model.sensorregistry.MutableSensorRegistry;
......
package theodolite.uc2.workloadgenerator; package theodolite.uc4.workloadgenerator;
import java.util.Collection; import java.util.Collection;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment