Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Commits on Source (2)
# Docker Compose Files for Testing
This directory contains Docker Compose files that help with testing benchmark implementations.
For each stream processing engine (Kafka Streams and Flink) and Benchmark (UC1-4), a Docker Compose file is provided
in the corresponding subdirectory.
For each stream processing engine (Kafka Streams, Flink, Hazelcast Jet, Beam/Flink and Beam/Samza) and Benchmark
(UC1-4), a Docker Compose file is provided in the corresponding subdirectory.
## Full Dockerized Testing
......
# Base image: official Flink 1.13 distribution running on Java 11.
FROM flink:1.13-java11
# Copy the uc3 all-in-one job jar into Flink's user-library directory so the
# cluster's user-code classloader can find it at job submission time.
ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar
\ No newline at end of file
ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar
......@@ -21,6 +21,9 @@ public final class ConfigurationKeys {
// Property key: number of days by which the sliding aggregation window advances.
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// Property key: interval in seconds for the window's continuous event-time
// trigger, i.e. how often early/partial window results are emitted.
public static final String AGGREGATION_TRIGGER_INTERVAL_SECONDS =
"aggregation.trigger.interval.seconds";
// Property key: commit interval in milliseconds — presumably the Kafka
// consumer/offset commit interval; confirm against the consumer setup.
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
// Property key: time zone identifier — NOTE(review): likely used when mapping
// event timestamps to hour-of-day; confirm at the usage site.
public static final String TIME_ZONE = "time.zone";
......
......@@ -9,6 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
......@@ -55,6 +56,8 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
final Time aggregationAdvance =
Time.days(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
final Time triggerDuration =
Time.seconds(this.config.getInt(ConfigurationKeys.AGGREGATION_TRIGGER_INTERVAL_SECONDS));
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final KafkaConnectorFactory kafkaConnector = new KafkaConnectorFactory(
......@@ -80,13 +83,14 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
return keyFactory.createKey(record.getIdentifier(), dateTime);
})
.window(SlidingEventTimeWindows.of(aggregationDuration, aggregationAdvance))
.trigger(ContinuousEventTimeTrigger.of(triggerDuration))
.aggregate(new StatsAggregateFunction(), new HourOfDayProcessWindowFunction())
.map(tuple -> {
final String newKey = keyFactory.getSensorId(tuple.f0);
final String newValue = tuple.f1.toString();
final int hourOfDay = tuple.f0.getHourOfDay();
LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
return new Tuple2<>(newKey, newValue);
final String sensorId = keyFactory.getSensorId(tuple.f0);
final String stats = tuple.f1.toString();
// final int hourOfDay = tuple.f0.getHourOfDay();
// LOGGER.info("{}|{}: {}", newKey, hourOfDay, newValue);
return new Tuple2<>(sensorId, stats);
})
.name("map")
.returns(Types.TUPLE(Types.STRING, Types.STRING))
......
......@@ -7,6 +7,7 @@ kafka.output.topic=output
# Confluent Schema Registry endpoint for Avro (de)serialization.
schema.registry.url=http://localhost:8081
# Length of the sliding aggregation window, in days.
aggregation.duration.days=30
# Advance (slide) of the aggregation window, in days.
aggregation.advance.days=1
# How often (seconds) the continuous event-time trigger emits early window results.
aggregation.trigger.interval.seconds=15
# Degree of parallelism / worker threads for the job.
num.threads=1
# Commit interval in milliseconds — presumably Kafka offset commits; verify.
commit.interval.ms=100
# Record cache buffering in bytes; -1 presumably disables the limit — verify.
cache.max.bytes.buffering=-1
......