Skip to content
Snippets Groups Projects

Draft: Add Spark Structured Streaming based Benchmarks

Open Christian Richter requested to merge stu204011/theodolite:theodolite-spark into main
5 unresolved threads
1 file
+ 3
3
Compare changes
  • Side-by-side
  • Inline
@@ -61,7 +61,7 @@ public final class HistoryServiceSparkJob {
.readStream()
.format("kafka")
.option("enable.auto.commit", "true")
.option("kafka.bootstrap.servers", "theodolite-cp-kafka:9092")
.option("kafka.bootstrap.servers", "theodolite-kafka-kafka-bootstrap:9092")
.option("subscribe", "input")
.load();
@@ -114,7 +114,7 @@ public final class HistoryServiceSparkJob {
// allColumns = new StructType(Arrays.asList(windowDs.columns()).stream().map(f -> col(f)));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "theodolite-cp-kafka:9092");
kafkaParams.put("bootstrap.servers", "theodolite-kafka-kafka-bootstrap:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
@@ -129,7 +129,7 @@ public final class HistoryServiceSparkJob {
windowDs
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "theodolite-cp-kafka:9092")
.option("kafka.bootstrap.servers", "theodolite-kafka-kafka-bootstrap:9092")
.option("checkpointLocation", "/tmp/spark-checkpoint") // TODO for now checkpoint into local storage. Should probably be changed
.option("topic", "output")
.outputMode("append")
Loading