diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index 3ef6ddbd56632d4e034604ff01aa3f8118233338..e44eeaea9e4d9d6d5ce6408ae337ab15f93baa10 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { .map(t -> { final String key = t.f0; final String value = t.f1.toString(); - LOGGER.info("{}: {}", key, value); // TODO align implementations + // LOGGER.info("{}: {}", key, value); return new Tuple2<>(key, value); }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING)) .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java index 209234d4bd0d3081f8d4f9f92f9db848ccc46e4f..83a1a51972b74702800595857a138ad64e98977b 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java @@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; @@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory { this.extendUc2Topology(kafkaSource); // Add Sink1: Logger - uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations + // uc2TopologyProduct.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka for the final benchmark uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java index cd1d8cd92149d368a27452fa7689f5549a9c2bc7..d21aee7c035d1b30785b274dfa81fb2b322e63c1 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java @@ -54,10 +54,6 @@ public class TopologyBuilder { this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration)) - // .aggregate( - // () -> 0.0, - // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), - // Materialized.with(Serdes.String(), Serdes.Double())) .aggregate( () -> Stats.of(), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), @@ -66,7 +62,7 @@ public class TopologyBuilder { GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() .map((k, s) -> KeyValue.pair(k.key(), s.toString())) - .peek((k, v) -> LOGGER.info(k + ": " + v)) + // .peek((k, v) -> LOGGER.info(k + ": " + v)) .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); return this.builder.build(properties);