Skip to content
Snippets Groups Projects
Commit 8bd84de4 authored by Sören Henning's avatar Sören Henning
Browse files

Remove need for schema registry

parent cfd501ab
No related branches found
No related tags found
No related merge requests found
Pipeline #374 passed
...@@ -36,7 +36,6 @@ public class HistoryService { ...@@ -36,7 +36,6 @@ public class HistoryService {
private void createKafkaStreamsApplication() { private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.schemaRegistry)
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
......
...@@ -22,7 +22,6 @@ public class KafkaStreamsBuilder { ...@@ -22,7 +22,6 @@ public class KafkaStreamsBuilder {
private String inputTopic; // NOPMD private String inputTopic; // NOPMD
private String outputTopic; // NOPMD private String outputTopic; // NOPMD
private Duration windowDuration; // NOPMD private Duration windowDuration; // NOPMD
private String schemaRegistryUrl; // NOPMD
private int numThreads = -1; // NOPMD private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD
...@@ -37,11 +36,6 @@ public class KafkaStreamsBuilder { ...@@ -37,11 +36,6 @@ public class KafkaStreamsBuilder {
return this; return this;
} }
public KafkaStreamsBuilder schemaRegistry(final String url) {
this.schemaRegistryUrl = url;
return this;
}
public KafkaStreamsBuilder outputTopic(final String outputTopic) { public KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
return this; return this;
...@@ -92,7 +86,7 @@ public class KafkaStreamsBuilder { ...@@ -92,7 +86,7 @@ public class KafkaStreamsBuilder {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
// TODO log parameters // TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration, this.schemaRegistryUrl); this.windowDuration);
final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
......
package uc4.streamprocessing;
import com.google.common.math.Stats;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;
import titan.ccp.model.records.WindowedActivePowerRecord;
final class Serdes {
private final SchemaRegistryAvroSerdeFactory avroSerdeFactory;
public Serdes(final String schemaRegistryUrl) {
this.avroSerdeFactory = new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl);
}
public Serde<String> string() {
return org.apache.kafka.common.serialization.Serdes.String();
}
public Serde<WindowedActivePowerRecord> windowedActivePowerValues() {
return this.avroSerdeFactory.forKeys();
}
public Serde<ActivePowerRecord> activePowerRecordValues() {
return this.avroSerdeFactory.forValues();
}
public Serde<AggregatedActivePowerRecord> aggregatedActivePowerRecordValues() {
return this.avroSerdeFactory.forValues();
}
public <T extends SpecificRecord> Serde<T> avroValues() {
return this.avroSerdeFactory.forValues();
}
public Serde<Stats> stats() {
return GenericSerde.from(Stats::toByteArray, Stats::fromByteArray);
}
}
...@@ -6,6 +6,7 @@ import java.time.Instant; ...@@ -6,6 +6,7 @@ import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
...@@ -16,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced; ...@@ -16,8 +17,8 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.GenericSerde;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.HourOfDayActivePowerRecord; import titan.ccp.model.records.HourOfDayActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.models.records.ActivePowerRecordFactory;
import uc4.streamprocessing.util.StatsFactory; import uc4.streamprocessing.util.StatsFactory;
...@@ -34,7 +35,6 @@ public class TopologyBuilder { ...@@ -34,7 +35,6 @@ public class TopologyBuilder {
private final String inputTopic; private final String inputTopic;
private final String outputTopic; private final String outputTopic;
private final Serdes serdes;
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
...@@ -42,10 +42,9 @@ public class TopologyBuilder { ...@@ -42,10 +42,9 @@ public class TopologyBuilder {
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*/ */
public TopologyBuilder(final String inputTopic, final String outputTopic, public TopologyBuilder(final String inputTopic, final String outputTopic,
final Duration duration, final String schemaRegistryUrl) { final Duration duration) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.outputTopic = outputTopic; this.outputTopic = outputTopic;
this.serdes = new Serdes(schemaRegistryUrl);
} }
/** /**
...@@ -59,27 +58,28 @@ public class TopologyBuilder { ...@@ -59,27 +58,28 @@ public class TopologyBuilder {
new HourOfDayRecordFactory(); new HourOfDayRecordFactory();
final TimeWindows timeWindows = final TimeWindows timeWindows =
TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1)); TimeWindows.of(Duration.ofDays(30)).advanceBy(Duration.ofDays(1));
final String statsTopic = "output";
this.builder this.builder
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(this.serdes.string(), Consumed.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.mapValues(kieker -> new ActivePowerRecord( // .mapValues(kieker -> new ActivePowerRecord(
kieker.getIdentifier(), // kieker.getIdentifier(),
kieker.getTimestamp(), // kieker.getTimestamp(),
kieker.getValueInW())) // kieker.getValueInW()))
.selectKey((key, value) -> { .selectKey((key, value) -> {
final Instant instant = Instant.ofEpochMilli(value.getTimestamp()); final Instant instant = Instant.ofEpochMilli(value.getTimestamp());
final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone); final LocalDateTime dateTime = LocalDateTime.ofInstant(instant, this.zone);
return keyFactory.createKey(value.getIdentifier(), dateTime); return keyFactory.createKey(value.getIdentifier(), dateTime);
}) })
.groupByKey(Grouped.with(keySerde, this.serdes.activePowerRecordValues())) .groupByKey(
Grouped.with(keySerde, IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.windowedBy(timeWindows) .windowedBy(timeWindows)
.aggregate( .aggregate(
() -> Stats.of(), () -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
Materialized.with(keySerde, this.serdes.stats())) Materialized.with(keySerde,
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream() .toStream()
.map((key, stats) -> KeyValue.pair( .map((key, stats) -> KeyValue.pair(
keyFactory.getSensorId(key.key()), keyFactory.getSensorId(key.key()),
...@@ -88,10 +88,10 @@ public class TopologyBuilder { ...@@ -88,10 +88,10 @@ public class TopologyBuilder {
// statsRecordFactory.create(key, value))) // statsRecordFactory.create(key, value)))
// .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
.to( .to(
statsTopic, this.outputTopic,
Produced.with( Produced.with(
this.serdes.string(), Serdes.String(),
this.serdes.string())); Serdes.String()));
// this.serdes.avroValues())); // this.serdes.avroValues()));
return this.builder.build(); return this.builder.build();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment