Skip to content
Snippets Groups Projects
Commit cab6297c authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Refactor the AbstractFlinkService

+ Move buildPipeline() and configureEnv() to run method.
+ Extend Javadoc
+ Add constructor which can override the configuration as needed for integration tests
+ remove now unnecessary static from uc1 flink
+ Fix uc2 and uc4 broken build.gradle (ref to uc1/uc2)
parent 00d439f5
No related branches found
No related tags found
1 merge request!272Introduce Abstract Flink Service Class
Pipeline #8269 passed
...@@ -9,19 +9,20 @@ import titan.ccp.common.configuration.ServiceConfigurations; ...@@ -9,19 +9,20 @@ import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A general Apache Flink-based microservice. It is configured by {@link #configureEnv()}, * A general Apache Flink-based microservice. It is configured by {@link #configureEnv()},
* and extended by implementing business logic in {@link #buildPipeline()} * and extended by implementing business logic in {@link #buildPipeline()}.
* The configuration of the serializer needs to be implemented in {@link #configureSerializers()}.
*/ */
public abstract class AbstractFlinkService { public abstract class AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class); private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class);
protected final StreamExecutionEnvironment env; protected final StreamExecutionEnvironment env;
protected final Configuration config = ServiceConfigurations.createWithDefaults(); protected Configuration config = ServiceConfigurations.createWithDefaults();
protected final String applicationId; protected final String applicationId;
/** /**
* Abstract Service constructing and configuring the application. * Abstract Service constructing the name and {@link StreamExecutionEnvironment}.
*/ */
public AbstractFlinkService() { public AbstractFlinkService() {
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
...@@ -30,11 +31,22 @@ public abstract class AbstractFlinkService { ...@@ -30,11 +31,22 @@ public abstract class AbstractFlinkService {
this.env = StreamExecutionEnvironment.getExecutionEnvironment(); this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv(); //NOPMD }
this.buildPipeline(); /**
* Abstract Service constructing the name and {@link StreamExecutionEnvironment}.
* @param config the configuration for the service.
*/
public AbstractFlinkService(final Configuration config) {
this.config = config;
final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION);
this.applicationId = applicationName + "-" + applicationVersion;
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
} }
/** /**
* Configures the service using environment variables. * Configures the service using environment variables.
*/ */
...@@ -86,6 +98,8 @@ public abstract class AbstractFlinkService { ...@@ -86,6 +98,8 @@ public abstract class AbstractFlinkService {
* Starts the service. * Starts the service.
*/ */
public void run() { public void run() {
this.configureEnv();
this.buildPipeline();
LOGGER.info("Execution plan: {}", this.env.getExecutionPlan()); LOGGER.info("Execution plan: {}", this.env.getExecutionPlan());
try { try {
......
...@@ -14,7 +14,7 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -14,7 +14,7 @@ import titan.ccp.model.records.ActivePowerRecord;
*/ */
public final class HistoryServiceFlinkJob extends AbstractFlinkService { public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final DatabaseAdapter<String> DATABASE_ADAPTER = LogWriterFactory.forJson(); private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
@Override @Override
public void configureEnv() { public void configureEnv() {
...@@ -45,9 +45,9 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { ...@@ -45,9 +45,9 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
stream stream
// .rebalance() // .rebalance()
.map(new ConverterAdapter<>(this.DATABASE_ADAPTER.getRecordConverter())) .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter()))
.returns(Types.STRING) .returns(Types.STRING)
.flatMap(new WriterAdapter<>(this.DATABASE_ADAPTER.getDatabaseWriter())) .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))
.returns(Types.VOID); // Will never be used .returns(Types.VOID); // Will never be used
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment