From 892062271d6706052fd982d088234fd44b33263e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=B6ren=20Henning?= <soeren.henning@email.uni-kiel.de>
Date: Sat, 26 Nov 2022 19:35:49 +0100
Subject: [PATCH] Unify logging in UC2

---
 .../benchmarks/uc2/flink/HistoryServiceFlinkJob.java        | 2 +-
 .../benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java     | 3 +--
 .../theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java | 6 +-----
 3 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
index 3ef6ddbd5..e44eeaea9 100644
--- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
+++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java
@@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
         .map(t -> {
           final String key = t.f0;
           final String value = t.f1.toString();
-          LOGGER.info("{}: {}", key, value); // TODO align implementations
+          // LOGGER.info("{}: {}", key, value);
           return new Tuple2<>(key, value);
         }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
         .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
index 209234d4b..83a1a5197 100644
--- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
+++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java
@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
 import com.hazelcast.jet.kafka.KafkaSinks;
 import com.hazelcast.jet.kafka.KafkaSources;
 import com.hazelcast.jet.pipeline.Pipeline;
-import com.hazelcast.jet.pipeline.Sinks;
 import com.hazelcast.jet.pipeline.StreamSource;
 import com.hazelcast.jet.pipeline.StreamStage;
 import com.hazelcast.jet.pipeline.WindowDefinition;
@@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
         this.extendUc2Topology(kafkaSource);
 
     // Add Sink1: Logger
-    uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations
+    // uc2TopologyProduct.writeTo(Sinks.logger());
     // Add Sink2: Write back to kafka for the final benchmark
     uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
         this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java
index cd1d8cd92..d21aee7c0 100644
--- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java
+++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java
@@ -54,10 +54,6 @@ public class TopologyBuilder {
                 this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
         .groupByKey()
         .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
-        // .aggregate(
-        // () -> 0.0,
-        // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
-        // Materialized.with(Serdes.String(), Serdes.Double()))
         .aggregate(
             () -> Stats.of(),
             (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
@@ -66,7 +62,7 @@ public class TopologyBuilder {
                 GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
         .toStream()
         .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
-        .peek((k, v) -> LOGGER.info(k + ": " + v))
+        // .peek((k, v) -> LOGGER.info(k + ": " + v))
         .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
     return this.builder.build(properties);
-- 
GitLab