Skip to content
Snippets Groups Projects
Commit 89206227 authored by Sören Henning's avatar Sören Henning
Browse files

Unify logging in UC2

parent a736074e
No related branches found
No related tags found
1 merge request!308Unify logging among all streaming engines
Pipeline #10187 canceled
...@@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { ...@@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
.map(t -> { .map(t -> {
final String key = t.f0; final String key = t.f0;
final String value = t.f1.toString(); final String value = t.f1.toString();
LOGGER.info("{}: {}", key, value); // TODO align implementations // LOGGER.info("{}: {}", key, value);
return new Tuple2<>(key, value); return new Tuple2<>(key, value);
}).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING)) }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
... ...
......
...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; ...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
...@@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
this.extendUc2Topology(kafkaSource); this.extendUc2Topology(kafkaSource);
// Add Sink1: Logger // Add Sink1: Logger
uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations // uc2TopologyProduct.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark // Add Sink2: Write back to kafka for the final benchmark
uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
... ...
......
...@@ -54,10 +54,6 @@ public class TopologyBuilder { ...@@ -54,10 +54,6 @@ public class TopologyBuilder {
this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey() .groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration)) .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
// .aggregate(
// () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
// Materialized.with(Serdes.String(), Serdes.Double()))
.aggregate( .aggregate(
() -> Stats.of(), () -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
...@@ -66,7 +62,7 @@ public class TopologyBuilder { ...@@ -66,7 +62,7 @@ public class TopologyBuilder {
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream() .toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString())) .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> LOGGER.info(k + ": " + v)) // .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(properties); return this.builder.build(properties);
... ...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment