Skip to content
Snippets Groups Projects
Commit 809402a5 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'upgrade-flink' into 'master'

Upgrade Flink benchmarks to Flink 1.13

Closes #302

See merge request !226
parents c96e3f01 79b8512b
No related branches found
No related tags found
2 merge requests!230Upgrade Kafka Streams Benchmarks to Kafka Streams 3.1,!226Upgrade Flink benchmarks to Flink 1.13
Pipeline #6450 passed
...@@ -20,7 +20,7 @@ shadowJar { ...@@ -20,7 +20,7 @@ shadowJar {
tasks.distZip.enabled = false tasks.distZip.enabled = false
ext { ext {
flinkVersion = '1.12.2' flinkVersion = '1.13.5'
scalaBinaryVersion = '2.12' scalaBinaryVersion = '2.12'
} }
......
...@@ -3,7 +3,7 @@ plugins { ...@@ -3,7 +3,7 @@ plugins {
} }
ext { ext {
flinkVersion = '1.12.0' flinkVersion = '1.13.5'
scalaBinaryVersion = '2.12' scalaBinaryVersion = '2.12'
} }
......
FROM flink:1.12-scala_2.12-java11 FROM flink:1.13-java11
ADD build/libs/uc1-flink-all.jar /opt/flink/usrlib/artifacts/uc1-flink-all.jar ADD build/libs/uc1-flink-all.jar /opt/flink/usrlib/artifacts/uc1-flink-all.jar
FROM flink:1.12-scala_2.12-java11 FROM flink:1.13-java11
ADD build/libs/uc2-flink-all.jar /opt/flink/usrlib/artifacts/uc2-flink-all.jar ADD build/libs/uc2-flink-all.jar /opt/flink/usrlib/artifacts/uc2-flink-all.jar
\ No newline at end of file
...@@ -5,7 +5,6 @@ import org.apache.commons.configuration2.Configuration; ...@@ -5,7 +5,6 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
...@@ -48,8 +47,6 @@ public final class HistoryServiceFlinkJob { ...@@ -48,8 +47,6 @@ public final class HistoryServiceFlinkJob {
} }
private void configureEnv() { private void configureEnv() {
this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) { if (checkpointing) {
......
FROM flink:1.12-scala_2.12-java11 FROM flink:1.13-java11
ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar ADD build/libs/uc3-flink-all.jar /opt/flink/usrlib/artifacts/uc3-flink-all.jar
\ No newline at end of file
...@@ -9,7 +9,6 @@ import org.apache.flink.api.common.typeinfo.Types; ...@@ -9,7 +9,6 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
...@@ -55,8 +54,6 @@ public final class HistoryServiceFlinkJob { ...@@ -55,8 +54,6 @@ public final class HistoryServiceFlinkJob {
} }
private void configureEnv() { private void configureEnv() {
this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) { if (checkpointing) {
......
FROM flink:1.12-scala_2.12-java11 FROM flink:1.13-java11
ADD build/libs/uc4-flink-all.jar /opt/flink/usrlib/artifacts/uc4-flink-all.jar ADD build/libs/uc4-flink-all.jar /opt/flink/usrlib/artifacts/uc4-flink-all.jar
\ No newline at end of file
...@@ -9,7 +9,6 @@ import org.apache.flink.api.common.typeinfo.Types; ...@@ -9,7 +9,6 @@ import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
...@@ -69,8 +68,6 @@ public final class AggregationServiceFlinkJob { ...@@ -69,8 +68,6 @@ public final class AggregationServiceFlinkJob {
} }
private void configureEnv() { private void configureEnv() {
this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true); final boolean checkpointing = this.config.getBoolean(ConfigurationKeys.CHECKPOINTING, true);
final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS); final int commitIntervalMs = this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS);
if (checkpointing) { if (checkpointing) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment