Skip to content
Snippets Groups Projects
Commit ca47c503 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'unify-logging' into 'main'

Unify logging among all streaming engines

Closes #406

See merge request !308
parents cbf172a2 edd0fc63
No related branches found
No related tags found
1 merge request!308Unify logging among all streaming engines
Pipeline #10202 passed
Showing with 8 additions and 20 deletions
......@@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
.map(t -> {
final String key = t.f0;
final String value = t.f1.toString();
LOGGER.info("{}: {}", key, value); // TODO align implementations
// LOGGER.info("{}: {}", key, value);
return new Tuple2<>(key, value);
}).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
......
......@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
......@@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
this.extendUc2Topology(kafkaSource);
// Add Sink1: Logger
uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations
// uc2TopologyProduct.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
......
......@@ -11,8 +11,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
import rocks.theodolite.benchmarks.commons.kstreams.GenericSerde;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
......@@ -23,8 +21,6 @@ import rocks.theodolite.benchmarks.uc2.kstreams.util.StatsFactory;
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final String outputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
......@@ -54,10 +50,6 @@ public class TopologyBuilder {
this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
// .aggregate(
// () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
// Materialized.with(Serdes.String(), Serdes.Double()))
.aggregate(
() -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
......@@ -66,7 +58,7 @@ public class TopologyBuilder {
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> LOGGER.info(k + ": " + v))
// .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(properties);
......
......@@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
......@@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
return keyFactory.createKey(record.getIdentifier(), dateTime);
})
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.trigger(ContinuousEventTimeTrigger.of(triggerDuration))
.trigger(ContinuousProcessingTimeTrigger.of(triggerDuration))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(tuple -> {
final String sensorId = keyFactory.getSensorId(tuple.f0);
......
......@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
......@@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
this.extendUc3Topology(kafkaSource);
// Add Sink1: Logger
uc3Product.writeTo(Sinks.logger());
// uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
......
......@@ -81,9 +81,7 @@ public class TopologyBuilder {
.map((key, stats) -> KeyValue.pair(
keyFactory.getSensorId(key.key()),
stats.toString()))
// TODO
// statsRecordFactory.create(key, value)))
// .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
// .peek((k, v) -> LOGGER.info("{}: {}", k, v))
.to(
this.outputTopic,
Produced.with(
......
......@@ -109,7 +109,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
this.kafkaWritePropsForPipeline, this.kafkaFeedbackTopic));
// Log aggregation product
uc4Aggregation.writeTo(Sinks.logger());
// uc4Aggregation.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka output topic
uc4Aggregation.writeTo(KafkaSinks.kafka(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment