Skip to content
Snippets Groups Projects
Commit f56fc513 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'feature/147-add-beam-implementations' of...

Merge branch 'feature/147-add-beam-implementations' of git.se.informatik.uni-kiel.de:stu203404/theodolite into feature/147-add-beam-implementations
parents e7d6031a b6d305b4
No related branches found
No related tags found
1 merge request!187Migrate Beam benchmark implementation
Showing
with 10 additions and 32 deletions
...@@ -2,7 +2,6 @@ package theodolite.commons.beam; ...@@ -2,7 +2,6 @@ package theodolite.commons.beam;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
......
...@@ -7,8 +7,6 @@ public final class ConfigurationKeys { ...@@ -7,8 +7,6 @@ public final class ConfigurationKeys {
// Common keys // Common keys
public static final String APPLICATION_NAME = "application.name"; public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
...@@ -31,8 +29,6 @@ public final class ConfigurationKeys { ...@@ -31,8 +29,6 @@ public final class ConfigurationKeys {
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
// UC4 // UC4
public static final String EMIT_PERIOD_MS = "emit.period.ms";
public static final String GRACE_PERIOD_MS = "grace.period.ms"; public static final String GRACE_PERIOD_MS = "grace.period.ms";
...@@ -46,8 +42,6 @@ public final class ConfigurationKeys { ...@@ -46,8 +42,6 @@ public final class ConfigurationKeys {
public static final String TRIGGER_INTERVAL = "trigger.interval"; public static final String TRIGGER_INTERVAL = "trigger.interval";
private ConfigurationKeys() { private ConfigurationKeys() {
} }
......
...@@ -24,9 +24,8 @@ public class KafkaActivePowerRecordReader extends ...@@ -24,9 +24,8 @@ public class KafkaActivePowerRecordReader extends
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic, public KafkaActivePowerRecordReader(final String bootstrapServer, final String inputTopic,
final Map consumerConfig) { final Map<String, Object> consumerConfig) {
super(); super();
// Check if boostrap server and inputTopic are defined // Check if boostrap server and inputTopic are defined
......
...@@ -25,9 +25,8 @@ public class KafkaActivePowerTimestampReader extends ...@@ -25,9 +25,8 @@ public class KafkaActivePowerTimestampReader extends
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic, public KafkaActivePowerTimestampReader(final String bootstrapServer, final String inputTopic,
final Map consumerConfig) { final Map<String, Object> consumerConfig) {
super(); super();
// Check if boostrap server and inputTopic are defined // Check if boostrap server and inputTopic are defined
......
...@@ -22,9 +22,8 @@ public class KafkaGenericReader<K, V> extends ...@@ -22,9 +22,8 @@ public class KafkaGenericReader<K, V> extends
/** /**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/ */
@SuppressWarnings({"unchecked", "rawtypes"})
public KafkaGenericReader(final String bootstrapServer, final String inputTopic, public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
final Map consumerConfig, final Map<String, Object> consumerConfig,
final Class<? extends final Class<? extends
org.apache.kafka.common.serialization.Deserializer<K>> org.apache.kafka.common.serialization.Deserializer<K>>
keyDeserializer, keyDeserializer,
......
...@@ -24,7 +24,6 @@ public final class Uc1BeamFlink extends AbstractBeamService { ...@@ -24,7 +24,6 @@ public final class Uc1BeamFlink extends AbstractBeamService {
/** /**
* Main method. * Main method.
*/ */
@SuppressWarnings({"unchecked", "rawtypes", "unused"})
public static void main(final String[] args) { public static void main(final String[] args) {
// Create application via configurations // Create application via configurations
......
...@@ -26,7 +26,6 @@ public final class Uc1BeamSamza extends AbstractBeamService { ...@@ -26,7 +26,6 @@ public final class Uc1BeamSamza extends AbstractBeamService {
/** /**
* Main method. * Main method.
*/ */
@SuppressWarnings({"unchecked", "rawtypes", "unused"})
public static void main(final String[] args) { public static void main(final String[] args) {
// Create application via configurations // Create application via configurations
......
...@@ -34,7 +34,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline { ...@@ -34,7 +34,7 @@ public final class Uc1BeamPipeline extends AbstractPipeline {
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$)); cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
// build KafkaConsumerConfig // build KafkaConsumerConfig
final Map consumerConfig = buildConsumerConfig(); final Map<String, Object> consumerConfig = buildConsumerConfig();
// Create Pipeline transformations // Create Pipeline transformations
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka = final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
......
...@@ -23,7 +23,6 @@ public final class Uc2BeamFlink extends AbstractBeamService { ...@@ -23,7 +23,6 @@ public final class Uc2BeamFlink extends AbstractBeamService {
/** /**
* Start running this microservice. * Start running this microservice.
*/ */
@SuppressWarnings({"serial", "unchecked", "rawtypes"})
public static void main(final String[] args) { public static void main(final String[] args) {
final Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args); final Uc2BeamFlink uc2BeamFlink = new Uc2BeamFlink(args);
......
...@@ -27,7 +27,6 @@ public final class Uc2BeamSamza extends AbstractBeamService { ...@@ -27,7 +27,6 @@ public final class Uc2BeamSamza extends AbstractBeamService {
/** /**
* Start running this microservice. * Start running this microservice.
*/ */
@SuppressWarnings({"serial", "unchecked", "rawtypes"})
public static void main(final String[] args) { public static void main(final String[] args) {
final Uc2BeamSamza uc2BeamSamza = new Uc2BeamSamza(args); final Uc2BeamSamza uc2BeamSamza = new Uc2BeamSamza(args);
......
...@@ -17,7 +17,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; ...@@ -17,7 +17,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration; import org.joda.time.Duration;
...@@ -38,7 +37,7 @@ import titan.ccp.model.records.ActivePowerRecord; ...@@ -38,7 +37,7 @@ import titan.ccp.model.records.ActivePowerRecord;
*/ */
public final class Uc2BeamPipeline extends AbstractPipeline { public final class Uc2BeamPipeline extends AbstractPipeline {
protected Uc2BeamPipeline(final PipelineOptions options,final Configuration config) { protected Uc2BeamPipeline(final PipelineOptions options, final Configuration config) {
super(options, config); super(options, config);
// Additional needed variables // Additional needed variables
final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC); final String outputTopic = config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC);
...@@ -48,7 +47,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline { ...@@ -48,7 +47,7 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
final Duration duration = Duration.standardMinutes(windowDurationMinutes); final Duration duration = Duration.standardMinutes(windowDurationMinutes);
// Build kafka configuration // Build kafka configuration
final Map consumerConfig = buildConsumerConfig(); final Map<String, Object> consumerConfig = buildConsumerConfig();
// Set Coders for Classes that will be distributed // Set Coders for Classes that will be distributed
final CoderRegistry cr = this.getCoderRegistry(); final CoderRegistry cr = this.getCoderRegistry();
...@@ -67,8 +66,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline { ...@@ -67,8 +66,8 @@ public final class Uc2BeamPipeline extends AbstractPipeline {
final StatsToString statsToString = new StatsToString(); final StatsToString statsToString = new StatsToString();
// Write to Kafka // Write to Kafka
final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = final KafkaWriterTransformation<String> kafkaWriter =
new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class); new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
// Apply pipeline transformations // Apply pipeline transformations
this.apply(kafkaActivePowerRecordReader) this.apply(kafkaActivePowerRecordReader)
......
...@@ -10,14 +10,11 @@ import org.apache.beam.sdk.coders.SerializableCoder; ...@@ -10,14 +10,11 @@ import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration; import org.joda.time.Duration;
...@@ -63,7 +60,6 @@ public final class Uc3BeamPipeline extends AbstractPipeline { ...@@ -63,7 +60,6 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
registerCoders(cr); registerCoders(cr);
// Read from Kafka // Read from Kafka
@SuppressWarnings({"rawtypes", "unchecked"})
final KafkaActivePowerTimestampReader kafka = final KafkaActivePowerTimestampReader kafka =
new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig); new KafkaActivePowerTimestampReader(bootstrapServer, inputTopic, consumerConfig);
...@@ -74,9 +70,8 @@ public final class Uc3BeamPipeline extends AbstractPipeline { ...@@ -74,9 +70,8 @@ public final class Uc3BeamPipeline extends AbstractPipeline {
final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats(); final HourOfDayWithStats hourOfDayWithStats = new HourOfDayWithStats();
// Write to Kafka // Write to Kafka
@SuppressWarnings({"rawtypes", "unchecked"}) final KafkaWriterTransformation<String> kafkaWriter =
final PTransform<PCollection<KV<String, String>>, POutput> kafkaWriter = new KafkaWriterTransformation<>(bootstrapServer, outputTopic, StringSerializer.class);
new KafkaWriterTransformation(bootstrapServer, outputTopic, StringSerializer.class);
this.apply(kafka) this.apply(kafka)
// Map to correct time format // Map to correct time format
......
...@@ -22,7 +22,6 @@ public final class Uc4BeamFlink extends AbstractBeamService { ...@@ -22,7 +22,6 @@ public final class Uc4BeamFlink extends AbstractBeamService {
/** /**
* Start running this microservice. * Start running this microservice.
*/ */
@SuppressWarnings({"serial", "unchecked", "rawtypes"})
public static void main(final String[] args) { public static void main(final String[] args) {
final Uc4BeamFlink uc4BeamFlink = new Uc4BeamFlink(args); final Uc4BeamFlink uc4BeamFlink = new Uc4BeamFlink(args);
......
...@@ -28,7 +28,6 @@ public final class Uc4BeamSamza extends AbstractBeamService { ...@@ -28,7 +28,6 @@ public final class Uc4BeamSamza extends AbstractBeamService {
/** /**
* Start running this microservice. * Start running this microservice.
*/ */
@SuppressWarnings({"serial", "unchecked", "rawtypes"})
public static void main(final String[] args) { public static void main(final String[] args) {
final Uc4BeamSamza uc4BeamSamza = new Uc4BeamSamza(args); final Uc4BeamSamza uc4BeamSamza = new Uc4BeamSamza(args);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment