Skip to content
Snippets Groups Projects
Commit 092cc3c0 authored by Sören Henning's avatar Sören Henning
Browse files

Align Flink UC3 implementation with others

* Remove logging statement
* Add trigger (default 15s)
parent 0f707399
No related branches found
No related tags found
No related merge requests found
Pipeline #10125 failed
......@@ -21,6 +21,9 @@ public final class ConfigurationKeys {
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS =
"aggregation.trigger.interval.seconds";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String TIME_ZONE = "time.zone";
......
......@@ -9,6 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
......@@ -55,6 +56,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Time aggregationAdvance =
Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Time triggerDuration =
Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS));
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
......@@ -80,13 +83,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
return keyFactory.createKey(record.getIdentifier(), dateTime);
})
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.trigger(ContinuousEventTimeTrigger.of(triggerDuration))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(tuple -> {
final String newKey = keyFactory.getSensorId(tuple.f0);
final String newValue = tuple.f1.toString();
final int hourOfDay = tuple.f0.getHourOfDay();
LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
return new Tuple2<>(newKey, newValue);
final String sensorId = keyFactory.getSensorId(tuple.f0);
final String stats = tuple.f1.toString();
// final int hourOfDay = tuple.f0.getHourOfDay();
// LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
return new Tuple2<>(sensorId, stats);
})
.name("map")
.returns(Types.TUPLE(Types.STRING, Types.STRING))
......
......@@ -7,6 +7,7 @@ kafka.output.topic=output
schema.registry.url=http://localhost:8081
aggregation.duration.days=30
aggregation.advance.days=1
aggregation.trigger.interval.seconds=15
num.threads=1
commit.interval.ms=100
cache.max.bytes.buffering=-1
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment