diff --git a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java
index 5d4dc3328bcd772d7a486b90d5dd69ec5438a1b2..878d08537f87b3e759f96614b8bcaaf669a70f19 100644
--- a/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java
+++ b/theodolite-benchmarks/uc1-beam-flink/src/main/java/application/Uc1BeamFlink.java
@@ -13,6 +13,8 @@ import theodolite.commons.beam.BeamService;
  */
 public final class Uc1BeamFlink {
 
+  private Uc1BeamFlink() {}
+
   public static void main(final String[] args) {
     new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
   }
diff --git a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java
index fb1b1d15aa4783cc2c72bc901d5bf71cb6f6ae8f..2e5cf8a191462ec22da8fc0ab72d1294d6671063 100644
--- a/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java
+++ b/theodolite-benchmarks/uc1-beam-samza/src/main/java/application/Uc1BeamSamza.java
@@ -6,22 +6,20 @@ import theodolite.commons.beam.BeamService;
 /**
  * Implementation of the use case Database Storage using Apache Beam with the Samza Runner. To
  * execute locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
- * generator. Add
- * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
- * --configFilePath=samza-standalone.properties
- * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 as program arguments. To
- * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
- * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
+ * generator. Add --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
+ * --configFilePath=samza-standalone.properties --samzaExecutionEnvironment=STANDALONE
+ * --maxSourceParallelism=1024 as program arguments. To persist logs add
+ * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
+ * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
 public final class Uc1BeamSamza {
 
+  private Uc1BeamSamza() {}
+
   /**
    * Main method.
    */
   public static void main(final String[] args) {
-    new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
+    new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
   }
 }
-
-
-
diff --git a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java
index f308b2ccd87733c8e7e689b4066c9b4a0c625264..c01b2b3892cefe23264af8a5ecc29fe5927e88f4 100644
--- a/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java
+++ b/theodolite-benchmarks/uc1-beam/src/main/java/application/PipelineFactory.java
@@ -30,7 +30,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
   }
 
   @Override
