Skip to content
Snippets Groups Projects

Unify logging among all streaming engines

Merged Sören Henning requested to merge unify-logging into main
1 file
+ 0
4
Compare changes
  • Side-by-side
  • Inline
@@ -11,8 +11,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
import rocks.theodolite.benchmarks.commons.kstreams.GenericSerde;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
@@ -23,8 +21,6 @@ import rocks.theodolite.benchmarks.uc2.kstreams.util.StatsFactory;
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final String outputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
@@ -54,10 +50,6 @@ public class TopologyBuilder {
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
// .aggregate(
// () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
// Materialized.with(Serdes.String(), Serdes.Double()))
.aggregate(
() -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
@@ -66,7 +58,7 @@ public class TopologyBuilder {
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> LOGGER.info(k + ": " + v))
// .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(properties);
Loading