diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java index ac4e30352c83c2b2a0fae9be6362c725ae02b3c7..ff75b6c9daffe6f6cd82465746e18bb7802fd45f 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java @@ -9,19 +9,20 @@ import titan.ccp.common.configuration.ServiceConfigurations; /** * A general Apache Flink-based microservice. It is configured by {@link #configureEnv()}, - * and extended by implementing business logic in {@link #buildPipeline()} + * and extended by implementing business logic in {@link #buildPipeline()}. + * The configuration of the serializer needs to be implemented in {@link #configureSerializers()}. */ public abstract class AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFlinkService.class); protected final StreamExecutionEnvironment env; - protected final Configuration config = ServiceConfigurations.createWithDefaults(); + protected Configuration config = ServiceConfigurations.createWithDefaults(); protected final String applicationId; /** - * Abstract Service constructing and configuring the application. + * Abstract Service constructing the name and {@link StreamExecutionEnvironment}. */ public AbstractFlinkService() { final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); @@ -30,11 +31,22 @@ public abstract class AbstractFlinkService { this.env = StreamExecutionEnvironment.getExecutionEnvironment(); - this.configureEnv(); //NOPMD + } - this.buildPipeline(); + /** + * Abstract Service constructing the name and {@link StreamExecutionEnvironment}. + * @param config the configuration for the service. + */ + public AbstractFlinkService(final Configuration config) { + this.config = config; + final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); + final String applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); + this.applicationId = applicationName + "-" + applicationVersion; + this.env = StreamExecutionEnvironment.getExecutionEnvironment(); } + + /** * Configures the service using environment variables. */ @@ -86,6 +98,8 @@ public abstract class AbstractFlinkService { * Starts the service. */ public void run() { + this.configureEnv(); + this.buildPipeline(); LOGGER.info("Execution plan: {}", this.env.getExecutionPlan()); try { diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java index c1b3278c34583a69ece2b505599d8f338ac93d89..a5e833b0bbb7cd8a153e5fa7c51c1a548ef63083 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java @@ -14,7 +14,7 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class HistoryServiceFlinkJob extends AbstractFlinkService { - private static final DatabaseAdapter<String> DATABASE_ADAPTER = LogWriterFactory.forJson(); + private final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); @Override public void configureEnv() { @@ -45,9 +45,9 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { stream // .rebalance() - .map(new ConverterAdapter<>(this.DATABASE_ADAPTER.getRecordConverter())) + .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter())) .returns(Types.STRING) - .flatMap(new WriterAdapter<>(this.DATABASE_ADAPTER.getDatabaseWriter())) + .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())) .returns(Types.VOID); // Will never be used }