-  protected void constructPipeline(Pipeline pipeline) {
+  protected void constructPipeline(final Pipeline pipeline) {
     final SinkType sinkType = SinkType.from(this.config.getString(SINK_TYPE_KEY));
 
     final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
@@ -41,7 +41,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
   }
 
   @Override
-  protected void registerCoders(CoderRegistry registry) {
+  protected void registerCoders(final CoderRegistry registry) {
     registry.registerCoderForClass(
         ActivePowerRecord.class,
         AvroCoder.of(ActivePowerRecord.SCHEMA$));
diff --git a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
index d7300323df941338fce9c999b50ee3f9c2e56cd9..28cc32e434c11c612fc90caefac96d355cb8d972 100644
--- a/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
+++ b/theodolite-benchmarks/uc2-beam-flink/src/main/java/application/Uc2BeamFlink.java
@@ -1,8 +1,6 @@
 package application;
 
 import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
-import theodolite.commons.beam.AbstractBeamService;
 import theodolite.commons.beam.BeamService;
 
 /**
@@ -13,8 +11,10 @@ import theodolite.commons.beam.BeamService;
  */
 public final class Uc2BeamFlink {
 
+  private Uc2BeamFlink() {}
+
   public static void main(final String[] args) {
-    new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
+    new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
   }
 }
diff --git a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java
index c3ac1541d71dbd9774c5588d4b0543b23b231c7b..496f462795eb53af0e30e41ba66f0c24c71372c7 100644
--- a/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java
+++ b/theodolite-benchmarks/uc2-beam-samza/src/main/java/application/Uc2BeamSamza.java
@@ -1,7 +1,6 @@
 package application;
 
 import org.apache.beam.runners.samza.SamzaRunner;
-import org.apache.beam.sdk.Pipeline;
 import theodolite.commons.beam.BeamService;
 
 /**
@@ -16,9 +15,11 @@ import theodolite.commons.beam.BeamService;
  */
 public final class Uc2BeamSamza {
 
+  private Uc2BeamSamza() {}
+
   public static void main(final String[] args) {
     new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
   }
-
+
 }
diff --git a/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java
index e926f19c3c50915fc08cf77df78fb582c00b52ce..442bc72ceb7a3241becdb75e6ca6dd9ca27bfd8f 100644
--- a/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java
+++ b/theodolite-benchmarks/uc2-beam/src/main/java/application/PipelineFactory.java
@@ -26,28 +26,19 @@ import titan.ccp.model.records.ActivePowerRecord;
 
 public class PipelineFactory extends AbstractPipelineFactory {
 
-  public static final String SINK_TYPE_KEY = "sink.type";
-
   public PipelineFactory(final Configuration configuration) {
     super(configuration);
   }
 
   @Override
-  protected void expandOptions(final PipelineOptions options) {
-    // TODO Add for PubSub
-    // final String pubSubEmulatorHost = super.config.getString(null);
-    // if (pubSubEmulatorHost != null) {
-    //   final PubsubOptions pubSubOptions = options.as(PubsubOptions.class);
-    //   pubSubOptions.setPubsubRootUrl("http://" + pubSubEmulatorHost);
-    // }
-  }
+  protected void expandOptions(final PipelineOptions options) {}
 
   @Override
-  protected void constructPipeline(Pipeline pipeline) {
-    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+  protected void constructPipeline(final Pipeline pipeline) {
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
     final Duration duration = Duration.standardMinutes(
-        config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
+        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
 
     final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
@@ -55,7 +46,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
     final StatsToString statsToString = new StatsToString();
 
     // Write to Kafka
-    final String bootstrapServer = config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+    final String bootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final KafkaWriterTransformation<String> kafkaWriter =
         new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
@@ -73,7 +64,7 @@ public class PipelineFactory extends AbstractPipelineFactory {
   }
 
   @Override
-  protected void registerCoders(CoderRegistry registry) {
+  protected void registerCoders(final CoderRegistry registry) {
     registry.registerCoderForClass(ActivePowerRecord.class,
         AvroCoder.of(ActivePowerRecord.SCHEMA$));
     registry.registerCoderForClass(StatsAggregation.class,
diff --git a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java
index 18532b2655fcc6c24dad5f2fca87607c0b5d2e54..50827360223dfb1360ed3697ae732d85db48c1c7 100644
--- a/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java
+++ b/theodolite-benchmarks/uc3-beam-flink/src/main/java/application/Uc3BeamFlink.java
@@ -1,7 +1,7 @@
 package application;
 
 import org.apache.beam.runners.flink.FlinkRunner;
-import theodolite.commons.beam.AbstractBeamService;
+import theodolite.commons.beam.BeamService;
 
 /**
  * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the
@@ -12,28 +12,15 @@ import theodolite.commons.beam.AbstractBeamService;
  * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc3BeamFlink extends AbstractBeamService {
+public final class Uc3BeamFlink {
 
-  /**
-   * Private constructor to avoid instantiation.
-   */
-  private Uc3BeamFlink(final String[] args) { //NOPMD
-    super(args);
-    this.options.setRunner(FlinkRunner.class);
-  }
+  private Uc3BeamFlink() {}
 
   /**
    * Start running this microservice.
    */
   public static void main(final String[] args) {
-
-    final Uc3BeamFlink uc3BeamFlink = new Uc3BeamFlink(args);
-
-    final Uc3BeamPipeline pipeline =
-        new Uc3BeamPipeline(uc3BeamFlink.options, uc3BeamFlink.getConfig());
-
-    pipeline.run().waitUntilFinish();
+    new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
   }
-
 }
diff --git a/theodolite-benchmarks/uc3-beam-samza/src/main/java/application/Uc3BeamSamza.java b/theodolite-benchmarks/uc3-beam-samza/src/main/java/application/Uc3BeamSamza.java
index 913293bd02cb16e14ee9d94ea0e161c74853e72a..974c5e79e5582768f65fe8341819e8f9857acf49 100644
--- a/theodolite-benchmarks/uc3-beam-samza/src/main/java/application/Uc3BeamSamza.java
+++ b/theodolite-benchmarks/uc3-beam-samza/src/main/java/application/Uc3BeamSamza.java
@@ -1,7 +1,7 @@
 package application;
 
 import org.apache.beam.runners.samza.SamzaRunner;
-import theodolite.commons.beam.AbstractBeamService;
+import theodolite.commons.beam.BeamService;
 
 /**
  * Implementation of the use case Aggregation based on Time Attributes using Apache Beam with the
@@ -12,27 +12,15 @@ import theodolite.commons.beam.AbstractBeamService;
  * ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
  * Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc3BeamSamza extends AbstractBeamService {
+public final class Uc3BeamSamza {
 
-  /**
-   * Private constructor to avoid instantiation.
-   */
-  private Uc3BeamSamza(final String[] args) { //NOPMD
-    super(args);
-    this.options.setRunner(SamzaRunner.class);
-  }
+  private Uc3BeamSamza() {}
 
   /**
    * Start running this microservice.
    */
   public static void main(final String[] args) {
-
-    final Uc3BeamSamza uc3BeamSamza = new Uc3BeamSamza(args);
-
-    final Uc3BeamPipeline pipeline =
-        new Uc3BeamPipeline(uc3BeamSamza.options, uc3BeamSamza.getConfig());
-
-    pipeline.run().waitUntilFinish();
+    new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
   }
 }
diff --git a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java b/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java
similarity index 58%
rename from theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
rename to theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java
index c402271777dd63026e1f1fb36855dad1a72e1136..afb6464ce1d4ad2545a3508640051ea31c93c04f 100644
--- a/theodolite-benchmarks/uc3-beam/src/main/java/application/Uc3BeamPipeline.java
+++ b/theodolite-benchmarks/uc3-beam/src/main/java/application/PipelineFactory.java
@@ -2,7 +2,8 @@ package application;
 
 import com.google.common.math.Stats;
 import com.google.common.math.StatsAccumulator;
-import java.util.Map;
+import java.util.function.Function;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -18,40 +19,34 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
-import theodolite.commons.beam.AbstractPipeline;
+import theodolite.commons.beam.AbstractPipelineFactory;
 import theodolite.commons.beam.ConfigurationKeys;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import theodolite.commons.beam.kafka.KafkaWriterTransformation;
 import titan.ccp.model.records.ActivePowerRecord;
 
+public class PipelineFactory extends AbstractPipelineFactory {
 
-/**
- * Implementation of the use case Aggregation based on Time Attributes using Apache Beam.
- */
-public final class Uc3BeamPipeline extends AbstractPipeline {
+  public PipelineFactory(final Configuration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  protected void expandOptions(final PipelineOptions options) {}
 
-  protected Uc3BeamPipeline(final PipelineOptions options, final Configuration config) {
-    super(options, config);
-    // Additional needed variables
-    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+  @Override
+  protected void constructPipeline(final Pipeline pipeline) {
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
 
     final Duration duration =
-        Duration.standardDays(config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
+        Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS));
     final Duration aggregationAdvanceDuration =
-        Duration.standardDays(config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
+        Duration.standardDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS));
     final Duration triggerDelay =
-        Duration.standardSeconds(config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
-
-    // Build Kafka configuration
-    final Map<String, Object> consumerConfig = this.buildConsumerConfig();
-
-    // Set Coders for classes that will be distributed
-    final CoderRegistry cr = this.getCoderRegistry();
-    registerCoders(cr);
+        Duration.standardSeconds(this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
 
     // Read from Kafka
-    final KafkaActivePowerTimestampReader kafka =
-        new KafkaActivePowerTimestampReader(this.bootstrapServer, this.inputTopic, consumerConfig);
+    final KafkaActivePowerTimestampReader kafkaReader = super.buildKafkaReader();
 
     // Map the time format
     final MapTimeFormat mapTimeFormat = new MapTimeFormat();
@@ -60,10 +55,11 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
     final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
 
     // Write to Kafka
+    final String bootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
     final KafkaWriterTransformation<String> kafkaWriter =
-        new KafkaWriterTransformation<>(this.bootstrapServer, outputTopic, StringSerializer.class);
+        new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
 
-    this.apply(kafka)
+    pipeline.apply(kafkaReader)
         // Map to correct time format
         .apply(MapElements.via(mapTimeFormat))
         // Apply a sliding window
@@ -86,17 +82,24 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
         .apply(kafkaWriter);
   }
 
+  @Override
+  protected void registerCoders(final CoderRegistry registry) {
+    registry.registerCoderForClass(
+        ActivePowerRecord.class,
+        AvroCoder.of(ActivePowerRecord.SCHEMA$));
+    registry.registerCoderForClass(
+        HourOfDayKey.class,
+        new HourOfDaykeyCoder());
+    registry.registerCoderForClass(
+        StatsAggregation.class,
+        SerializableCoder.of(StatsAggregation.class));
+    registry.registerCoderForClass(
+        StatsAccumulator.class,
+        AvroCoder.of(StatsAccumulator.class));
+  }
 
-  /**
-   * Registers all Coders for all needed Coders.
-   *
-   * @param cr CoderRegistry.
-   */
-  private static void registerCoders(final CoderRegistry cr) {
-    cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
-    cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
-    cr.registerCoderForClass(StatsAggregation.class, SerializableCoder.of(StatsAggregation.class));
-    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+  public static Function<Configuration, AbstractPipelineFactory> factory() {
+    return config -> new PipelineFactory(config);
   }
-}
+}
diff --git a/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java b/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java
index 90f9a4a292e99526fa94c7dd512bdcec548fbb4f..0d5d2a04f5a86fb343b05c230533fb268d69815c 100644
--- a/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java
+++ b/theodolite-benchmarks/uc4-beam-flink/src/main/java/application/Uc4BeamFlink.java
@@ -1,34 +1,20 @@
 package application;
 
 import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.sdk.Pipeline;
-import theodolite.commons.beam.AbstractBeamService;
+import theodolite.commons.beam.BeamService;
 
 /**
- * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Flink
- * Runner.
+ * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Flink Runner.
  **/
-public final class Uc4BeamFlink extends AbstractBeamService {
+public final class Uc4BeamFlink {
 
-
-  /**
-   * Private constructor setting specific options for this use case.
-   */
-  private Uc4BeamFlink(final String[] args) { //NOPMD
-    super(args);
-    this.options.setRunner(FlinkRunner.class);
-  }
+  private Uc4BeamFlink() {}
 
   /**
    * Start running this microservice.
    */
   public static void main(final String[] args) {
-
-    final Uc4BeamFlink uc4BeamFlink = new Uc4BeamFlink(args);
-
-    final Pipeline pipeline = new Uc4BeamPipeline(uc4BeamFlink.options, uc4BeamFlink.getConfig());
-
-    pipeline.run().waitUntilFinish();
+    new BeamService(PipelineFactory.factory(), FlinkRunner.class, args).run();
   }
 }
diff --git a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java
index 3894fa95f16253e0a165dde70bf25d4a4bee96cb..108b85d7376e956607cf265f4c78ce1d2fe2fc8d 100644
--- a/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java
+++ b/theodolite-benchmarks/uc4-beam-samza/src/main/java/application/Uc4BeamSamza.java
@@ -1,40 +1,28 @@
 package application;
+
 import org.apache.beam.runners.samza.SamzaRunner;
-import org.apache.beam.sdk.Pipeline;
-import theodolite.commons.beam.AbstractBeamService;
+import theodolite.commons.beam.BeamService;
 
 /**
- * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza
- * Runner. To run locally in standalone start Kafka, Zookeeper, the schema-registry and the
- * workload generator using the delayed_startup.sh script. Add
+ * Implementation of the use case Hierarchical Aggregation using Apache Beam with the Samza Runner.
+ * To run locally in standalone start Kafka, Zookeeper, the schema-registry and the workload
+ * generator using the delayed_startup.sh script. Add
  * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
  * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties
  * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 --as program arguments. To
  * persist logs add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
  * under Standard Input Output in Common in the Run Configuration Start via Eclipse Run.
  */
-public final class Uc4BeamSamza extends AbstractBeamService {
-
+public final class Uc4BeamSamza {
 
-  /**
-   * Private constructor setting specific options for this use case.
-   */
-  private Uc4BeamSamza(final String[] args) { //NOPMD
-    super(args);
-    this.options.setRunner(SamzaRunner.class);
-  }
+  private Uc4BeamSamza() {}
 
   /**
    * Start running this microservice.
    */
   public static void main(final String[] args) {
-
-    final Uc4BeamSamza uc4BeamSamza = new Uc4BeamSamza(args);
-
-    final Pipeline pipeline = new Uc4BeamPipeline(uc4BeamSamza.options, uc4BeamSamza.getConfig());
-
-    pipeline.run().waitUntilFinish();
+    new BeamService(PipelineFactory.factory(), SamzaRunner.class, args).run();
   }
 }
diff --git a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java b/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java
similarity index 67%
rename from theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
rename to theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java
index 0c63e6f9322e5f70f1ad010de168f1a5292a45a4..323779b71075595d854024a4b62421e94056c778 100644
--- a/theodolite-benchmarks/uc4-beam/src/main/java/application/Uc4BeamPipeline.java
+++ b/theodolite-benchmarks/uc4-beam/src/main/java/application/PipelineFactory.java
@@ -1,9 +1,13 @@
-package application; // NOPMD
+package application;
 
 import com.google.common.math.StatsAccumulator;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.SetCoder;
@@ -37,7 +41,7 @@ import serialization.AggregatedActivePowerRecordSerializer;
 import serialization.EventCoder;
 import serialization.EventDeserializer;
 import serialization.SensorParentKeyCoder;
-import theodolite.commons.beam.AbstractPipeline;
+import theodolite.commons.beam.AbstractPipelineFactory;
 import theodolite.commons.beam.ConfigurationKeys;
 import theodolite.commons.beam.kafka.KafkaActivePowerTimestampReader;
 import theodolite.commons.beam.kafka.KafkaGenericReader;
@@ -46,68 +50,62 @@ import titan.ccp.configuration.events.Event;
 import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.records.AggregatedActivePowerRecord;
 
-/**
- * Implementation of the use case Hierarchical Aggregation using Apache Beam.
- */
-public final class Uc4BeamPipeline extends AbstractPipeline {
+public class PipelineFactory extends AbstractPipelineFactory {
 
-  protected Uc4BeamPipeline(final PipelineOptions options, final Configuration config) { // NOPMD
-    super(options, config);
-
-    // Additional needed variables
-    final String feedbackTopic = config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
-    final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
-    final String configurationTopic = config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
+  public PipelineFactory(final Configuration configuration) {
+    super(configuration);
+  }
 
-    final Duration duration =
-        Duration.standardSeconds(config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
-    final Duration triggerDelay =
-        Duration.standardSeconds(config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
-    final Duration gracePeriod =
-        Duration.standardSeconds(config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
+  @Override
+  protected void expandOptions(final PipelineOptions options) {}
 
-    // Build Kafka configuration
-    final Map<String, Object> consumerConfig = super.buildConsumerConfig();
-    final Map<String, Object> configurationConfig = this.configurationConfig(config);
+  @Override
+  protected void constructPipeline(final Pipeline pipeline) {
+    // Additional needed variables
+    final String feedbackTopic = this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC);
+    final String outputTopic = this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
+    final String configurationTopic =
+        this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC);
 
-    // Set Coders for Classes that will be distributed
-    final CoderRegistry cr = this.getCoderRegistry();
-    registerCoders(cr);
+    final Duration duration = Duration.standardSeconds(
+        this.config.getInt(ConfigurationKeys.KAFKA_WINDOW_DURATION_MINUTES));
+    final Duration triggerDelay = Duration.standardSeconds(
+        this.config.getInt(ConfigurationKeys.TRIGGER_INTERVAL));
+    final Duration gracePeriod = Duration.standardSeconds(
+        this.config.getInt(ConfigurationKeys.GRACE_PERIOD_MS));
 
     // Read from Kafka
+    final String bootstrapServer = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS);
+
     // ActivePowerRecords
-    final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader =
-        new KafkaActivePowerTimestampReader(
-            this.bootstrapServer,
-            this.inputTopic,
-            consumerConfig);
+    final KafkaActivePowerTimestampReader kafkaActivePowerRecordReader = super.buildKafkaReader();
 
     // Configuration Events
     final KafkaGenericReader<Event, String> kafkaConfigurationReader =
         new KafkaGenericReader<>(
-            this.bootstrapServer,
+            bootstrapServer,
             configurationTopic,
-            configurationConfig,
+            this.configurationConfig(),
             EventDeserializer.class,
             StringDeserializer.class);
 
     // Write to Kafka
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaOutput =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer,
+            bootstrapServer,
             outputTopic,
             AggregatedActivePowerRecordSerializer.class,
-            super.buildProducerConfig());
+            this.buildProducerConfig());
 
     final KafkaWriterTransformation<AggregatedActivePowerRecord> kafkaFeedback =
         new KafkaWriterTransformation<>(
-            this.bootstrapServer,
+            bootstrapServer,
             feedbackTopic,
             AggregatedActivePowerRecordSerializer.class,
-            super.buildProducerConfig());
+            this.buildProducerConfig());
 
     // Apply pipeline transformations
-    final PCollection<KV<String, ActivePowerRecord>> values = this
+    final PCollection<KV<String, ActivePowerRecord>> values = pipeline
         .apply("Read from Kafka", kafkaActivePowerRecordReader)
         .apply("Read Windows", Window.into(FixedWindows.of(duration)))
         .apply("Set trigger for input", Window
@@ -119,15 +117,15 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
             .discardingFiredPanes());
 
     // Read the results of earlier aggregations.
-    final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = this
+    final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = pipeline
         .apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read()
-            .withBootstrapServers(this.bootstrapServer)
+            .withBootstrapServers(bootstrapServer)
             .withTopic(feedbackTopic)
             .withKeyDeserializer(StringDeserializer.class)
             .withValueDeserializerAndCoder(
                 AggregatedActivePowerRecordDeserializer.class,
                 AvroCoder.of(AggregatedActivePowerRecord.class))
-            .withConsumerConfigUpdates(consumerConfig)
+            .withConsumerConfigUpdates(this.buildConsumerConfig())
             .withTimestampPolicyFactory(
                 (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                     previousWaterMark))
@@ -155,7 +153,7 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
             Flatten.pCollections());
 
     // Build the configuration stream from a changelog.
-    final PCollection<KV<String, Set<String>>> configurationStream = this
+    final PCollection<KV<String, Set<String>>> configurationStream = pipeline
         .apply("Read sensor groups", kafkaConfigurationReader)
         // Only forward relevant changes in the hierarchy
         .apply("Filter changed and status events",
@@ -214,7 +212,28 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
 
     aggregations.apply("Write to aggregation results", kafkaOutput);
     aggregations.apply("Write to feedback topic", kafkaFeedback);
+  }
 
+  @Override
+  protected void registerCoders(final CoderRegistry registry) {
+    registry.registerCoderForClass(
+        ActivePowerRecord.class,
+        AvroCoder.of(ActivePowerRecord.class));
+    registry.registerCoderForClass(
+        AggregatedActivePowerRecord.class,
+        new AggregatedActivePowerRecordCoder());
+    registry.registerCoderForClass(
+        Set.class,
+        SetCoder.of(StringUtf8Coder.of()));
+    registry.registerCoderForClass(
+        Event.class,
+        new EventCoder());
+    registry.registerCoderForClass(
+        SensorParentKey.class,
+        new SensorParentKeyCoder());
+    registry.registerCoderForClass(
+        StatsAccumulator.class,
+        AvroCoder.of(StatsAccumulator.class));
   }
 
@@ -223,35 +242,58 @@ public final class Uc4BeamPipeline extends AbstractPipeline {
    *
    * @return the build configuration.
    */
-  public Map<String, Object> configurationConfig(final Configuration config) {
+  private Map<String, Object> configurationConfig() {
     final Map<String, Object> consumerConfig = new HashMap<>();
     consumerConfig.put(
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
     consumerConfig.put(
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
     consumerConfig.put(
-        ConsumerConfig.GROUP_ID_CONFIG, config
+        ConsumerConfig.GROUP_ID_CONFIG, this.config
             .getString(ConfigurationKeys.APPLICATION_NAME) + "-configuration");
     return consumerConfig;
   }
 
+  private Map<String, Object> buildConsumerConfig() {
+    final Map<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        this.config.getString(ConfigurationKeys.ENABLE_AUTO_COMMIT_CONFIG));
+    consumerConfig.put(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        this.config.getString(ConfigurationKeys.AUTO_OFFSET_RESET_CONFIG));
+    consumerConfig.put(
+        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    consumerConfig.put(
+        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    consumerConfig.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        this.config.getString(ConfigurationKeys.APPLICATION_NAME));
+    return consumerConfig;
+  }
 
   /**
-   * Registers all Coders for all needed Coders.
+   * Builds a simple configuration for a Kafka producer transformation.
    *
-   * @param cr CoderRegistry.
+   * @return the built configuration.
    */
-  private static void registerCoders(final CoderRegistry cr) {
-    cr.registerCoderForClass(ActivePowerRecord.class,
-        AvroCoder.of(ActivePowerRecord.class));
-    cr.registerCoderForClass(AggregatedActivePowerRecord.class,
-        new AggregatedActivePowerRecordCoder());
-    cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
-    cr.registerCoderForClass(Event.class, new EventCoder());
-    cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
-    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
+  private Map<String, Object> buildProducerConfig() {
+    final Map<String, Object> config = new HashMap<>();
+    config.put(
+        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+        this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL));
+    config.put(
+        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
+        this.config.getString(ConfigurationKeys.SPECIFIC_AVRO_READER));
+    return config;
   }
-}
 
+  public static Function<Configuration, AbstractPipelineFactory> factory() {
+    return config -> new PipelineFactory(config);
+  }
+
+}