Skip to content
Snippets Groups Projects
Commit 1661623c authored by Sören Henning's avatar Sören Henning
Browse files

Minor code refactroing

parent b874591c
No related branches found
No related tags found
1 merge request!245Firestore sink for UC1 Beam
Pipeline #6644 canceled
...@@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory; ...@@ -8,8 +8,8 @@ import org.slf4j.LoggerFactory;
import titan.ccp.common.configuration.ServiceConfigurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* Abstraction of a Beam microservice. * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and
* Encapsulates the corresponding {@link PipelineOptions} and the beam Runner. * the beam Runner.
*/ */
public class AbstractBeamService { public class AbstractBeamService {
...@@ -20,26 +20,24 @@ public class AbstractBeamService { ...@@ -20,26 +20,24 @@ public class AbstractBeamService {
// Application Configurations // Application Configurations
private final Configuration config = ServiceConfigurations.createWithDefaults(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final String applicationName = private final String applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME);
config.getString(ConfigurationKeys.APPLICATION_NAME);
/** /**
* Creates AbstractBeamService with options. * Creates AbstractBeamService with options.
*/ */
public AbstractBeamService(final String[] args) { //NOPMD public AbstractBeamService(final String[] args) { // NOPMD
super(); super();
LOGGER.info("Pipeline options:"); LOGGER.info("Pipeline options:");
for (final String s : args) { for (final String s : args) {
LOGGER.info("{}", s); LOGGER.info("{}", s);
} }
options = PipelineOptionsFactory.fromArgs(args).create(); this.options = PipelineOptionsFactory.fromArgs(args).create();
options.setJobName(applicationName); this.options.setJobName(this.applicationName);
LOGGER.info("Starting BeamService with PipelineOptions {}:", this.options.toString()); LOGGER.info("Starting BeamService with PipelineOptions: {}", this.options.toString());
} }
public Configuration getConfig() { public Configuration getConfig() {
return config; return this.config;
} }
} }
...@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform; ...@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.Deserializer;
/** /**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
...@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection; ...@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection;
* @param <K> Type of the Key. * @param <K> Type of the Key.
* @param <V> Type of the Value. * @param <V> Type of the Value.
*/ */
public class KafkaGenericReader<K, V> extends public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
PTransform<PBegin, PCollection<KV<K, V>>> {
private static final long serialVersionUID = 2603286150183186115L; private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<K, V>>> reader; private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
...@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends ...@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
public KafkaGenericReader(final String bootstrapServer, final String inputTopic, public KafkaGenericReader(
final Map<String, Object> consumerConfig, final String bootstrapServer,
final Class<? extends final String inputTopic,
org.apache.kafka.common.serialization.Deserializer<K>> final Map<String, Object> consumerConfig,
keyDeserializer, final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends final Class<? extends Deserializer<V>> valueDeserializer) {
org.apache.kafka.common.serialization.Deserializer<V>>
valueDeserializer) {
super(); super();
// Check if boostrap server and inputTopic are defined // Check if boostrap server and inputTopic are defined
...@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends ...@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends
throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
} }
reader = this.reader =
KafkaIO.<K, V>read() KafkaIO.<K, V>read()
.withBootstrapServers(bootstrapServer) .withBootstrapServers(bootstrapServer)
.withTopic(inputTopic) .withTopic(inputTopic)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment