Skip to content
Snippets Groups Projects
Commit e09d88e8 authored by Sören Henning's avatar Sören Henning
Browse files

Make parallelism configurable via env. variables

parent 87f00436
No related branches found
No related tags found
No related merge requests found
Pipeline #3060 passed
Showing
with 39 additions and 2 deletions
...@@ -19,6 +19,8 @@ public final class ConfigurationKeys { ...@@ -19,6 +19,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -43,6 +43,14 @@ public final class HistoryServiceFlinkJob { ...@@ -43,6 +43,14 @@ public final class HistoryServiceFlinkJob {
if (checkpointing) { if (checkpointing) {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
} }
private void buildPipeline() { private void buildPipeline() {
......
...@@ -30,6 +30,8 @@ public final class ConfigurationKeys { ...@@ -30,6 +30,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob { ...@@ -56,6 +56,13 @@ public final class HistoryServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend // State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend); this.env.setStateBackend(stateBackend);
......
...@@ -34,6 +34,8 @@ public final class ConfigurationKeys { ...@@ -34,6 +34,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
...@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob { ...@@ -63,6 +63,13 @@ public final class HistoryServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend // State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend); this.env.setStateBackend(stateBackend);
......
...@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob { ...@@ -76,6 +76,13 @@ public final class AggregationServiceFlinkJob {
this.env.enableCheckpointing(commitIntervalMs); this.env.enableCheckpointing(commitIntervalMs);
} }
// Parallelism
final Integer parallelism = this.config.getInteger(ConfigurationKeys.PARALLELISM, null);
if (parallelism != null) {
LOGGER.error("Set parallelism: {}.", parallelism);
this.env.setParallelism(parallelism);
}
// State Backend // State Backend
final StateBackend stateBackend = StateBackends.fromConfiguration(this.config); final StateBackend stateBackend = StateBackends.fromConfiguration(this.config);
this.env.setStateBackend(stateBackend); this.env.setStateBackend(stateBackend);
......
...@@ -35,6 +35,8 @@ public final class ConfigurationKeys { ...@@ -35,6 +35,8 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing"; public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {} private ConfigurationKeys() {}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment