Skip to content
Snippets Groups Projects
Commit cb59dae2 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Use avro records instead of kieker in uc2-application

Replace the kieker records with the avro records in uc2 application.
parent 3c289fab
No related branches found
No related tags found
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
Showing with 34 additions and 29 deletions
......@@ -5,7 +5,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
* A microservice that manages the history and, therefore, stores and aggregates incoming
......@@ -14,7 +14,7 @@ import titan.ccp.common.configuration.Configurations;
*/
public class AggregationService {
private final Configuration config = Configurations.create();
private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
......@@ -47,6 +47,7 @@ public class AggregationService {
// Configuration of the stream application
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
......@@ -23,6 +23,8 @@ public final class ConfigurationKeys {
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
private ConfigurationKeys() {}
}
......@@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents
......
......@@ -8,7 +8,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.model.records.ActivePowerRecord;
/**
* Factory class configuration required by {@link JointFlatTransformerFactory}.
......
package theodolite.uc2.streamprocessing;
import java.util.Set;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.model.records.ActivePowerRecord;
/**
* A joined pair of an {@link ActivePowerRecord} and its associated parents. Both the record and the
......
package theodolite.uc2.streamprocessing;
import org.apache.kafka.streams.kstream.Windowed;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
/**
* Updates an {@link AggregatedActivePowerRecord} by a new {@link ActivePowerRecord}.
......@@ -19,7 +19,7 @@ public class RecordAggregator {
final double average = count == 0 ? 0.0 : sum / count;
return new AggregatedActivePowerRecord(
identifier.key(), record.getTimestamp(),
0.0, 0.0, count, sum, average);
count, sum, average);
}
/**
......@@ -32,8 +32,7 @@ public class RecordAggregator {
final double average = count == 0 ? 0.0 : sum / count;
return new AggregatedActivePowerRecord(
// TODO timestamp -1 indicates that this record is emitted by an substract event
identifier.key(), -1,
0.0, 0.0, count, sum, average);
identifier.key(), -1L, count, sum, average);
}
}
......@@ -18,40 +18,44 @@ import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
/**
* Builds Kafka Stream Topology for the History microservice.
*/
public class TopologyBuilder {
private static final int LATENCY_OUTPOUT_THRESHOLD = 1000;
// private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final String outputTopic;
private final String configurationTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Duration windowSize;
private final Duration gracePeriod;
private final StreamsBuilder builder = new StreamsBuilder();
private final RecordAggregator recordAggregator = new RecordAggregator();
private StatsAccumulator latencyStats = new StatsAccumulator();
private long lastTime = System.currentTimeMillis();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic, final String outputTopic,
final String configurationTopic, final Duration windowSize, final Duration gracePeriod) {
final String configurationTopic, final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
final Duration windowSize, final Duration gracePeriod) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.configurationTopic = configurationTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory;
this.windowSize = windowSize;
this.gracePeriod = gracePeriod;
}
......@@ -84,11 +88,11 @@ public class TopologyBuilder {
final KStream<String, ActivePowerRecord> values = this.builder
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())));
this.srAvroSerdeFactory.forValues()));
final KStream<String, ActivePowerRecord> aggregationsInput = this.builder
.stream(this.outputTopic, Consumed.with(
Serdes.String(),
IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory())))
this.srAvroSerdeFactory.<AggregatedActivePowerRecord>forValues()))
.mapValues(r -> new ActivePowerRecord(r.getIdentifier(), r.getTimestamp(), r.getSumInW()));
final KTable<String, ActivePowerRecord> inputTable = values
......@@ -96,9 +100,9 @@ public class TopologyBuilder {
.mapValues((k, v) -> new ActivePowerRecord(v.getIdentifier(), System.currentTimeMillis(),
v.getValueInW()))
.groupByKey(Grouped.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
this.srAvroSerdeFactory.forValues()))
.reduce((aggr, value) -> value, Materialized.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())));
this.srAvroSerdeFactory.forValues()));
return inputTable;
}
......@@ -140,13 +144,13 @@ public class TopologyBuilder {
jointFlatMapTransformerFactory.getStoreName())
.groupByKey(Grouped.with(
SensorParentKeySerde.serde(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.windowSize).grace(this.gracePeriod))
.reduce(
// TODO Configurable window aggregation function
(aggValue, newValue) -> newValue,
Materialized.with(SensorParentKeySerde.serde(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())));
this.srAvroSerdeFactory.forValues()));
}
......@@ -159,14 +163,14 @@ public class TopologyBuilder {
new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(),
this.windowSize.toMillis()),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
this.srAvroSerdeFactory.forValues()))
.aggregate(
() -> null, this.recordAggregator::add, this.recordAggregator::substract,
Materialized.with(
new WindowedSerdes.TimeWindowedSerde<>(
Serdes.String(),
this.windowSize.toMillis()),
IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory())))
this.srAvroSerdeFactory.forValues()))
.suppress(Suppressed.untilTimeLimit(this.windowSize, BufferConfig.unbounded()))
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
......@@ -175,16 +179,13 @@ public class TopologyBuilder {
.map((k, v) -> KeyValue.pair(k.key(), v)); // TODO compute Timestamp
}
private StatsAccumulator latencyStats = new StatsAccumulator();
private long lastTime = System.currentTimeMillis();
private void exposeOutputStream(final KStream<String, AggregatedActivePowerRecord> aggregations) {
aggregations
.peek((k, v) -> {
final long time = System.currentTimeMillis();
final long latency = time - v.getTimestamp();
this.latencyStats.add(latency);
if (time - this.lastTime >= 1000) {
if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) {
System.out.println("latency,"
+ time + ','
+ this.latencyStats.mean() + ','
......@@ -205,6 +206,6 @@ public class TopologyBuilder {
})
.to(this.outputTopic, Produced.with(
Serdes.String(),
IMonitoringRecordSerde.serde(new AggregatedActivePowerRecordFactory())));
this.srAvroSerdeFactory.forValues()));
}
}
......@@ -4,6 +4,7 @@ import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/**
* Builder for the Kafka Streams configuration.
......@@ -54,6 +55,7 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build
this.inputTopic,
this.outputTopic,
this.configurationTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl),
this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize,
this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment