Skip to content
Snippets Groups Projects
Commit c7bc7c30 authored by Sören Henning's avatar Sören Henning
Browse files

Clean up

parent d37ec932
No related branches found
No related tags found
No related merge requests found
Pipeline #379 canceled
......@@ -7,44 +7,42 @@ import titan.ccp.common.configuration.Configurations;
import uc1.streamprocessing.KafkaStreamsBuilder;
/**
* A microservice that manages the history and, therefore, stores and aggregates
* incoming measurements.
* A microservice that manages the history and, therefore, stores and aggregates incoming
* measurements.
*
*/
public class HistoryService {
private final Configuration config = Configurations.create();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
/**
* Start the service.
*
* @return {@link CompletableFuture} which is completed when the service is
* successfully started.
*/
public void run() {
this.createKafkaStreamsApplication();
}
/**
* Build and start the underlying Kafka Streams application of the service.
*
*/
private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build();
this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start();
}
public static void main(final String[] args) {
new HistoryService().run();
}
private final Configuration config = Configurations.create();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
/**
* Start the service.
*/
public void run() {
this.createKafkaStreamsApplication();
}
/**
* Build and start the underlying Kafka Streams application of the service.
*
*/
private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.build();
this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start();
}
public static void main(final String[] args) {
new HistoryService().run();
}
}
......@@ -15,31 +15,33 @@ import titan.ccp.models.records.ActivePowerRecordFactory;
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final Gson gson;
private final StreamsBuilder builder = new StreamsBuilder();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic) {
this.inputTopic = inputTopic;
this.gson = new Gson();
}
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.mapValues(v -> this.gson.toJson(v)).foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
return this.builder.build();
}
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final Gson gson;
private final StreamsBuilder builder = new StreamsBuilder();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic) {
this.inputTopic = inputTopic;
this.gson = new Gson();
}
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
this.builder
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
return this.builder.build();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment