Skip to content
Snippets Groups Projects

Firestore sink for UC1 Beam

Merged Sören Henning requested to merge uc1-beam-firestore into master
2 files
+ 17
21
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -6,6 +6,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.Deserializer;
/**
* Simple {@link PTransform} that read from Kafka using {@link KafkaIO}.
@@ -13,8 +14,7 @@ import org.apache.beam.sdk.values.PCollection;
* @param <K> Type of the Key.
* @param <V> Type of the Value.
*/
public class KafkaGenericReader<K, V> extends
PTransform<PBegin, PCollection<KV<K, V>>> {
public class KafkaGenericReader<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
private static final long serialVersionUID = 2603286150183186115L;
private final PTransform<PBegin, PCollection<KV<K, V>>> reader;
@@ -22,14 +22,12 @@ public class KafkaGenericReader<K, V> extends
/**
* Instantiates a {@link PTransform} that reads from Kafka with the given Configuration.
*/
public KafkaGenericReader(final String bootstrapServer, final String inputTopic,
final Map<String, Object> consumerConfig,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<K>>
keyDeserializer,
final Class<? extends
org.apache.kafka.common.serialization.Deserializer<V>>
valueDeserializer) {
public KafkaGenericReader(
final String bootstrapServer,
final String inputTopic,
final Map<String, Object> consumerConfig,
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer) {
super();
// Check if boostrap server and inputTopic are defined
@@ -37,7 +35,7 @@ public class KafkaGenericReader<K, V> extends
throw new IllegalArgumentException("bootstrapServer or inputTopic missing");
}
reader =
this.reader =
KafkaIO.<K, V>read()
.withBootstrapServers(bootstrapServer)
.withTopic(inputTopic)
Loading