package application;

import com.google.common.base.MoreObjects;
import com.google.common.math.StatsAccumulator;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.samza.SamzaRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import serialization.AggregatedActivePowerRecordCoder;
import serialization.AggregatedActivePowerRecordDeserializer;
import serialization.AggregatedActivePowerRecordSerializer;
import serialization.EventCoder;
import serialization.EventDeserializer;
import serialization.SensorParentKeyCoder;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.AggregatedActivePowerRecord;

/**
 * Implementation of the Hierarchical Aggregation use case using Apache Beam with the Samza
 * runner. To run locally in standalone mode, start Kafka, ZooKeeper, the Schema Registry and
 * the workload generator, e.g. using the delayed_startup.sh script. Add
 * --configFactory=org.apache.samza.config.factories.PropertiesConfigFactory
 * --configFilePath=${workspace_loc:uc4-application-samza}/config/standalone_local.properties
 * --samzaExecutionEnvironment=STANDALONE --maxSourceParallelism=1024 as program arguments. To
 * persist logs, add ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File
 * under Standard Input Output in the Common tab of the Run Configuration. Start via Eclipse Run.
 */
public class Uc4ApplicationBeam {
  private static final String JOB_NAME = "Uc4Application";
  private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
  private static final String INPUT = "INPUT";
  private static final String OUTPUT = "OUTPUT";
  private static final String CONFIGURATION = "CONFIGURATION";
  private static final String FEEDBACKTOPIC = "FEEDBACKTOPIC";
  private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
  private static final String YES = "true";
  private static final String USE_AVRO_READER = YES;
  private static final String AUTO_COMMIT_CONFIG = YES;
  private static final String KAFKA_WINDOW_DURATION = "KAFKA_WINDOW_DURATION";
  private static final String TRIGGER_INTERVAL = "TRIGGER_INTERVAL";
  private static final String GRACE_PERIOD = "GRACE_PERIOD";
  private static final String AUTO_OFFSET_RESET_CONFIG = "earliest";

  /**
   * Private constructor to avoid instantiation.
   */
  private Uc4ApplicationBeam() {
    throw new UnsupportedOperationException();
  }

  /**
   * Start executing this microservice.
   */
  @SuppressWarnings({"serial", "unchecked", "rawtypes"})
  public static void main(final String[] args) {

    // Set configuration for the windowing
    final int windowDuration = Integer.parseInt(
        System.getenv(KAFKA_WINDOW_DURATION) == null
            ? "60" : System.getenv(KAFKA_WINDOW_DURATION));
    final Duration duration = Duration.standardSeconds(windowDuration);
    final int triggerInterval = Integer.parseInt(
        System.getenv(TRIGGER_INTERVAL) == null
            ? "30" : System.getenv(TRIGGER_INTERVAL));

    final Duration triggerDelay = Duration.standardSeconds(triggerInterval);

    final int grace = Integer.parseInt(
        System.getenv(GRACE_PERIOD) == null
            ? "270" : System.getenv(GRACE_PERIOD));

    final Duration gracePeriod = Duration.standardSeconds(grace);
    // Set configuration for Kafka
    final String bootstrapServer =
        System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
            : System.getenv(BOOTSTRAP);
    final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
    final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
    final String configurationTopic =
        System.getenv(CONFIGURATION) == null ? "configuration" : System.getenv(CONFIGURATION);
    final String feedbackTopic =
        System.getenv(FEEDBACKTOPIC) == null ? "aggregation-feedback"
            : System.getenv(FEEDBACKTOPIC);
    final String schemaRegistryUrl =
        System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
            : System.getenv(SCHEMA_REGISTRY);


    // final String inputTopic = "input";
    // final String outputTopic = "output";
    // final String bootstrapServer = "localhost:9092";
    // final String configurationTopic = "configuration";
    // final String feedbackTopic = "aggregation-feedback";
    // final String schemaRegistryUrl = "http://localhost:8081";
    // final Duration duration = Duration.standardSeconds(5);
    // final Duration gracePeriod = Duration.standardMinutes(5);
    // final Duration triggerDelay = Duration.standardSeconds(5);

    // Set consumer configuration for the schema registry and for committing
    // offsets back to Kafka
    final HashMap<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
    consumerConfig.put("schema.registry.url", schemaRegistryUrl);
    consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-input");

    final HashMap<String, Object> consumerConfigConfiguration = new HashMap<>();
    consumerConfigConfiguration.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
    consumerConfigConfiguration.put(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
    consumerConfigConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application-configuration");

    // Create run options from args
    final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(SamzaRunner.class);
    options.setJobName(JOB_NAME);

    final Pipeline pipeline = Pipeline.create(options);
    final CoderRegistry cr = pipeline.getCoderRegistry();

    // Set Coders for Classes that will be distributed
    cr.registerCoderForClass(ActivePowerRecord.class,
        NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)));
    cr.registerCoderForClass(AggregatedActivePowerRecord.class,
        new AggregatedActivePowerRecordCoder());
    cr.registerCoderForClass(Set.class, SetCoder.of(StringUtf8Coder.of()));
    cr.registerCoderForClass(Event.class, new EventCoder());
    cr.registerCoderForClass(SensorParentKey.class, new SensorParentKeyCoder());
    cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));


    @SuppressWarnings({"unchecked", "rawtypes"})
    final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
        KafkaIO.<String, ActivePowerRecord>read()
            .withBootstrapServers(bootstrapServer)
            .withTopic(inputTopic)
            .withKeyDeserializer(StringDeserializer.class)
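            // KafkaAvroDeserializer is not generic over the record type, so a
            // raw-typed cast is required; the explicit coder restores the type
            // information for Beam.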
            .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
                NullableCoder.of(AvroCoder.of(ActivePowerRecord.class)))
            .withConsumerConfigUpdates(consumerConfig)
            // Set a custom TimestampPolicy for event time
            .withTimestampPolicyFactory(
                (tp, previousWaterMark) -> new EventTimePolicy(previousWaterMark))
            .withoutMetadata();

    // Apply pipeline transformations
    // Read from Kafka
    final PCollection<KV<String, ActivePowerRecord>> values = pipeline.apply(kafka)
        .apply("Read Windows", Window.into(FixedWindows.of(duration)))
        .apply("Set trigger for input", Window
            .<KV<String, ActivePowerRecord>>configure()
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(triggerDelay)))
            .withAllowedLateness(gracePeriod)
            .discardingFiredPanes());

    // Read the results of earlier aggregations.
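    // Aggregates written to the feedback topic below re-enter the pipeline here,
    // so parent groups can aggregate over the results of their child groups.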
    final PCollection<KV<String, ActivePowerRecord>> aggregationsInput = pipeline
        .apply("Read aggregation results", KafkaIO.<String, AggregatedActivePowerRecord>read()
            .withBootstrapServers(bootstrapServer)
            .withTopic(feedbackTopic)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(AggregatedActivePowerRecordDeserializer.class)
            .withTimestampPolicyFactory(
                (tp, previousWaterMark) -> new AggregatedActivePowerRecordEventTimePolicy(
                          previousWaterMark))
            .withoutMetadata())
        .apply("Apply Winddows", Window.into(FixedWindows.of(duration)))
        // Convert into the correct data format
        .apply("Convert AggregatedActivePowerRecord to ActivePowerRecord",
            MapElements.via(
                new SimpleFunction<KV<String, AggregatedActivePowerRecord>,
                    KV<String, ActivePowerRecord>>() {
                @Override
                public KV<String, ActivePowerRecord> apply(
                    final KV<String, AggregatedActivePowerRecord> kv) {
                      return KV.of(kv.getKey(), new ActivePowerRecord(kv.getValue().getIdentifier(),
                            kv.getValue().getTimestamp(), kv.getValue().getSumInW()));
                }
                }))
        .apply("Set trigger for feedback", Window
            .<KV<String, ActivePowerRecord>>configure()
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(triggerDelay)))
            .withAllowedLateness(gracePeriod)
            .discardingFiredPanes());
    // Prepare flatten
    final PCollectionList<KV<String, ActivePowerRecord>> collections =
        PCollectionList.of(values).and(aggregationsInput);

    // Create a single PCollection out of the input and already computed results
    final PCollection<KV<String, ActivePowerRecord>> inputCollection =
        collections.apply("Flatten sensor data and aggregation results",
            Flatten.pCollections());


    // Build the configuration stream from a changelog.
    final PCollection<KV<String, Set<String>>> configurationStream = pipeline
        .apply("Read sensor groups", KafkaIO.<Event, String>read()
            .withBootstrapServers(bootstrapServer)
            .withTopic(configurationTopic)
            .withKeyDeserializer(EventDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(consumerConfigConfiguration)
            .withoutMetadata())
        // Only forward relevant changes in the hierarchy
        .apply("Filter changed and status events",
            Filter.by(new SerializableFunction<KV<Event, String>, Boolean>() {
              @Override
              public Boolean apply(final KV<Event, String> kv) {
                return kv.getKey() == Event.SENSOR_REGISTRY_CHANGED
                    || kv.getKey() == Event.SENSOR_REGISTRY_STATUS;
              }
            }))
        // Build the changelog
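        // GenerateParentsFn and UpdateChildParentPairs are defined alongside this
        // class; together they derive and maintain a child-to-parents mapping per
        // sensor from the registry events.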
        .apply("Generate Parents for every Sensor", ParDo.of(new GenerateParentsFn()))
        .apply("Update child and parent pairs", ParDo.of(new UpdateChildParentPairs()))
        .apply("Set trigger for configuration", Window
            .<KV<String, Set<String>>>configure()
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());

    final PCollectionView<Map<String, Set<String>>> childParentPairMap =
        configurationStream.apply(Latest.perKey())
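            // Keep only the most recent parent set per sensor before
            // materializing the relation as a map side input.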
            // Reset trigger to avoid synchronized processing time
            .apply("Reset trigger for configurations", Window
                .<KV<String, Set<String>>>configure()
                .triggering(AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes())
            .apply(View.asMap());

    // Build pairs of every sensor reading and parent
    final PCollection<KV<SensorParentKey, ActivePowerRecord>> flatMappedValues =
        inputCollection.apply(
            "Duplicate as flatMap",
            ParDo.of(
                new DoFn<KV<String, ActivePowerRecord>, KV<SensorParentKey, ActivePowerRecord>>() {
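                  // Per-key state holding the parent set last seen for a sensor.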
                  @StateId("parents")
                  private final StateSpec<ValueState<Set<String>>> parents = StateSpecs.value();

                  // Generate a KV-pair for every child-parent match
                  @ProcessElement
                  public void processElement(@Element final KV<String, ActivePowerRecord> kv,
                      final OutputReceiver<KV<SensorParentKey, ActivePowerRecord>> out,
                      @StateId("parents") final ValueState<Set<String>> state,
                      final ProcessContext c) {
                    final ActivePowerRecord record = kv.getValue();
                    final Set<String> newParents = MoreObjects.firstNonNull(
                        c.sideInput(childParentPairMap).get(kv.getKey()),
                        Collections.emptySet());
                    final Set<String> oldParents =
                        MoreObjects.firstNonNull(state.read(), Collections.emptySet());
                    // Forward new Pairs if they exist
                    if (!newParents.isEmpty()) {
                      for (final String parent : newParents) {

                        // Forward flat mapped record
                        final SensorParentKey key = new SensorParentKey(kv.getKey(), parent);
                        out.output(KV.of(key, record));
                      }
                    }
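                    // For parents that were removed, forward a null value as a
                    // tombstone; it is dropped again by the null filter below.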
                    if (!newParents.equals(oldParents)) {
                      for (final String oldParent : oldParents) {
                        if (!newParents.contains(oldParent)) {
                          // Forward Delete
                          final SensorParentKey key = new SensorParentKey(kv.getKey(), oldParent);
                          out.output(KV.of(key, null));
                        }
                      }
                      state.write(newParents);
                    }
                  }
                }).withSideInputs(childParentPairMap))

            .apply("Filter only latest changes", Latest.perKey())
            .apply("Filter out null values",
                Filter.by(
                    new SerializableFunction<KV<SensorParentKey, ActivePowerRecord>, Boolean>() {
                      @Override
                      public Boolean apply(final KV<SensorParentKey, ActivePowerRecord> kv) {
                        return kv.getValue() != null;
                      }
                    }));
    // Aggregate for every sensor group of the current level
    final PCollection<KV<String, AggregatedActivePowerRecord>> aggregations =
        flatMappedValues
        .apply("Set key to group", MapElements.via(
            new SimpleFunction<KV<SensorParentKey,
                ActivePowerRecord>, KV<String, ActivePowerRecord>>() {
              @Override
              public KV<String, ActivePowerRecord> apply(
                  final KV<SensorParentKey, ActivePowerRecord> kv) {

                return KV.of(kv.getKey().getParent(), kv.getValue());
              }
            }))
        // Reset trigger to avoid synchronized processing time
        .apply("Reset trigger for aggregations", Window
            .<KV<String, ActivePowerRecord>>configure()
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(triggerDelay)))
            .withAllowedLateness(gracePeriod)
            .discardingFiredPanes())

        .apply(
            "Aggregate per group",
            Combine.perKey(new RecordAggregation()))
        .apply("Set the Identifier in AggregatedActivePowerRecord",
            MapElements.via(
              new SimpleFunction<KV<String, AggregatedActivePowerRecord>,
                  KV<String, AggregatedActivePowerRecord>>() {
                @Override
                public KV<String, AggregatedActivePowerRecord> apply(
                    final KV<String, AggregatedActivePowerRecord> kv) {
                  final AggregatedActivePowerRecord record = new AggregatedActivePowerRecord(
                      kv.getKey(), kv.getValue().getTimestamp(), kv.getValue().getCount(),
                      kv.getValue().getSumInW(), kv.getValue().getAverageInW());
                  return KV.of(kv.getKey(), record);
                }
              }));
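
    // Publish the aggregates twice: to the output topic for downstream consumers
    // and to the feedback topic, where they re-enter the pipeline as input for
    // the next level of the hierarchy.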
    aggregations.apply("Write to aggregation results",
        KafkaIO.<String, AggregatedActivePowerRecord>write()
            .withBootstrapServers(bootstrapServer)
            .withTopic(outputTopic)
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));


    aggregations
        .apply("Write to feedback topic", KafkaIO.<String, AggregatedActivePowerRecord>write()
            .withBootstrapServers(bootstrapServer)
            .withTopic(feedbackTopic)
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(AggregatedActivePowerRecordSerializer.class));


    pipeline.run().waitUntilFinish();
  }

}