diff --git a/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..706cf79022b2485b349bfe7ae144145dda013d20 --- /dev/null +++ b/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java @@ -0,0 +1,92 @@ +package uc1.streamprocessing; + +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import titan.ccp.common.kafka.streams.PropertiesBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class KafkaStreamsBuilder { + + private static final String APPLICATION_NAME = "titan-ccp-history"; + private static final String APPLICATION_VERSION = "0.0.1"; + + // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); + + private String bootstrapServers; // NOPMD + private String inputTopic; // NOPMD + private int numThreads = -1; // NOPMD + private int commitIntervalMs = -1; // NOPMD + private int cacheMaxBytesBuff = -1; // NOPMD + + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + return this; + } + + /** + * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus + * one for using the default. + */ + public KafkaStreamsBuilder numThreads(final int numThreads) { + if (numThreads < -1 || numThreads == 0) { + throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); + } + this.numThreads = numThreads; + return this; + } + + /** + * Sets the Kafka Streams property for the frequency with which to save the position (offsets in + * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for + * example, when processing bulks of records. Can be minus one for using the default. + */ + public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { + if (commitIntervalMs < -1) { + throw new IllegalArgumentException("Commit interval must be greater or equal -1."); + } + this.commitIntervalMs = commitIntervalMs; + return this; + } + + /** + * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches + * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for + * example, when processing bulks of records. Can be minus one for using the default. + */ + public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { + if (cacheMaxBytesBuffering < -1) { + throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); + } + this.cacheMaxBytesBuff = cacheMaxBytesBuffering; + return this; + } + + /** + * Builds the {@link KafkaStreams} instance. + */ + public KafkaStreams build() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + // TODO log parameters + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic); + final Properties properties = PropertiesBuilder + .bootstrapServers(this.bootstrapServers) + .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter + .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) + .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) + .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) + .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") + .build(); + return new KafkaStreams(topologyBuilder.build(), properties); + } + +} diff --git a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..140c592f4e33334fbced6a80b82173c00d19eb25 --- /dev/null +++ b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java @@ -0,0 +1,45 @@ +package uc1.streamprocessing; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.models.records.ActivePowerRecordFactory; + +/** + * Builds Kafka Stream Topology for the History microservice. + */ +public class TopologyBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + + private final String inputTopic; + + private final StreamsBuilder builder = new StreamsBuilder(); + + /** + * Create a new {@link TopologyBuilder} using the given topics. + */ + public TopologyBuilder(final String inputTopic) { + this.inputTopic = inputTopic; + } + + /** + * Build the {@link Topology} for the History microservice. + */ + public Topology build() { + + this.builder + .stream(this.inputTopic, Consumed.with( + Serdes.String(), + IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + .mapValues(value -> value.getValueInW()) + .foreach((key, measurement) -> LOGGER + .info("Key: " + key + " Value: " + measurement)); + + return this.builder.build(); + } +}