Skip to content
Snippets Groups Projects
Commit 22e0eee4 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'main' into enhanceFlinkConfiguration

parents 8df89bea ca47c503
No related branches found
No related tags found
1 merge request!178Enhance Flink configuration
Pipeline #10211 passed
Showing with 8 additions and 20 deletions
...@@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { ...@@ -64,7 +64,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
.map(t -> { .map(t -> {
final String key = t.f0; final String key = t.f0;
final String value = t.f1.toString(); final String value = t.f1.toString();
LOGGER.info("{}: {}", key, value); // TODO align implementations // LOGGER.info("{}: {}", key, value);
return new Tuple2<>(key, value); return new Tuple2<>(key, value);
}).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING)) }).name("map").returns(Types.TUPLE(Types.STRING, Types.STRING))
.addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic); .addSink(kafkaSink).name("[Kafka Producer] Topic: " + outputTopic);
......
...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet; ...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc2.hazelcastjet;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
...@@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory { ...@@ -62,7 +61,7 @@ public class Uc2PipelineFactory extends PipelineFactory {
this.extendUc2Topology(kafkaSource); this.extendUc2Topology(kafkaSource);
// Add Sink1: Logger // Add Sink1: Logger
uc2TopologyProduct.writeTo(Sinks.logger()); // TODO align implementations // uc2TopologyProduct.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark // Add Sink2: Write back to kafka for the final benchmark
uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
......
...@@ -11,8 +11,6 @@ import org.apache.kafka.streams.kstream.Consumed; ...@@ -11,8 +11,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory;
import rocks.theodolite.benchmarks.commons.kstreams.GenericSerde; import rocks.theodolite.benchmarks.commons.kstreams.GenericSerde;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
...@@ -23,8 +21,6 @@ import rocks.theodolite.benchmarks.uc2.kstreams.util.StatsFactory; ...@@ -23,8 +21,6 @@ import rocks.theodolite.benchmarks.uc2.kstreams.util.StatsFactory;
*/ */
public class TopologyBuilder { public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic; private final String inputTopic;
private final String outputTopic; private final String outputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory; private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
...@@ -54,10 +50,6 @@ public class TopologyBuilder { ...@@ -54,10 +50,6 @@ public class TopologyBuilder {
this.srAvroSerdeFactory.<ActivePowerRecord>forValues())) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.groupByKey() .groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration)) .windowedBy(TimeWindows.ofSizeWithNoGrace(this.duration))
// .aggregate(
// () -> 0.0,
// (key, activePowerRecord, agg) -> agg + activePowerRecord.getValueInW(),
// Materialized.with(Serdes.String(), Serdes.Double()))
.aggregate( .aggregate(
() -> Stats.of(), () -> Stats.of(),
(k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()), (k, record, stats) -> StatsFactory.accumulate(stats, record.getValueInW()),
...@@ -66,7 +58,7 @@ public class TopologyBuilder { ...@@ -66,7 +58,7 @@ public class TopologyBuilder {
GenericSerde.from(Stats::toByteArray, Stats::fromByteArray))) GenericSerde.from(Stats::toByteArray, Stats::fromByteArray)))
.toStream() .toStream()
.map((k, s) -> KeyValue.pair(k.key(), s.toString())) .map((k, s) -> KeyValue.pair(k.key(), s.toString()))
.peek((k, v) -> LOGGER.info(k + ": " + v)) // .peek((k, v) -> LOGGER.info(k + ": " + v))
.to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String())); .to(this.outputTopic, Produced.with(Serdes.String(), Serdes.String()));
return this.builder.build(properties); return this.builder.build(properties);
......
...@@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector; ...@@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
...@@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { ...@@ -83,7 +83,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
return keyFactory.createKey(record.getIdentifier(), dateTime); return keyFactory.createKey(record.getIdentifier(), dateTime);
}) })
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance)) .window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.trigger(ContinuousEventTimeTrigger.of(triggerDuration)) .trigger(ContinuousProcessingTimeTrigger.of(triggerDuration))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction()) .aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(tuple -> { .map(tuple -> {
final String sensorId = keyFactory.getSensorId(tuple.f0); final String sensorId = keyFactory.getSensorId(tuple.f0);
......
...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet; ...@@ -3,7 +3,6 @@ package rocks.theodolite.benchmarks.uc3.hazelcastjet;
import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition; import com.hazelcast.jet.pipeline.WindowDefinition;
...@@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory { ...@@ -77,7 +76,7 @@ public class Uc3PipelineFactory extends PipelineFactory {
this.extendUc3Topology(kafkaSource); this.extendUc3Topology(kafkaSource);
// Add Sink1: Logger // Add Sink1: Logger
uc3Product.writeTo(Sinks.logger()); // uc3Product.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka for the final benchmark // Add Sink2: Write back to kafka for the final benchmark
uc3Product.writeTo(KafkaSinks.<String, String>kafka( uc3Product.writeTo(KafkaSinks.<String, String>kafka(
this.kafkaWritePropsForPipeline, this.kafkaOutputTopic)); this.kafkaWritePropsForPipeline, this.kafkaOutputTopic));
......
...@@ -81,9 +81,7 @@ public class TopologyBuilder { ...@@ -81,9 +81,7 @@ public class TopologyBuilder {
.map((key, stats) -> KeyValue.pair( .map((key, stats) -> KeyValue.pair(
keyFactory.getSensorId(key.key()), keyFactory.getSensorId(key.key()),
stats.toString())) stats.toString()))
// TODO // .peek((k, v) -> LOGGER.info("{}: {}", k, v))
// statsRecordFactory.create(key, value)))
// .peek((k, v) -> LOGGER.info("{}: {}", k, v)) // TODO Temp logging
.to( .to(
this.outputTopic, this.outputTopic,
Produced.with( Produced.with(
......
...@@ -109,7 +109,7 @@ public class Uc4PipelineFactory extends PipelineFactory { ...@@ -109,7 +109,7 @@ public class Uc4PipelineFactory extends PipelineFactory {
this.kafkaWritePropsForPipeline, this.kafkaFeedbackTopic)); this.kafkaWritePropsForPipeline, this.kafkaFeedbackTopic));
// Log aggregation product // Log aggregation product
uc4Aggregation.writeTo(Sinks.logger()); // uc4Aggregation.writeTo(Sinks.logger());
// Add Sink2: Write back to kafka output topic // Add Sink2: Write back to kafka output topic
uc4Aggregation.writeTo(KafkaSinks.kafka( uc4Aggregation.writeTo(KafkaSinks.kafka(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment