diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java index c858941d87df23fb5301f5b4414304c9ef0cde3a..ac4e30352c83c2b2a0fae9be6362c725ae02b3c7 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/AbstractFlinkService.java @@ -5,7 +5,6 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rocks.theodolite.benchmarks.commons.flink.StateBackends; import titan.ccp.common.configuration.ServiceConfigurations; /** @@ -31,7 +30,7 @@ public abstract class AbstractFlinkService { this.env = StreamExecutionEnvironment.getExecutionEnvironment(); - this.configureEnv(); + this.configureEnv(); //NOPMD this.buildPipeline(); } @@ -39,7 +38,7 @@ public abstract class AbstractFlinkService { /** * Configures the service using environment variables. */ - public void configureEnv() { + protected void configureEnv() { this.configureCheckpointing(); this.configureParallelism(); this.configureStateBackend(); diff --git a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java index a25c0418c642f7c578647c2d87d1755736c2ab8f..9eb143c3c07f879de37eafa2fbe6729bf182d45e 100644 --- a/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java +++ b/theodolite-benchmarks/flink-commons/src/main/java/rocks/theodolite/benchmarks/commons/flink/ConfigurationKeys.java @@ -37,6 +37,7 @@ public final class ConfigurationKeys { public static final String CHECKPOINTING = "checkpointing"; public static final String PARALLELISM = "parallelism"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java index 04f14017b4ded27d792fefa545916422a1a43266..c1b3278c34583a69ece2b505599d8f338ac93d89 100644 --- a/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc1-flink/src/main/java/rocks/theodolite/benchmarks/uc1/flink/HistoryServiceFlinkJob.java @@ -3,8 +3,6 @@ package rocks.theodolite.benchmarks.uc1.flink; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService; import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory; import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter; @@ -16,24 +14,21 @@ import titan.ccp.model.records.ActivePowerRecord; */ public final class HistoryServiceFlinkJob extends AbstractFlinkService { - private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); - private static final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson(); - - /** - * Create a new instance of the {@link HistoryServiceFlinkJob}. - */ - public HistoryServiceFlinkJob() { - super(); - } + private static final DatabaseAdapter<String> DATABASE_ADAPTER = LogWriterFactory.forJson(); + @Override public void configureEnv() { super.configureCheckpointing(); super.configureParallelism(); } + @Override protected void configureSerializers() { + // No serializers needed here } + + @Override protected void buildPipeline() { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); @@ -50,9 +45,9 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { stream // .rebalance() - .map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter())) + .map(new ConverterAdapter<>(this.DATABASE_ADAPTER.getRecordConverter())) .returns(Types.STRING) - .flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter())) + .flatMap(new WriterAdapter<>(this.DATABASE_ADAPTER.getDatabaseWriter())) .returns(Types.VOID); // Will never be used } diff --git a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java index 8b8d00ff0a14938a69aff7b09464a6b7788455ee..1109fb7bb431811dde4c448b445990599bc6d626 100644 --- a/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc2-flink/src/main/java/rocks/theodolite/benchmarks/uc2/flink/HistoryServiceFlinkJob.java @@ -24,13 +24,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); - /** - * Create a new instance of the {@link HistoryServiceFlinkJob}. - */ - public HistoryServiceFlinkJob() { - super(); - } - + @Override protected void configureSerializers() { this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer()); this.env.getConfig().getRegisteredTypesWithKryoSerializers() @@ -39,7 +33,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { } - + @Override protected void buildPipeline() { final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); diff --git a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java index e0c25c83f5d1678f8171943dd5426f0e994579f4..ad38303b4f8957bcb6de23608451be2da4ae41ac 100644 --- a/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java +++ b/theodolite-benchmarks/uc3-flink/src/main/java/rocks/theodolite/benchmarks/uc3/flink/HistoryServiceFlinkJob.java @@ -30,13 +30,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class); - /** - * Create a new instance of the {@link HistoryServiceFlinkJob}. - */ - public HistoryServiceFlinkJob() { - super(); - } - + @Override protected void configureSerializers() { this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class, new HourOfDayKeySerde()); @@ -49,6 +43,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService { } } + @Override protected void buildPipeline() { // Configurations final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); diff --git a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java index 583ff36a6aa8ead2e6442b2422422adadc396bf8..35ef7dd1426f287c7529ca12f00c7c1d23606ab1 100644 --- a/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java +++ b/theodolite-benchmarks/uc4-flink/src/main/java/rocks/theodolite/benchmarks/uc4/flink/AggregationServiceFlinkJob.java @@ -38,13 +38,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class); - /** - * Create a new {@link AggregationServiceFlinkJob}. - */ - public AggregationServiceFlinkJob() { - super(); - } - + @Override protected void configureSerializers() { this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class, new ImmutableSensorRegistrySerializer()); @@ -65,6 +59,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService { s.getSerializer().getClass().getName())); } + @Override protected void buildPipeline() { // Get configurations final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);