Skip to content
Snippets Groups Projects
Commit b78c1326 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Use avro records instead of kieker in uc4-application

Replace the kieker records with the avro records in uc4 application.
parent b0218c46
No related branches found
No related tags found
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
This commit is part of merge request !28. Comments created here will be created in the context of that merge request.
...@@ -6,7 +6,7 @@ import org.apache.commons.configuration2.Configuration; ...@@ -6,7 +6,7 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -15,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -15,7 +15,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class HistoryService { public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
...@@ -44,6 +44,7 @@ public class HistoryService { ...@@ -44,6 +44,7 @@ public class HistoryService {
// Configuration of the stream application // Configuration of the stream application
final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
...@@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; ...@@ -17,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import theodolite.uc4.streamprocessing.util.StatsFactory; import theodolite.uc4.streamprocessing.util.StatsFactory;
import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
...@@ -32,6 +32,7 @@ public class TopologyBuilder { ...@@ -32,6 +32,7 @@ public class TopologyBuilder {
private final String inputTopic; private final String inputTopic;
private final String outputTopic; private final String outputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Duration aggregtionDuration; private final Duration aggregtionDuration;
private final Duration aggregationAdvance; private final Duration aggregationAdvance;
...@@ -41,9 +42,11 @@ public class TopologyBuilder { ...@@ -41,9 +42,11 @@ public class TopologyBuilder {
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*/ */
public TopologyBuilder(final String inputTopic, final String outputTopic, public TopologyBuilder(final String inputTopic, final String outputTopic,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
final Duration aggregtionDuration, final Duration aggregationAdvance) { final Duration aggregtionDuration, final Duration aggregationAdvance) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory;
this.aggregtionDuration = aggregtionDuration; this.aggregtionDuration = aggregtionDuration;
this.aggregationAdvance = aggregationAdvance; this.aggregationAdvance = aggregationAdvance;
} }
...@@ -58,14 +61,14 @@ public class TopologyBuilder { ...@@ -58,14 +61,14 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.selectKey((key, value) -> { .selectKey((key, value) -> {
final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return keyFactory.createKey(value.getIdentifier(), dateTime); return keyFactory.createKey(value.getIdentifier(), dateTime);
}) })
.groupByKey( .groupByKey(
Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) Grouped.with(keySerde, this.srAvroSerdeFactory.forValues()))
.windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance)) .windowedBy(TimeWindows.of(this.aggregtionDuration).advanceBy(this.aggregationAdvance))
.aggregate( .aggregate(
() -> Stats.of(), () -> Stats.of(),
......
...@@ -4,6 +4,7 @@ import java.time.Duration; ...@@ -4,6 +4,7 @@ import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Builder for the Kafka Streams configuration. * Builder for the Kafka Streams configuration.
...@@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -45,6 +46,7 @@ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder {
final TopologyBuilder topologyBuilder = new TopologyBuilder( final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic, this.inputTopic,
this.outputTopic, this.outputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl),
this.aggregtionDuration, this.aggregtionDuration,
this.aggregationAdvance); this.aggregationAdvance);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment