Commit ead33635 authored by Sören Henning

Minor JavaDoc fixes

parent 3cfdc5a4
Pipeline #10416 passed
Changes to EventTimePolicy:

@@ -10,8 +10,8 @@ import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;
 /**
  * TimeStampPolicy to use event time based on the timestamp of the record value.
  */
-public class EventTimePolicy
-    extends TimestampPolicy<String, ActivePowerRecord> {
+public class EventTimePolicy extends TimestampPolicy<String, ActivePowerRecord> {

   protected Instant currentWatermark;

   public EventTimePolicy(final Optional<Instant> previousWatermark) {
@@ -19,7 +19,6 @@ public class EventTimePolicy
     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
   }

   @Override
   public Instant getTimestampForRecord(final PartitionContext ctx,
       final KafkaRecord<String, ActivePowerRecord> record) {
...
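Note: this hunk only shows the class declaration being collapsed onto a single line; most of the policy's body lies outside the visible diff. The following is a minimal, hypothetical sketch of how such an event-time policy is typically completed for Beam's KafkaIO. The class name EventTimePolicySketch, the getWatermark() implementation, and the assumption that ActivePowerRecord exposes getTimestamp() as epoch milliseconds are illustrative and not part of this commit.

// Hypothetical completion of an event-time TimestampPolicy; not the actual file contents.
// Assumes ActivePowerRecord exposes getTimestamp() returning epoch milliseconds.
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord;

public class EventTimePolicySketch extends TimestampPolicy<String, ActivePowerRecord> {

  protected Instant currentWatermark;

  public EventTimePolicySketch(final Optional<Instant> previousWatermark) {
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, ActivePowerRecord> record) {
    // Use the timestamp carried in the record value as the event time and
    // advance the watermark to the latest event time seen so far.
    final Instant eventTime = new Instant(record.getKV().getValue().getTimestamp());
    if (eventTime.isAfter(this.currentWatermark)) {
      this.currentWatermark = eventTime;
    }
    return eventTime;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    return this.currentWatermark;
  }
}

With Beam's KafkaIO, such a policy would typically be registered via KafkaIO.<String, ActivePowerRecord>read().withTimestampPolicyFactory((partition, previousWatermark) -> new EventTimePolicySketch(previousWatermark)).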
Changes to KafkaGenericReader:

@@ -9,7 +9,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.kafka.common.serialization.Deserializer;

 /**
- * Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
+ * Simple {@link PTransform} that reads from Kafka using {@link KafkaIO}.
  *
  * @param <K> Type of the Key.
  * @param <V> Type of the Value.
@@ -17,10 +17,11 @@ import org.apache.kafka.common.serialization.Deserializer;
 public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {

   private static final long serialVersionUID = 2603286150183186115L;

   private final PTransform<PBegin, PCollection<KV<K, V>>> reader;

   /**
-   * Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
+   * Instantiates a {@link PTransform} that reads from Kafka with the given configuration.
    */
   public KafkaGenericReader(
       final String bootstrapServer,
@@ -30,13 +31,12 @@ public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
       final Class<? extends Deserializer<V>> valueDeserializer) {
     super();

-    // Check if boostrap server and inputTopic are defined
+    // Check if the boostrap server and the input topic are defined
     if (bootstrapServer.isEmpty() || inputTopic.isEmpty()) {
       throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
     }

-    this.reader =
-        KafkaIO.<K, V>read()
+    this.reader = KafkaIO.<K, V>read()
         .withBootstrapServers(bootstrapServer)
         .withTopic(inputTopic)
         .withKeyDeserializer(keyDeserializer)
...
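Note: the constructor's full parameter list is only partially visible in this diff. As a rough usage sketch under that caveat, a reader like this can be applied directly to a pipeline's root, since it is a PTransform<PBegin, PCollection<KV<K, V>>>. The consumer-config map, the key-deserializer argument, and the server/topic values below are illustrative assumptions.

// Hypothetical usage sketch; constructor arguments other than bootstrapServer,
// inputTopic, and the deserializer classes are assumptions, since the diff above
// does not show the full parameter list. The import for KafkaGenericReader is
// omitted because its package is not visible in this diff.
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class KafkaGenericReaderUsage {

  public static void main(final String[] args) {
    final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Assumed consumer configuration map (its exact shape is not shown in the diff).
    final Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("auto.offset.reset", "earliest");

    // Assumed constructor call: bootstrap server, input topic, consumer config,
    // key deserializer class, value deserializer class.
    final KafkaGenericReader<String, String> reader = new KafkaGenericReader<>(
        "localhost:9092",
        "input",
        consumerConfig,
        StringDeserializer.class,
        StringDeserializer.class);

    // The transform reads from Kafka via KafkaIO and yields key-value pairs.
    final PCollection<KV<String, String>> records = pipeline.apply("ReadFromKafka", reader);

    pipeline.run().waitUntilFinish();
  }
}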