diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java index a63b5f4939566134a0aeec765fe084ea5bcc41ff..582e1ecc4b41e83e62d390da17ff3a7a2e64be42 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/EventTimePolicy.java @@ -10,8 +10,8 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; /** * TimeStampPolicy to use event time based on the timestamp of the record value. */ -public class EventTimePolicy - extends TimestampPolicy<String, ActivePowerRecord> { +public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> { + protected Instant currentWatermark; public EventTimePolicy(final Optional<Instant> previousWatermark) { @@ -19,7 +19,6 @@ public class EventTimePolicy this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE); } - @Override public Instant getTimestampForRecord(final PartitionContext ctx, final KafkaRecord<String, ActivePowerRecord> record) { diff --git a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java index 000ddcdccd90cf3bc4f0cdaabe004ce74bef5dec..577019659f14fd6c9057868db4acbbc0c7e1447f 100644 --- a/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java +++ b/theodolite-benchmarks/beam-commons/src/main/java/rocks/theodolite/benchmarks/commons/beam/kafka/KafkaGenericReader.java @@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection; import 
org.apache.kafka.common.serialization.Deserializer; /** - * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}. + * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}. * * @param <K> Type of the Key. * @param <V> Type of the Value. @@ -17,10 +17,11 @@ import org.apache.kafka.common.serialization.Deserializer; public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> { private static final long serialVersionUID = 2603286150183186115L; + private final PTransform<PBegin, PCollection<KV<K, V>>> reader; /** - * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration. + * Instantiates a {@link PTransform} that reads from Kafka with the given configuration. */ public KafkaGenericReader( final String bootstrapServer, @@ -30,19 +31,18 @@ public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV< final Class<? extends Deserializer<V>> valueDeserializer) { super(); - // Check if boostrap server and inputTopic are defined + // Check if the bootstrap server and the input topic are defined if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) { throw new IllegalArgumentException("bootstrapServer or inputTopic missing"); } - this.reader = - KafkaIO.<K, V>read() - .withBootstrapServers(bootstrapServer) - .withTopic(inputTopic) - .withKeyDeserializer(keyDeserializer) - .withValueDeserializer(valueDeserializer) - .withConsumerConfigUpdates(consumerConfig) - .withoutMetadata(); + this.reader = KafkaIO.<K, V>read() + .withBootstrapServers(bootstrapServer) + .withTopic(inputTopic) + .withKeyDeserializer(keyDeserializer) + .withValueDeserializer(valueDeserializer) + .withConsumerConfigUpdates(consumerConfig) + .withoutMetadata(); } @Override