diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java index 3e94fb4c878401183f45ff384e39dd6bc0291a27..4e9704dc9b08b3c000c60799e73802ca44ac7699 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractBeamService.java @@ -11,6 +11,7 @@ import titan.ccp.common.configuration.ServiceConfigurations; * Abstraction of a Beam microservice. Encapsulates the corresponding {@link PipelineOptions} and * the beam Runner. */ +@Deprecated public class AbstractBeamService { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBeamService.class); diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java index 3f04bf4373aab0394ff4574b4020065ac356724b..939b8e3de62fea445078359523e0fe127a2346e1 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipeline.java @@ -10,6 +10,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; /** * Abstraction of a Beam {@link Pipeline}. */ +@Deprecated public class AbstractPipeline extends Pipeline { private static final String KAFKA_CONFIG_SPECIFIC_AVRO_READER = "specific.avro.reader"; // NOPMD diff --git a/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..2cedeb1ce3134fd8dfcd614ba2b47605e27d31f8 --- /dev/null +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/AbstractPipelineFactory.java @@ -0,0 +1,63 @@ +package theodolite.commons.beam; + +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.commons.configuration2.Configuration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; + +public abstract class AbstractPipelineFactory { + + protected final Configuration config; + + public AbstractPipelineFactory(final Configuration configuration) { + this.config = configuration; + } + + public final Pipeline create(final PipelineOptions options) { + this.expandOptions(options); + final Pipeline pipeline = Pipeline.create(options); + this.constructPipeline(pipeline); + this.registerCoders(pipeline.getCoderRegistry()); + return pipeline; + } + + protected abstract void expandOptions(final PipelineOptions options); + + protected abstract void constructPipeline(Pipeline pipeline); + + protected abstract void registerCoders(CoderRegistry registry); + + protected KafkaActivePowerTimestampReader buildKafkaReader() { + final String inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); + final String bootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + + final Map<String, Object> consumerConfig = new HashMap<>(); + consumerConfig.put( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG)); + consumerConfig.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG)); + consumerConfig.put( + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)); + consumerConfig.put( + KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, + this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER)); + consumerConfig.put( + ConsumerConfig.GROUP_ID_CONFIG, + this.config.getString(ConfigurationKeys.APPLICATION_NAME)); + + return new KafkaActivePowerTimestampReader( + bootstrapServer, + inputTopic, + consumerConfig); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java similarity index 70% rename from theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java rename to theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java index 02bc29d88a8ddb687ca46d9772b788fac7d8d670..28f3d481f384a1fd3d8cc3873033fe80f6b04d20 100644 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/BeamService.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/theodolite/commons/beam/BeamService.java @@ -1,5 +1,6 @@ -package application; +package theodolite.commons.beam; +import java.util.function.Function; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; @@ -7,7 +8,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.commons.configuration2.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import theodolite.commons.beam.ConfigurationKeys; import titan.ccp.common.configuration.ServiceConfigurations; public class BeamService { @@ -21,18 +21,20 @@ public class BeamService { private final PipelineOptions pipelineOptions; public BeamService( - AbstractPipelineFactory pipelineFactory, + Function<Configuration, AbstractPipelineFactory> pipelineFactoryFactory, Class<? extends PipelineRunner<?>> runner, String[] args) { - this.pipelineFactory = pipelineFactory; + this.pipelineFactory = pipelineFactoryFactory.apply(this.config); this.pipelineOptions = PipelineOptionsFactory.fromArgs(args).create(); this.pipelineOptions.setJobName(this.applicationName); this.pipelineOptions.setRunner(runner); } public void run() { - LOGGER.info("Starting BeamService with pipeline options: {}", this.pipelineOptions.toString()); - final Pipeline pipeline = this.pipelineFactory.create(this.config, this.pipelineOptions); + LOGGER.info("Construct Beam pipeline with pipeline options: {}", + this.pipelineOptions.toString()); + final Pipeline pipeline = this.pipelineFactory.create(this.pipelineOptions); + LOGGER.info("Starting BeamService {}.", this.applicationName); pipeline.run().waitUntilFinish(); } diff --git a/theodolite-benchmarks/uc1-beam-flink/build.gradle b/theodolite-benchmarks/uc1-beam-flink/build.gradle index f4b6cff8efbcdbcb701f249220643669f0f89626..cb707ddfd0bf9650beffb851efdca84c08fad84f 100644 --- a/theodolite-benchmarks/uc1-beam-flink/build.gradle +++ b/theodolite-benchmarks/uc1-beam-flink/build.gradle @@ -3,7 +3,17 @@ plugins { } dependencies { - implementation project(':uc1-beam') + implementation project(':uc1-beam') +} + +sourceSets { + main { + resources { + srcDirs += [ + project(':uc1-beam').sourceSets.main.resources + ] + } + } } mainClassName = "application.Uc1BeamFlink" diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java index fe58369b3c0c19351bcc5cde170df68946af7cbd..5d4dc3328bcd772d7a486b90d5dd69ec5438a1b2 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java +++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java @@ -1,7 +1,7 @@ package application; import org.apache.beam.runners.flink.FlinkRunner; -import theodolite.commons.beam.AbstractBeamService; +import theodolite.commons.beam.BeamService; /** * Implementation of the use case Database Storage using Apache Beam with the Flink Runner. To @@ -11,29 +11,10 @@ import theodolite.commons.beam.AbstractBeamService; * ${workspace_loc:/uc1-application-samza/eclipseConsoleLogs.log} as Output File under Standard * Input Output in Common in the Run Configuration Start via Eclipse Run. */ -public final class Uc1BeamFlink extends AbstractBeamService { +public final class Uc1BeamFlink { - /** - * Private constructor setting specific options for this use case. - */ - private Uc1BeamFlink(final String[] args) { //NOPMD - super(args); - this.options.setRunner(FlinkRunner.class); - } - - /** - * Main method. - */ public static void main(final String[] args) { - - // Create application via configurations - final Uc1BeamFlink uc1 = new Uc1BeamFlink(args); - - // Create pipeline with configurations - final Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig()); - - // Submit job and start execution - pipeline.run().waitUntilFinish(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java index aaef5c2d6968c4b89059537277a2582ecca70451..fb1b1d15aa4783cc2c72bc901d5bf71cb6f6ae8f 100644 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java +++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java @@ -1,7 +1,7 @@ package application; import org.apache.beam.runners.samza.SamzaRunner; -import theodolite.commons.beam.AbstractBeamService; +import theodolite.commons.beam.BeamService; /** * Implementation of the use case Database Storage using Apache Beam with the Samza Runner. To @@ -13,29 +13,13 @@ import theodolite.commons.beam.AbstractBeamService; * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run. */ -public final class Uc1BeamSamza extends AbstractBeamService { - - /** - * Private constructor setting specific options for this use case. - */ - private Uc1BeamSamza(final String[] args) { //NOPMD - super(args); - this.options.setRunner(SamzaRunner.class); - } +public final class Uc1BeamSamza { /** * Main method. */ public static void main(final String[] args) { - - // Create application via configurations - final Uc1BeamSamza uc1 = new Uc1BeamSamza(args); - - // Create pipeline with configurations - final Uc1BeamPipeline pipeline = new Uc1BeamPipeline(uc1.options, uc1.getConfig()); - - // Submit job and start execution - pipeline.run().waitUntilFinish(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); } } diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties deleted file mode 100644 index 70cc5e94a64b8218344263d9d9d2ba3421fd69fd..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam-samza/src/main/resources/META-INF/application.properties +++ /dev/null @@ -1,18 +0,0 @@ -application.name=theodolite-uc1-application -application.version=0.0.1 - -sink.type=logger - -kafka.bootstrap.servers=localhost:9092 -kafka.input.topic=input -kafka.output.topic=output - -schema.registry.url=http://localhost:8081 - -num.threads=1 -commit.interval.ms=1000 -cache.max.bytes.buffering=-1 - -specific.avro.reader=True -enable.auto.commit.config=True -auto.offset.reset.config=earliest diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java deleted file mode 100644 index bf2d1039ba45dc28621b6f347fc6c880e5ff805e..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/AbstractPipelineFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -package application; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.commons.configuration2.Configuration; -import titan.ccp.model.records.ActivePowerRecord; - -public abstract class AbstractPipelineFactory { - - protected final Configuration configuration; - - public AbstractPipelineFactory(final Configuration configuration) { - this.configuration = configuration; - } - - public final Pipeline create(final PipelineOptions options) { - final Pipeline pipeline = Pipeline.create(options); - this.constructPipeline(pipeline); - this.registerCoders(pipeline.getCoderRegistry()); - return pipeline; - } - - private void constructPipeline(Pipeline pipeline) { - // pipeline.apply(kafka) - // .apply(Values.create()) - // .apply(sinkType.create(config)); - } - - private void registerCoders(CoderRegistry registry) { - registry.registerCoderForClass( - ActivePowerRecord.class, - AvroCoder.of(ActivePowerRecord.SCHEMA$)); - } - -} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..f308b2ccd87733c8e7e689b4066c9b4a0c625264 --- /dev/null +++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java @@ -0,0 +1,54 @@ +package application; + +import java.util.function.Function; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Values; +import org.apache.commons.configuration2.Configuration; +import theodolite.commons.beam.AbstractPipelineFactory; +import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; +import titan.ccp.model.records.ActivePowerRecord; + +public class PipelineFactory extends AbstractPipelineFactory { + + public static final String SINK_TYPE_KEY = "sink.type"; + + public PipelineFactory(final Configuration configuration) { + super(configuration); + } + + @Override + protected void expandOptions(final PipelineOptions options) { + // TODO Add for PubSub + // final String pubSubEmulatorHost = super.config.getString(null); + // if (pubSubEmulatorHost != null) { + // final PubsubOptions pubSubOptions = options.as(PubsubOptions.class); + // pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost); + // } + } + + @Override + protected void constructPipeline(Pipeline pipeline) { + final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY)); + + final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); + + pipeline.apply(kafkaReader) + .apply(Values.create()) + .apply(sinkType.create(this.config)); + } + + @Override + protected void registerCoders(CoderRegistry registry) { + registry.registerCoderForClass( + ActivePowerRecord.class, + AvroCoder.of(ActivePowerRecord.SCHEMA$)); + } + + public static Function<Configuration, AbstractPipelineFactory> factory() { + return config -> new PipelineFactory(config); + } + +} diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java deleted file mode 100644 index 352b32a29ff6cfd5d01a4e74798f79c8d08c769a..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/uc1-beam/src/main/java/application/Uc1BeamPipeline.java +++ /dev/null @@ -1,39 +0,0 @@ -package application; - -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Values; -import org.apache.commons.configuration2.Configuration; -import theodolite.commons.beam.AbstractPipeline; -import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; -import titan.ccp.model.records.ActivePowerRecord; - -/** - * Implementation of benchmark UC1: Database Storage with Apache Beam. - */ -public final class Uc1BeamPipeline extends AbstractPipeline { - - public static final String SINK_TYPE_KEY = "sink.type"; - - protected Uc1BeamPipeline(final PipelineOptions options, final Configuration config) { - super(options, config); - - final SinkType sinkType = SinkType.from(config.getString(SINK_TYPE_KEY)); - - // Set Coders for classes that will be distributed - final CoderRegistry cr = super.getCoderRegistry(); - cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - - final KafkaActivePowerTimestampReader kafka = new KafkaActivePowerTimestampReader( - super.bootstrapServer, - super.inputTopic, - super.buildConsumerConfig()); - - super.apply(kafka) - .apply(Values.create()) - .apply(sinkType.create(config)); - } - -} - diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties b/theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties similarity index 95% rename from theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties rename to theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties index 70cc5e94a64b8218344263d9d9d2ba3421fd69fd..b785d698cd59a31bff7e9cffc21ca1d877f037fe 100644 --- a/theodolite-benchmarks/uc1-beam-flink/src/main/resources/META-INF/application.properties +++ b/theodolite-benchmarks/uc1-beam/src/main/resources/META-INF/application.properties @@ -2,6 +2,7 @@ application.name=theodolite-uc1-application application.version=0.0.1 sink.type=logger +source.type=kafka kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java index f5bb849e626444929e00b17b1324a08c41cb19a0..d7300323df941338fce9c999b50ee3f9c2e56cd9 100644 --- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java +++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java @@ -3,6 +3,7 @@ package application; import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import theodolite.commons.beam.AbstractBeamService; +import theodolite.commons.beam.BeamService; /** * Implementation of the use case Downsampling using Apache Beam with the Flink Runner. To execute @@ -10,26 +11,10 @@ import theodolite.commons.beam.AbstractBeamService; * using the delayed_startup.sh script. Start a Flink cluster and pass its REST adress * using--flinkMaster as run parameter. */ -public final class Uc2BeamFlink extends AbstractBeamService { +public final class Uc2BeamFlink { - /** - * Private constructor setting specific options for this use case. - */ - private Uc2BeamFlink(final String[] args) { // NOPMD - super(args); - this.options.setRunner(FlinkRunner.class); - } - - /** - * Start running this microservice. - */ public static void main(final String[] args) { - - final Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args); - - final Pipeline pipeline = new Uc2BeamPipeline(uc2BeamFlink.options, uc2BeamFlink.getConfig()); - - pipeline.run().waitUntilFinish(); + new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run(); } } diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java index d4b3d6d910824a718bffe8dc5f0204d53b9865c1..c3ac1541d71dbd9774c5588d4b0543b23b231c7b 100644 --- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java +++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java @@ -2,7 +2,7 @@ package application; import org.apache.beam.runners.samza.SamzaRunner; import org.apache.beam.sdk.Pipeline; -import theodolite.commons.beam.AbstractBeamService; +import theodolite.commons.beam.BeamService; /** * Implementation of the use case Downsampling using Apache Beam with the Samza Runner. To run @@ -14,26 +14,11 @@ import theodolite.commons.beam.AbstractBeamService; * persist logs add ${workspace_loc:/uc3-application-samza/eclipseConsoleLogs.log} as Output File * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run. */ -public final class Uc2BeamSamza extends AbstractBeamService { +public final class Uc2BeamSamza { - /** - * Private constructor setting specific options for this use case. - */ - private Uc2BeamSamza(final String[] args) { //NOPMD - super(args); - this.options.setRunner(SamzaRunner.class); - } - - /** - * Start running this microservice. - */ public static void main(final String[] args) { - - final Uc2BeamSamza uc2BeamSamza = new Uc2BeamSamza(args); - - final Pipeline pipeline = new Uc2BeamPipeline(uc2BeamSamza.options, uc2BeamSamza.getConfig()); - - pipeline.run().waitUntilFinish(); + new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run(); } + } diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java similarity index 53% rename from theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java rename to theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java index 02eec9868b0bbfbf6fd45206ff0d4092ac09e1ac..e926f19c3c50915fc08cf77df78fb582c00b52ce 100644 --- a/theodolite-benchmarks/uc2-beam/src/main/java/application/Uc2BeamPipeline.java +++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java @@ -2,7 +2,8 @@ package application; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; -import java.util.Map; +import java.util.function.Function; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; @@ -17,48 +18,49 @@ import org.apache.beam.sdk.values.KV; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; -import theodolite.commons.beam.AbstractPipeline; +import theodolite.commons.beam.AbstractPipelineFactory; import theodolite.commons.beam.ConfigurationKeys; import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader; import theodolite.commons.beam.kafka.KafkaWriterTransformation; import titan.ccp.model.records.ActivePowerRecord; +public class PipelineFactory extends AbstractPipelineFactory { -/** - * Implementation of the use case Downsampling using Apache Beam. - */ -public final class Uc2BeamPipeline extends AbstractPipeline { + public static final String SINK_TYPE_KEY = "sink.type"; - protected Uc2BeamPipeline(final PipelineOptions options, final Configuration config) { - super(options, config); - // Additional needed variables - final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); + public PipelineFactory(final Configuration configuration) { + super(configuration); + } - final Duration duration = - Duration.standardMinutes(config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); + @Override + protected void expandOptions(final PipelineOptions options) { + // TODO Add for PubSub + // final String pubSubEmulatorHost = super.config.getString(null); + // if (pubSubEmulatorHost != null) { + // final PubsubOptions pubSubOptions = options.as(PubsubOptions.class); + // pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost); + // } + } - // Build kafka configuration - final Map<String, Object> consumerConfig = buildConsumerConfig(); + @Override + protected void constructPipeline(Pipeline pipeline) { + final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); - // Set Coders for Classes that will be distributed - final CoderRegistry cr = getCoderRegistry(); - cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); - cr.registerCoderForClass(StatsAggregation.class, SerializableCoder.of(StatsAggregation.class)); - cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class)); + final Duration duration = Duration.standardMinutes( + config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES)); - // Read from Kafka - final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = - new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); + final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader(); // Transform into String final StatsToString statsToString = new StatsToString(); // Write to Kafka + final String bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); final KafkaWriterTransformation<String> kafkaWriter = new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class); // Apply pipeline transformations - this.apply(kafkaActivePowerRecordReader) + pipeline.apply(kafkaReader) // Apply a fixed window .apply(Window.<KV<String, ActivePowerRecord>>into(FixedWindows.of(duration))) // Aggregate per window for every key @@ -69,5 +71,19 @@ public final class Uc2BeamPipeline extends AbstractPipeline { // Write to Kafka .apply(kafkaWriter); } -} + @Override + protected void registerCoders(CoderRegistry registry) { + registry.registerCoderForClass(ActivePowerRecord.class, + AvroCoder.of(ActivePowerRecord.SCHEMA$)); + registry.registerCoderForClass(StatsAggregation.class, + SerializableCoder.of(StatsAggregation.class)); + registry.registerCoderForClass(StatsAccumulator.class, + AvroCoder.of(StatsAccumulator.class)); + } + + public static Function<Configuration, AbstractPipelineFactory> factory() { + return config -> new PipelineFactory(config); + } + +}