Skip to content
Snippets Groups Projects
Commit 418e52e7 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into benchmark-smoke-tests

parents 6c95c2b7 0b5b4a26
No related branches found
No related tags found
1 merge request!232Add smoke tests for benchmark
Pipeline #6386 passed
...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -15,6 +15,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -103,9 +104,11 @@ public final class AggregationServiceFlinkJob { ...@@ -103,9 +104,11 @@ public final class AggregationServiceFlinkJob {
this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS this.env.getConfig().registerTypeWithKryoSerializer(Set.of(1, 2, 3, 4).getClass(), // NOCS
new ImmutableSetSerializer()); new ImmutableSetSerializer());
this.env.getConfig().getRegisteredTypesWithKryoSerializers() this.env
.forEach((c, s) -> LOGGER.info("Class " + c.getName() + " registered with serializer " .getConfig()
+ s.getSerializer().getClass().getName())); .getRegisteredTypesWithKryoSerializers()
.forEach((c, s) -> LOGGER.info("Class '{}' registered with serializer '{}'.", c.getName(),
s.getSerializer().getClass().getName()));
} }
private void buildPipeline() { private void buildPipeline() {
...@@ -134,12 +137,13 @@ public final class AggregationServiceFlinkJob { ...@@ -134,12 +137,13 @@ public final class AggregationServiceFlinkJob {
final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource = final FlinkKafkaConsumer<AggregatedActivePowerRecord> kafkaOutputSource =
kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class); kafkaConnector.createConsumer(outputTopic, AggregatedActivePowerRecord.class);
final FlinkKafkaConsumer<Tuple2<Event, String>> kafkaConfigSource = final FlinkKafkaConsumerBase<Tuple2<Event, String>> kafkaConfigSource =
kafkaConnector.createConsumer( kafkaConnector.createConsumer(
configurationTopic, configurationTopic,
EventSerde::serde, EventSerde::serde,
Serdes::String, Serdes::String,
TupleType.of(TypeInformation.of(Event.class), Types.STRING)); TupleType.of(TypeInformation.of(Event.class), Types.STRING))
.setStartFromEarliest();
// Sink to output topic with SensorId, AggregatedActivePowerRecord // Sink to output topic with SensorId, AggregatedActivePowerRecord
final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink = final FlinkKafkaProducer<Tuple2<String, AggregatedActivePowerRecord>> kafkaAggregationSink =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment