Skip to content
Snippets Groups Projects
Commit b0218c46 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Use avro records instead of kieker in uc3-application

Replace the kieker records with the avro records in uc3 application.
parent 7a881fcb
No related branches found
No related tags found
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
...@@ -31,6 +31,9 @@ public final class ConfigurationKeys { ...@@ -31,6 +31,9 @@ public final class ConfigurationKeys {
public static final String WINDOW_GRACE_MS = "window.grace.ms"; public static final String WINDOW_GRACE_MS = "window.grace.ms";
// UC3
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
// UC4 // UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
......
...@@ -5,8 +5,12 @@ configuration.kafka.topic=configuration ...@@ -5,8 +5,12 @@ configuration.kafka.topic=configuration
kafka.bootstrap.servers=localhost:9092 kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
schema.registry.url=http://localhost:8091
window.size.ms=1000 window.size.ms=1000
window.grace.ms=0 window.grace.ms=0
num.threads=1 num.threads=1
commit.interval.ms=100 commit.interval.ms=100
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
...@@ -7,7 +7,7 @@ import org.apache.commons.configuration2.Configuration; ...@@ -7,7 +7,7 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder; import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -16,7 +16,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -16,7 +16,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class HistoryService { public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
private final int windowDurationMinutes = Integer private final int windowDurationMinutes = Integer
......
...@@ -14,8 +14,8 @@ import org.slf4j.Logger; ...@@ -14,8 +14,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.uc3.streamprocessing.util.StatsFactory; import theodolite.uc3.streamprocessing.util.StatsFactory;
import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
...@@ -26,6 +26,7 @@ public class TopologyBuilder { ...@@ -26,6 +26,7 @@ public class TopologyBuilder {
private final String inputTopic; private final String inputTopic;
private final String outputTopic; private final String outputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Duration duration; private final Duration duration;
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
...@@ -34,9 +35,11 @@ public class TopologyBuilder { ...@@ -34,9 +35,11 @@ public class TopologyBuilder {
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*/ */
public TopologyBuilder(final String inputTopic, final String outputTopic, public TopologyBuilder(final String inputTopic, final String outputTopic,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory,
final Duration duration) { final Duration duration) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory;
this.duration = duration; this.duration = duration;
} }
...@@ -47,7 +50,7 @@ public class TopologyBuilder { ...@@ -47,7 +50,7 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) this.srAvroSerdeFactory.<ActivePowerRecord>forKeys()))
.groupByKey() .groupByKey()
.windowedBy(TimeWindows.of(this.duration)) .windowedBy(TimeWindows.of(this.duration))
// .aggregate( // .aggregate(
...@@ -62,7 +65,7 @@ public class TopologyBuilder { ...@@ -62,7 +65,7 @@ public class TopologyBuilder {
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream() .toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString())) .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> System.out.println(k + ": " + v)) .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(); return this.builder.build();
......
...@@ -4,6 +4,7 @@ import java.time.Duration; ...@@ -4,6 +4,7 @@ import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Builder for the Kafka Streams configuration. * Builder for the Kafka Streams configuration.
...@@ -36,7 +37,7 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -36,7 +37,7 @@ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder {
Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration); new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl), this.windowDuration);
return topologyBuilder.build(); return topologyBuilder.build();
} }
......
kafka.bootstrap.servers=localhost:9092 kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
kafka.window.duration.minutes=1
schema.registry.url=http://localhost:8091
num.threads=1 num.threads=1
commit.interval.ms=100 commit.interval.ms=100
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment