diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index 3ef6ddbd56632d4e034604ff01aa3f8118233338..e44eeaea9e4d9d6d5ce6408ae337ab15f93baa10 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { .map(t -> { final String key = t.f0; final String value = t.f1.toString(); - LOGGER.info("{}: {}", key, value); // TODO align implementations + // LOGGER.info("{}: {}", key, value); return new Tuple2<>(key, value); }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING)) .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java index 209234d4bd0d3081f8d4f9f92f9db848ccc46e4f..83a1a51972b74702800595857a138ad64e98977b 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc2/hazelcastjet/Uc2PipelineFactory.java @@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; @@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory { this.extendUc2Topology(kafkaSource); // Add Sink1: Logger - uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations + // uc2TopologyProduct.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka for the final benchmark uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); diff --git a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java index cd1d8cd92149d368a27452fa7689f5549a9c2bc7..6aa342f03364fb7f20d404796ba25165caf6a869 100644 --- a/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java +++ b/theodolite-benchmarks/uc2-kstreams/src/main/java/rocks/theodolite/benchmarks/uc2/kstreams/TopologyBuilder.java @@ -11,8 +11,6 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; import rocks.theodolite.benchmarks.commons.kstreams.GenericSerde; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; @@ -23,8 +21,6 @@ import rocks.theodolite.benchmarks.uc2.kstreams.util.StatsFactory; */ public class TopologyBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); - private final String inputTopic; private final String outputTopic; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; @@ -54,10 +50,6 @@ public class TopologyBuilder { this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) .groupByKey() .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration)) - // .aggregate( - // () -> 0.0, - // (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(), - // Materialized.with(Serdes.String(), Serdes.Double())) .aggregate( () -> Stats.of(), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), @@ -66,7 +58,7 @@ public class TopologyBuilder { GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) .toStream() .map((k, s) -> KeyValue.pair(k.key(), s.toString())) - .peek((k, v) -> LOGGER.info(k + ": " + v)) + // .peek((k, v) -> LOGGER.info(k + ": " + v)) .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); return this.builder.build(properties); diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index 9a85a87a85222d5ee17f6f33bc7fa8425d5f1a03..aaaba5b3b4d86ebcdb205cdef16285b6ee47ae4c 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.common.serialization.Serdes; @@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { return keyFactory.createKey(record.getIdentifier(), dateTime); }) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) - .trigger(ContinuousEventTimeTrigger.of(triggerDuration)) + .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration)) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .map(tuple -> { final String sensorId = keyFactory.getSensorId(tuple.f0); diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java index d5dae2bde75856b44b73ee11bc2808de644a27f0..1bf2bb4ce72b7dece96b036daaaaf475180c72f8 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc3/hazelcastjet/Uc3PipelineFactory.java @@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; @@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory { this.extendUc3Topology(kafkaSource); // Add Sink1: Logger - uc3Product.writeTo(Sinks.logger()); + // uc3Product.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka for the final benchmark uc3Product.writeTo(KafkaSinks.<String, String>kafka( this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); diff --git a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java index d6e000d815b0871e065af4a71d89d0e19949e73c..5c946cc287e97503c426b932e01b447ce82ac854 100644 --- a/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java +++ b/theodolite-benchmarks/uc3-kstreams/src/main/java/rocks/theodolite/benchmarks/uc3/kstreams/TopologyBuilder.java @@ -81,9 +81,7 @@ public class TopologyBuilder { .map((key, stats) -> KeyValue.pair( keyFactory.getSensorId(key.key()), stats.toString())) - // TODO - // statsRecordFactory.create(key, value))) - // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging + // .peek((k, v) -> LOGGER.info("{}: {}", k, v)) .to( this.outputTopic, Produced.with( diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java index e0ba823596474f87c79d488a5bbe7e71749f54fb..59b5941fb9f0090074869b00d49ad26c68e40165 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/rocks/theodolite/benchmarks/uc4/hazelcastjet/Uc4PipelineFactory.java @@ -109,7 +109,7 @@ public class Uc4PipelineFactory extends PipelineFactory { this.kafkaWritePropsForPipeline, this.kafkaFeedbackTopic)); // Log aggregation product - uc4Aggregation.writeTo(Sinks.logger()); + // uc4Aggregation.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka output topic uc4Aggregation.writeTo(KafkaSinks.kafka(