Commit a0bb0d9b authored by Lorenz Boguhn
uc3-beam-flink: remove Checkstyle and SpotBugs warnings (PMD warnings remain)

parent 5c2b370b
1 merge request: !187 Migrate Beam benchmark implementation
package application;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -15,6 +15,7 @@ public class EventTimePolicy
protected Instant currentWatermark;
public EventTimePolicy(final Optional<Instant> previousWatermark) {
super();
this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}
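For context (this wiring is not part of the diff shown here): a timestamp policy like EventTimePolicy is handed to the Kafka source through KafkaIO's timestamp-policy factory. A minimal sketch, assuming EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> as used elsewhere in this benchmark:
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import titan.ccp.model.records.ActivePowerRecord;
// The factory receives the topic partition and the previously checkpointed
// watermark (if any) and builds a fresh policy instance per partition.
TimestampPolicyFactory<String, ActivePowerRecord> timestampPolicyFactory =
    (topicPartition, previousWatermark) -> new EventTimePolicy(previousWatermark);
// Later attached to the source, e.g. KafkaIO.<String, ActivePowerRecord>read()
//     .withTimestampPolicyFactory(timestampPolicyFactory) ...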
...
package application;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
/**
* Composed key of an hour of the day and a sensor id.
*/
@DefaultCoder(AvroCoder.class)
public class HourOfDayKey{
public class HourOfDayKey {
private final int hourOfDay;
private final String sensorId;
...
@@ -5,6 +5,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -13,9 +14,12 @@ import org.apache.kafka.common.serialization.Serde;
/**
* Wrapper class that encapsulates a HourOfDayKeySerde in an org.apache.beam.sdk.coders.Coder.
*/
@SuppressWarnings("serial")
public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializable {
private transient Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
private static final int VALUE_SIZE = 4;
private static final boolean DETERMINISTIC = false;
public static final long serialVersionUID = 4444444;
private Serde<HourOfDayKey> innerSerde = HourOfDayKeySerde.create();
@Override
public void encode(final HourOfDayKey value, final OutputStream outStream)
@@ -24,7 +28,7 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab
this.innerSerde = HourOfDayKeySerde.create();
}
final byte[] bytes = this.innerSerde.serializer().serialize("ser", value);
final byte[] sizeinBytes = ByteBuffer.allocate(4).putInt(bytes.length).array();
final byte[] sizeinBytes = ByteBuffer.allocate(VALUE_SIZE).putInt(bytes.length).array();
outStream.write(sizeinBytes);
outStream.write(bytes);
}
@@ -34,22 +38,23 @@ public class HourOfDaykeyCoder extends Coder<HourOfDayKey> implements Serializab
if (this.innerSerde == null) {
this.innerSerde = HourOfDayKeySerde.create();
}
final byte[] sizeinBytes = new byte[4];
inStream.read(sizeinBytes);
final byte[] sizeinBytes = new byte[VALUE_SIZE];
//inStream.read(sizeinBytes);
final int size = ByteBuffer.wrap(sizeinBytes).getInt();
final byte[] bytes = new byte[size];
inStream.read(bytes);
//inStream.read(bytes);
return this.innerSerde.deserializer().deserialize("deser", bytes);
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return null;
return Collections.emptyList();
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
if (!DETERMINISTIC) {
throw new NonDeterministicException(this, "This class is not deterministic!");
}
}
}
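Since the coder delegates to a Kafka Serde and adds its own length-prefix framing, a quick way to validate the encode/decode pair is a byte-array round trip. A minimal sketch, not part of this commit, assuming HourOfDayKey offers an (hourOfDay, sensorId) constructor:
import org.apache.beam.sdk.util.CoderUtils;
// Encode writes a 4-byte length prefix followed by the serde payload;
// decode must read the same framing back.
final HourOfDaykeyCoder coder = new HourOfDaykeyCoder();
final HourOfDayKey key = new HourOfDayKey(14, "sensor-42"); // hypothetical constructor order
final byte[] encoded = CoderUtils.encodeToByteArray(coder, key);
final HourOfDayKey decoded = CoderUtils.decodeFromByteArray(coder, encoded);
// decoded is expected to equal key; a failure here points at the framing or the serde.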
@@ -7,6 +7,8 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
@@ -42,65 +44,67 @@ import titan.ccp.model.records.ActivePowerRecord;
* ${workspace_loc:/uc4-application-samza/eclipseConsoleLogs.log} as Output File under Standard
* Input Output in Common in the Run Configuration Start via Eclipse Run.
*/
public class Uc3ApplicationBeam {
public final class Uc3ApplicationBeam {
private static final String JOB_NAME = "Uc3Application";
private static final String BOOTSTRAP = "KAFKA_BOOTSTRAP_SERVERS";
private static final String INPUT = "INPUT";
private static final String OUTPUT = "OUTPUT";
private static final String SCHEMA_REGISTRY = "SCHEMA_REGISTRY_URL";
private static final String YES = "true";
private static final String USE_AVRO_READER = YES;
private static final String AUTO_COMMIT_CONFIG = YES;
private static final String KAFKA_WINDOW_DURATION_DAYS = "KAFKA_WINDOW_DURATION_DAYS";
private static final String AGGREGATION_ADVANCE_DAYS = "AGGREGATION_ADVANCE_DAYS";
private static final String TRIGGER_INTERVAL = "TRIGGER_INTERVAL";
/**
* Private constructor to avoid instantiation.
*/
private Uc3ApplicationBeam() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("serial")
/**
* Start running this microservice.
*/
public static void main(final String[] args) {
// Set Configuration for Windows
final int windowDuration = Integer.parseInt(
System.getenv("KAFKA_WINDOW_DURATION_DAYS") != null
? System.getenv("KAFKA_WINDOW_DURATION_DAYS")
: "30");
System.getenv(KAFKA_WINDOW_DURATION_DAYS) == null
? "30" : System.getenv(KAFKA_WINDOW_DURATION_DAYS));
final Duration duration = Duration.standardDays(windowDuration);
final int aggregationAdvance = Integer.parseInt(
System.getenv("AGGREGATION_ADVANCE_DAYS") != null
? System.getenv("AGGREGATION_ADVANCE_DAYS")
: "1");
System.getenv(AGGREGATION_ADVANCE_DAYS) == null
? "1" : System.getenv(AGGREGATION_ADVANCE_DAYS));
final Duration advance = Duration.standardDays(aggregationAdvance);
final int triggerInterval = Integer.parseInt(
System.getenv("TRIGGER_INTERVAL") != null
? System.getenv("TRIGGER_INTERVAL")
: "15");
System.getenv(TRIGGER_INTERVAL) == null
? "15" : System.getenv(TRIGGER_INTERVAL));
final Duration triggerDelay = Duration.standardSeconds(triggerInterval);
// Set Configuration for Kafka
final String bootstrapServer =
System.getenv("KAFKA_BOOTSTRAP_SERVERS") != null ? System.getenv("KAFKA_BOOTSTRAP_SERVERS")
: "my-confluent-cp-kafka:9092";
final String inputTopic = System.getenv("INPUT") != null ? System.getenv("INPUT") : "input";
final String outputTopic = System.getenv("OUTPUT") != null ? System.getenv("OUTPUT") : "output";
final String schemaRegistryURL =
System.getenv("SCHEMA_REGISTRY_URL") != null ? System.getenv("SCHEMA_REGISTRY_URL")
: "http://my-confluent-cp-schema-registry:8081";
// Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryURL);
consumerConfig.put("specific.avro.reader", "true");
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "uc-application");
System.getenv(BOOTSTRAP) == null ? "my-confluent-cp-kafka:9092"
: System.getenv(BOOTSTRAP);
final String inputTopic = System.getenv(INPUT) == null ? "input" : System.getenv(INPUT);
final String outputTopic = System.getenv(OUTPUT) == null ? "output" : System.getenv(OUTPUT);
final String schemaRegistryUrl =
System.getenv(SCHEMA_REGISTRY) == null ? "http://my-confluent-cp-schema-registry:8081"
: System.getenv(SCHEMA_REGISTRY);
final Map<String, Object> consumerConfig = buildConsumerConfig(schemaRegistryUrl);
final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory();
final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setRunner(FlinkRunner.class);
options.setJobName("ucapplication");
options.setJobName(JOB_NAME);
final Pipeline pipeline = Pipeline.create(options);
final CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
cr.registerCoderForClass(StatsAggregation.class,
SerializableCoder.of(StatsAggregation.class));
cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
registerCoders(cr);
@SuppressWarnings({"rawtypes", "unchecked"})
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
@@ -120,8 +124,9 @@ public class Uc3ApplicationBeam {
pipeline.apply(kafka)
// Map to correct time format
.apply(MapElements.via(
new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey, ActivePowerRecord>>() {
final ZoneId zone = ZoneId.of("Europe/Paris");
new SimpleFunction<KV<String, ActivePowerRecord>, KV<HourOfDayKey,
ActivePowerRecord>>() {
private final ZoneId zone = ZoneId.of("Europe/Paris");
@Override
public KV<application.HourOfDayKey, ActivePowerRecord> apply(
@@ -162,11 +167,37 @@ public class Uc3ApplicationBeam {
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class));
pipeline.run().waitUntilFinish();
}
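The windowing step that consumes duration, advance, and triggerDelay is elided by the hunk above; for orientation, such parameters are typically combined into a sliding window with a processing-time trigger along these lines (a sketch, not the commit's exact code; the allowed lateness is an assumption):
// Applied to the keyed stream inside the pipeline; assumes imports from
// org.apache.beam.sdk.transforms.windowing (Window, SlidingWindows, Repeatedly,
// AfterProcessingTime).
.apply(Window
    .<KV<HourOfDayKey, ActivePowerRecord>>into(SlidingWindows.of(duration).every(advance))
    .triggered(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(triggerDelay)))
    .withAllowedLateness(Duration.ZERO) // assumption: no late-data handling
    .accumulatingFiredPanes())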
/**
* Builds a configuration for a Kafka consumer.
* @param schemaRegistryUrl the url to the SchemaRegistry.
* @return the configuration.
*/
public static Map<String, Object> buildConsumerConfig(final String schemaRegistryUrl) {
// Set consumer configuration for the schema registry and commits back to Kafka
final HashMap<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AUTO_COMMIT_CONFIG);
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put("schema.registry.url", schemaRegistryUrl);
consumerConfig.put("specific.avro.reader", USE_AVRO_READER);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, JOB_NAME);
return consumerConfig;
}
/**
* Registers the coders for all record and key types used in the pipeline.
* @param cr CoderRegistry.
*/
private static void registerCoders(final CoderRegistry cr) {
cr.registerCoderForClass(ActivePowerRecord.class, AvroCoder.of(ActivePowerRecord.SCHEMA$));
cr.registerCoderForClass(HourOfDayKey.class, new HourOfDaykeyCoder());
cr.registerCoderForClass(StatsAggregation.class,
SerializableCoder.of(StatsAggregation.class));
cr.registerCoderForClass(StatsAccumulator.class, AvroCoder.of(StatsAccumulator.class));
}
}
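The construction of the kafka read transform is also elided from this diff. A sketch of how the map returned by buildConsumerConfig(...) typically feeds into KafkaIO (assumes Kafka's StringDeserializer and Confluent's KafkaAvroDeserializer on the classpath; variable names follow main above; not part of this commit):
// The raw cast mirrors the rawtypes/unchecked suppression in main, because
// KafkaAvroDeserializer is declared as Deserializer<Object>.
final PTransform<PBegin, PCollection<KV<String, ActivePowerRecord>>> kafka =
    KafkaIO.<String, ActivePowerRecord>read()
        .withBootstrapServers(bootstrapServer)
        .withTopic(inputTopic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(
            (Class) KafkaAvroDeserializer.class, AvroCoder.of(ActivePowerRecord.class))
        .withConsumerConfigUpdates(buildConsumerConfig(schemaRegistryUrl))
        .withTimestampPolicyFactory(
            (tp, previousWatermark) -> new EventTimePolicy(previousWatermark))
        .withoutMetadata();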