Skip to content
Snippets Groups Projects
Commit cfd501ab authored by Sören Henning's avatar Sören Henning
Browse files

Clean up and fix output

parent a51be526
Branches
Tags
No related merge requests found
Pipeline #373 passed
...@@ -9,8 +9,8 @@ import titan.ccp.common.configuration.Configurations; ...@@ -9,8 +9,8 @@ import titan.ccp.common.configuration.Configurations;
import uc3.streamprocessing.KafkaStreamsBuilder; import uc3.streamprocessing.KafkaStreamsBuilder;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates * A microservice that manages the history and, therefore, stores and aggregates incoming
* incoming measurements. * measurements.
* *
*/ */
public class HistoryService { public class HistoryService {
...@@ -18,14 +18,11 @@ public class HistoryService { ...@@ -18,14 +18,11 @@ public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = Configurations.create();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
final int KAFKA_WINDOW_DURATION_MINUTES = Integer private final int windowDurationMinutes = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60")); .parseInt(Objects.requireNonNullElse(System.getenv("KAFKA_WINDOW_DURATION_MINUTES"), "60"));
/** /**
* Start the service. * Start the service.
*
* @return {@link CompletableFuture} which is completed when the service is
* successfully started.
*/ */
public void run() { public void run() {
this.createKafkaStreamsApplication(); this.createKafkaStreamsApplication();
...@@ -40,10 +37,11 @@ public class HistoryService { ...@@ -40,10 +37,11 @@ public class HistoryService {
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.windowDuration(Duration.ofMinutes(this.KAFKA_WINDOW_DURATION_MINUTES)) .windowDuration(Duration.ofMinutes(this.windowDurationMinutes))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start(); kafkaStreams.start();
} }
... ...
......
...@@ -48,7 +48,8 @@ public class TopologyBuilder { ...@@ -48,7 +48,8 @@ public class TopologyBuilder {
.stream(this.inputTopic, .stream(this.inputTopic,
Consumed.with(Serdes.String(), Consumed.with(Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.groupByKey().windowedBy(TimeWindows.of(this.duration)) .groupByKey()
.windowedBy(TimeWindows.of(this.duration))
// .aggregate( // .aggregate(
// () -> 0.0, // () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
...@@ -61,7 +62,7 @@ public class TopologyBuilder { ...@@ -61,7 +62,7 @@ public class TopologyBuilder {
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream() .toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString())) .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> System.out.printf("key %s, value %f \n", k, v)) .peek((k, v) -> System.out.println(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(); return this.builder.build();
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment