diff --git a/uc1-application/src/main/java/uc1/application/HistoryService.java b/uc1-application/src/main/java/uc1/application/HistoryService.java index 23b1e4b84877221ae80b5c406fffe07c0cabb90c..2c7504ad44cf19a513302f222b53ec69d572c54a 100644 --- a/uc1-application/src/main/java/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/uc1/application/HistoryService.java @@ -7,44 +7,42 @@ import titan.ccp.common.configuration.Configurations; import uc1.streamprocessing.KafkaStreamsBuilder; /** - * A microservice that manages the history and, therefore, stores and aggregates - * incoming measurements. + * A microservice that manages the history and, therefore, stores and aggregates incoming + * measurements. * */ public class HistoryService { - private final Configuration config = Configurations.create(); - - private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - - /** - * Start the service. - * - * @return {@link CompletableFuture} which is completed when the service is - * successfully started. - */ - public void run() { - this.createKafkaStreamsApplication(); - } - - /** - * Build and start the underlying Kafka Streams application of the service. - * - */ - private void createKafkaStreamsApplication() { - - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); - this.stopEvent.thenRun(kafkaStreams::close); - kafkaStreams.start(); - } - - public static void main(final String[] args) { - new HistoryService().run(); - } + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + /** + * Build and start the underlying Kafka Streams application of the service. + * + */ + private void createKafkaStreamsApplication() { + + final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + public static void main(final String[] args) { + new HistoryService().run(); + } } diff --git a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java index 0150045acbb4d85bfb8ea40e786cfe41f35f33f5..1f112858a2153cfb0130379abe763c393520c271 100644 --- a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java @@ -15,31 +15,33 @@ import titan.ccp.models.records.ActivePowerRecordFactory; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - - private final String inputTopic; - private final Gson gson; - - private final StreamsBuilder builder = new StreamsBuilder(); - - /** - * Create a new {@link TopologyBuilder} using the given topics. - */ - public TopologyBuilder(final String inputTopic) { - this.inputTopic = inputTopic; - this.gson = new Gson(); - } - - /** - * Build the {@link Topology} for the History microservice. - */ - public Topology build() { - - this.builder - .stream(this.inputTopic, - Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) - .mapValues(v -> this.gson.toJson(v)).foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); - - return this.builder.build(); - } + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + + private final String inputTopic; + private final Gson gson; + + private final StreamsBuilder builder = new StreamsBuilder(); + + /** + * Create a new {@link TopologyBuilder} using the given topics. + */ + public TopologyBuilder(final String inputTopic) { + this.inputTopic = inputTopic; + this.gson = new Gson(); + } + + /** + * Build the {@link Topology} for the History microservice. + */ + public Topology build() { + + this.builder + .stream(this.inputTopic, Consumed.with( + Serdes.String(), + IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + .mapValues(v -> this.gson.toJson(v)) + .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); + + return this.builder.build(); + } }