Commit abe9abcf authored by Benedikt Wetzel

add src folder

parent 6ea94977
1 merge request: !1 Add Implementations of Use Cases
package uc3.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
private ConfigurationKeys() {
}
}
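/*
 * A minimal sketch (not part of this commit) of the kind of values the keys above are
 * expected to resolve to. The concrete values below are assumptions for illustration
 * only; in the service itself they are obtained via titan.ccp's Configurations.create()
 * (see HistoryService).
 */
package uc3.application;

import java.util.Properties;

public final class ExampleConfiguration {

  public static void main(final String[] args) {
    final Properties defaults = new Properties();
    // Hypothetical example values for the keys defined in ConfigurationKeys.
    defaults.setProperty(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS, "localhost:9092");
    defaults.setProperty(ConfigurationKeys.KAFKA_INPUT_TOPIC, "input");
    defaults.setProperty(ConfigurationKeys.KAFKA_OUTPUT_TOPIC, "output");
    defaults.setProperty(ConfigurationKeys.NUM_THREADS, "1");
    defaults.setProperty(ConfigurationKeys.COMMIT_INTERVAL_MS, "100");
    defaults.setProperty(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING, "-1");
    defaults.setProperty(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES, "1");
    defaults.list(System.out);
  }
}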
package uc3.application;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import titan.ccp.common.configuration.Configurations;
import uc3.streamprocessing.KafkaStreamsBuilder;
/**
* A microservice that manages the history and, therefore, stores and aggregates
* incoming measurements.
*
*/
public class HistoryService {
private final Configuration config = Configurations.create();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
/**
 * Start the service.
 */
public void run() {
this.createKafkaStreamsApplication();
}
/**
* Build and start the underlying Kafka Streams application of the service.
*
*/
private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build();
this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start();
}
public static void main(final String[] args) {
new HistoryService().run();
}
}
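/*
 * A minimal sketch (an assumption, not part of this commit) of how the stopEvent future
 * used above can be completed in order to shut the Kafka Streams application down:
 * everything chained via thenRun runs once the future completes, so completing it from
 * a JVM shutdown hook would trigger the registered kafkaStreams::close.
 */
package uc3.application;

import java.util.concurrent.CompletableFuture;

public final class ShutdownExample {

  public static void main(final String[] args) {
    final CompletableFuture<Void> stopEvent = new CompletableFuture<>();

    // Stand-in for the kafkaStreams::close registered in createKafkaStreamsApplication().
    stopEvent.thenRun(() -> System.out.println("closing Kafka Streams ..."));

    // Completing the future, e.g. from a shutdown hook, runs the registered action.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> stopEvent.complete(null)));
  }
}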
package uc3.streamprocessing;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
private static final String APPLICATION_NAME = "titan-ccp-history";
private static final String APPLICATION_VERSION = "0.0.1";
// private static final Logger LOGGER =
// LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private String outputTopic; // NOPMD
private Duration windowDuration; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
public KafkaStreamsBuilder outputTopic(final String outputTopic) {
this.outputTopic = outputTopic;
return this;
}
public KafkaStreamsBuilder windowDuration(final Duration windowDuration) {
this.windowDuration = windowDuration;
return this;
}
/**
 * Sets the Kafka Streams property for the number of threads
 * (num.stream.threads). Can be -1 to use the default.
 */
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
 * Sets the Kafka Streams property for the frequency with which to save the
 * position (offsets in source topics) of tasks (commit.interval.ms). Must be
 * zero for processing all records, for example, when processing bulks of
 * records. Can be -1 to use the default.
 */
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
 * Sets the Kafka Streams property for the maximum number of memory bytes to be
 * used for record caches across all threads (cache.max.bytes.buffering). Must
 * be zero for processing all records, for example, when processing bulks of
 * records. Can be -1 to use the default.
 */
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.outputTopic, "Output topic has not been set.");
Objects.requireNonNull(this.windowDuration, "Window duration has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic,
this.windowDuration);
final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
.set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG").build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
}
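/*
 * A minimal sketch of the conditional set(key, value, predicate) semantics used in
 * build() above: a property is only written when the value passes the predicate, so the
 * builder's -1 "unset" marker falls through to the Kafka Streams default. The real
 * PropertiesBuilder comes from titan.ccp.common.kafka.streams; this standalone helper is
 * an illustrative assumption, not its actual implementation.
 */
package uc3.streamprocessing;

import java.util.Properties;
import java.util.function.IntPredicate;

public final class ConditionalSetExample {

  private ConditionalSetExample() {
  }

  // Writes key=value only if the predicate accepts the value.
  static void setIf(final Properties properties, final String key, final int value,
      final IntPredicate condition) {
    if (condition.test(value)) {
      properties.put(key, String.valueOf(value));
    }
  }

  public static void main(final String[] args) {
    final Properties properties = new Properties();
    setIf(properties, "num.stream.threads", -1, p -> p > 0); // skipped: default applies
    setIf(properties, "commit.interval.ms", 100, p -> p >= 0); // written
    setIf(properties, "cache.max.bytes.buffering", -1, p -> p >= 0); // skipped
    System.out.println(properties); // prints {commit.interval.ms=100}
  }
}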
package uc3.streamprocessing;
import com.google.gson.Gson;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.ActivePowerRecordFactory;
/**
* Builds Kafka Stream Topology for the History microservice.
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final String outputTopic;
private final Duration duration;
private final Gson gson;
private final StreamsBuilder builder = new StreamsBuilder();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic, final String outputTopic, final Duration duration) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.duration = duration;
this.gson = new Gson();
}
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.groupByKey().windowedBy(TimeWindows.of(this.duration))
.aggregate(() -> 0.0, (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
Materialized.with(Serdes.String(), Serdes.Double()))
.toStream()
.peek((k, v) -> LOGGER.info("key: {}, value: {}", k, v))
.to(this.outputTopic);
return this.builder.build();
}
}
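/*
 * A minimal, self-contained sketch of the same windowed aggregation as above, but with
 * plain Double values instead of titan.ccp's ActivePowerRecord, so it relies only on
 * standard Kafka Streams serdes. The topic names and the one-minute window are
 * assumptions for illustration; building and describing the topology needs no broker.
 */
package uc3.streamprocessing;

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public final class SimpleWindowedSumExample {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("example-input", Consumed.with(Serdes.String(), Serdes.Double()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        // Sum all values per key that fall into the same one-minute window.
        .aggregate(() -> 0.0, (key, value, aggregate) -> aggregate + value,
            Materialized.with(Serdes.String(), Serdes.Double()))
        // Drop the window part of the key so standard String/Double serdes suffice.
        .toStream((windowedKey, value) -> windowedKey.key())
        .to("example-output", Produced.with(Serdes.String(), Serdes.Double()));

    final Topology topology = builder.build();
    System.out.println(topology.describe());
  }
}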