From 1d3a979f73742fdcb684dc6e7db090f2650f557a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bj=C3=B6rn=20Vonheiden?= <bjoern.vonheiden@hotmail.de>
Date: Sun, 26 Jul 2020 21:24:52 +0200
Subject: [PATCH] Use the changed KafkaRecordSender in the workload generators.

Send avro data with the producer to kafka.
---
 .../uc1/workloadgenerator/LoadGenerator.java  | 23 ++++++++++++-------
 .../uc2/workloadgenerator/LoadGenerator.java  | 18 +++++++++++----
 .../uc3/workloadgenerator/LoadGenerator.java  | 17 ++++++++++----
 .../uc4/workloadgenerator/LoadGenerator.java  | 18 +++++++++++----
 4 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
index 381aef34d..842a668c1 100644
--- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
+++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java
@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
 
 /**
  * Load Generator for UC1.
@@ -41,11 +41,14 @@ public final class LoadGenerator {
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
+    final double value =
+        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
     final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
         "4"));
     final String kafkaBootstrapServers =
         Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
+    final String schemaRegistryUrl =
+        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
     final String kafkaInputTopic =
         Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
@@ -60,12 +63,16 @@ public final class LoadGenerator {
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
     kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
     kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
-    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
-        kafkaBootstrapServers,
-        kafkaInputTopic,
-        r -> r.getIdentifier(),
-        r -> r.getTimestamp(),
-        kafkaProperties);
+
+    final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
+        new KafkaRecordSender.Builder<ActivePowerRecord>(
+            kafkaBootstrapServers,
+            kafkaInputTopic,
+            schemaRegistryUrl)
+                .keyAccessor(r -> r.getIdentifier())
+                .timestampAccessor(r -> r.getTimestamp())
+                .defaultProperties(kafkaProperties)
+                .build();
 
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
index 6cf7d81af..eadb4ccd2 100644
--- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
+++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java
@@ -14,9 +14,9 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
 import titan.ccp.configuration.events.Event;
+import titan.ccp.model.records.ActivePowerRecord;
 import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
 import titan.ccp.model.sensorregistry.MutableSensorRegistry;
-import titan.ccp.models.records.ActivePowerRecord;
 
 public class LoadGenerator {
 
@@ -39,13 +39,16 @@ public class LoadGenerator {
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
+    final double value =
+        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
     final boolean sendRegistry = Boolean
         .parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
     final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
     final String kafkaBootstrapServers =
         Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
             "localhost:9092");
+    final String schemaRegistryUrl =
+        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
     final String kafkaInputTopic =
         Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
@@ -64,9 +67,16 @@ public class LoadGenerator {
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
     kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
     kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
+
     final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        new KafkaRecordSender<>(kafkaBootstrapServers,
-            kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
+        new KafkaRecordSender.Builder<ActivePowerRecord>(
+            kafkaBootstrapServers,
+            kafkaInputTopic,
+            schemaRegistryUrl)
+                .keyAccessor(r -> r.getIdentifier())
+                .timestampAccessor(r -> r.getTimestamp())
+                .defaultProperties(kafkaProperties)
+                .build();
 
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
index 80e3810f9..5c7441845 100644
--- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
+++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java
@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
 
 public class LoadGenerator {
 
@@ -33,11 +33,14 @@ public class LoadGenerator {
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
+    final double value =
+        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
     final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
     final String kafkaBootstrapServers =
         Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
             "localhost:9092");
+    final String schemaRegistryUrl =
+        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
     final String kafkaInputTopic =
         Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
@@ -53,8 +56,14 @@ public class LoadGenerator {
     kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
     kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
     final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        new KafkaRecordSender<>(kafkaBootstrapServers,
-            kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
+        new KafkaRecordSender.Builder<ActivePowerRecord>(
+            kafkaBootstrapServers,
+            kafkaInputTopic,
+            schemaRegistryUrl)
+                .keyAccessor(r -> r.getIdentifier())
+                .timestampAccessor(r -> r.getTimestamp())
+                .defaultProperties(kafkaProperties)
+                .build();
 
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
index 84df87a6a..14e9e589d 100644
--- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
+++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java
@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
 import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
 import theodolite.commons.workloadgeneration.misc.ZooKeeper;
-import titan.ccp.models.records.ActivePowerRecord;
+import titan.ccp.model.records.ActivePowerRecord;
 
 public class LoadGenerator {
 
@@ -33,11 +33,14 @@ public class LoadGenerator {
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
     final int periodMs =
         Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
-    final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
+    final double value =
+        Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
     final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "1"));
     final String kafkaBootstrapServers =
         Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
             "localhost:9092");
+    final String schemaRegistryUrl =
+        Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
     final String kafkaInputTopic =
         Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
     final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
@@ -52,9 +55,16 @@ public class LoadGenerator {
     kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
     kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
     kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
+
     final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
-        new KafkaRecordSender<>(kafkaBootstrapServers,
-            kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
+        new KafkaRecordSender.Builder<ActivePowerRecord>(
+            kafkaBootstrapServers,
+            kafkaInputTopic,
+            schemaRegistryUrl)
+                .keyAccessor(r -> r.getIdentifier())
+                .timestampAccessor(r -> r.getTimestamp())
+                .defaultProperties(kafkaProperties)
+                .build();
 
     // create workload generator
     final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
-- 
GitLab