diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java index d5dae2bde75856b44b73ee11bc2808de644a27f0..1bf2bb4ce72b7dece96b036daaaaf475180c72f8 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java @@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; @@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory { this.extendUc3Topology(kafkaSource); // Add Sink1: Logger - uc3Product.writeTo(Sinks.logger()); + // uc3Product.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka for the final benchmark uc3Product.writeTo(KafkaSinks.<String, String>kafka( this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java index d6e000d815b0871e065af4a71d89d0e19949e73c..5c946cc287e97503c426b932e01b447ce82ac854 100644 --- a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java @@ -81,9 +81,7 @@ public class TopologyBuilder { .map((key, stats) -> KeyValue.pair( keyFactory.getSensorId(key.key()), stats.toString())) - // TODO - // statsRecordFactory.create(key, value))) - // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging + // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) .to( this.outputTopic, Produced.with(