Skip to content
Snippets Groups Projects
Commit 289dcadd authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'master' into theodolite-kotlin

parents 2765c192 e09d88e8
No related branches found
No related tags found
3 merge requests!159Re-implementation of Theodolite with Kotlin/Quarkus,!157Update Graal Image in CI pipeline,!83WIP: Re-implementation of Theodolite with Kotlin/Quarkus
Pipeline #3179 canceled
Showing
with 42 additions and 3 deletions
...@@ -19,6 +19,8 @@ public final class ConfigurationKeys { ...@@ -19,6 +19,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
package theodolite.uc1.application; package theodolite.uc1.application;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
...@@ -42,6 +43,14 @@ public final class HistoryServiceFlinkJob { ...@@ -42,6 +43,14 @@ public final class HistoryServiceFlinkJob {
if (checkpointing) { if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
} }
private void buildPipeline() { private void buildPipeline() {
...@@ -61,7 +70,8 @@ public final class HistoryServiceFlinkJob { ...@@ -61,7 +70,8 @@ public final class HistoryServiceFlinkJob {
stream stream
.rebalance() .rebalance()
.map(new GsonMapper()) .map(new GsonMapper())
.flatMap((record, c) -> LOGGER.info("Record: {}", record)); .flatMap((record, c) -> LOGGER.info("Record: {}", record))
.returns(Types.GENERIC(Object.class)); // Will never be used
} }
/** /**
......
...@@ -30,6 +30,8 @@ public final class ConfigurationKeys { ...@@ -30,6 +30,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob { ...@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend // State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend); this.env.setStateBackend(stateBackend);
......
...@@ -34,6 +34,8 @@ public final class ConfigurationKeys { ...@@ -34,6 +34,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob { ...@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend // State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend); this.env.setStateBackend(stateBackend);
......
...@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob { ...@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend // State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend); this.env.setStateBackend(stateBackend);
......
...@@ -15,7 +15,7 @@ public final class ConfigurationKeys { ...@@ -15,7 +15,7 @@ public final class ConfigurationKeys {
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String WINDOW_SIZE_MS = "window.size.ms"; public static final String WINDOW_SIZE_MS = "window.size.ms";
...@@ -28,13 +28,15 @@ public final class ConfigurationKeys { ...@@ -28,13 +28,15 @@ public final class ConfigurationKeys {
public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path"; public static final String FLINK_STATE_BACKEND_PATH = "flink.state.backend.path";
public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = //NOPMD public static final String FLINK_STATE_BACKEND_MEMORY_SIZE = // NOPMD
"flink.state.backend.memory.size"; "flink.state.backend.memory.size";
public static final String DEBUG = "debug"; public static final String DEBUG = "debug";
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment