Skip to content
Snippets Groups Projects
Commit 91751eef authored by Lorenz Boguhn's avatar Lorenz Boguhn Committed by Lorenz Boguhn
Browse files

Codestyle cleanup

parent 4513127c
No related branches found
No related tags found
1 merge request!272Introduce Abstract Flink Service Class
......@@ -5,7 +5,6 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.StateBackends;
import titan.ccp.common.configuration.ServiceConfigurations;
/**
......@@ -31,7 +30,7 @@ public abstract class AbstractFlinkService {
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
this.configureEnv();
this.configureEnv(); //NOPMD
this.buildPipeline();
}
......@@ -39,7 +38,7 @@ public abstract class AbstractFlinkService {
/**
* Configures the service using environment variables.
*/
public void configureEnv() {
protected void configureEnv() {
this.configureCheckpointing();
this.configureParallelism();
this.configureStateBackend();
......
......@@ -37,6 +37,7 @@ public final class ConfigurationKeys {
public static final String CHECKPOINTING = "checkpointing";
public static final String PARALLELISM = "parallelism";
private ConfigurationKeys() {}
}
......@@ -3,8 +3,6 @@ package rocks.theodolite.benchmarks.uc1.flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rocks.theodolite.benchmarks.commons.flink.AbstractFlinkService;
import rocks.theodolite.benchmarks.commons.flink.KafkaConnectorFactory;
import rocks.theodolite.benchmarks.uc1.commons.DatabaseAdapter;
......@@ -16,24 +14,21 @@ import titan.ccp.model.records.ActivePowerRecord;
*/
public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
private static final DatabaseAdapter<String> databaseAdapter = LogWriterFactory.forJson();
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
super();
}
private static final DatabaseAdapter<String> DATABASE_ADAPTER = LogWriterFactory.forJson();
@Override
public void configureEnv() {
super.configureCheckpointing();
super.configureParallelism();
}
@Override
protected void configureSerializers() {
// No serializers needed here
}
@Override
protected void buildPipeline() {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
......@@ -50,9 +45,9 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
stream
// .rebalance()
.map(new ConverterAdapter<>(this.databaseAdapter.getRecordConverter()))
.map(new ConverterAdapter<>(this.DATABASE_ADAPTER.getRecordConverter()))
.returns(Types.STRING)
.flatMap(new WriterAdapter<>(this.databaseAdapter.getDatabaseWriter()))
.flatMap(new WriterAdapter<>(this.DATABASE_ADAPTER.getDatabaseWriter()))
.returns(Types.VOID); // Will never be used
}
......
......@@ -24,13 +24,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
super();
}
@Override
protected void configureSerializers() {
this.env.getConfig().registerTypeWithKryoSerializer(Stats.class, new StatsSerializer());
this.env.getConfig().getRegisteredTypesWithKryoSerializers()
......@@ -39,7 +33,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
}
@Override
protected void buildPipeline() {
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
final String schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL);
......
......@@ -30,13 +30,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(HistoryServiceFlinkJob.class);
/**
* Create a new instance of the {@link HistoryServiceFlinkJob}.
*/
public HistoryServiceFlinkJob() {
super();
}
@Override
protected void configureSerializers() {
this.env.getConfig().registerTypeWithKryoSerializer(HourOfDayKey.class,
new HourOfDayKeySerde());
......@@ -49,6 +43,7 @@ public final class HistoryServiceFlinkJob extends AbstractFlinkService {
}
}
@Override
protected void buildPipeline() {
// Configurations
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
......
......@@ -38,13 +38,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
private static final Logger LOGGER = LoggerFactory.getLogger(AggregationServiceFlinkJob.class);
/**
* Create a new {@link AggregationServiceFlinkJob}.
*/
public AggregationServiceFlinkJob() {
super();
}
@Override
protected void configureSerializers() {
this.env.getConfig().registerTypeWithKryoSerializer(ImmutableSensorRegistry.class,
new ImmutableSensorRegistrySerializer());
......@@ -65,6 +59,7 @@ public final class AggregationServiceFlinkJob extends AbstractFlinkService {
s.getSerializer().getClass().getName()));
}
@Override
protected void buildPipeline() {
// Get configurations
final String kafkaBroker = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment