Skip to content
Snippets Groups Projects
Commit 917a3f65 authored by Sören Henning's avatar Sören Henning
Browse files

Unify logging for UC3

parent 1bbbf38e
No related branches found
No related tags found
1 merge request!308Unify logging among all streaming engines
Pipeline #10189 canceled
...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; ...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
...@@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
this.extendUc3Topology(kafkaSource); this.extendUc3Topology(kafkaSource);
// Add Sink1: Logger // Add Sink1: Logger
uc3Product.writeTo(Sinks.logger()); // uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark // Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka( uc3Product.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
......
...@@ -81,9 +81,7 @@ public class TopologyBuilder { ...@@ -81,9 +81,7 @@ public class TopologyBuilder {
.map((key, stats) -> KeyValue.pair( .map((key, stats) -> KeyValue.pair(
keyFactory.getSensorId(key.key()), keyFactory.getSensorId(key.key()),
stats.toString())) stats.toString()))
// TODO // .peek((k, v) -> LOGGER.info("{}: {}", k, v))
// statsRecordFactory.create(key, value)))
// .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
.to( .to(
this.outputTopic, this.outputTopic,
Produced.with( Produced.with(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment