Skip to content
Snippets Groups Projects
Commit a5b64db3 authored by Sören Henning's avatar Sören Henning
Browse files

Use Kieker records instead of Avro for load generation

parent 60187c25
No related branches found
No related tags found
No related merge requests found
Pipeline #918 failed
Showing with 20 additions and 20 deletions
......@@ -79,8 +79,9 @@ configure(useCaseApplications) {
configure(useCaseGenerators) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
//implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true }
//implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:1.14'
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.7.25'
......@@ -98,8 +99,8 @@ configure(commonProjects) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.slf4j:slf4j-simple:1.7.25'
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true }
//implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
......
......@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecord;
/**
* Load Generator for UC1.
......
......@@ -14,8 +14,8 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.sensorregistry.SensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
/**
* The {@code LoadGenerator} creates a load in Kafka.
......
......@@ -13,7 +13,8 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecord;
/**
* The {@code LoadGenerator} creates a load in Kafka.
......
......@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecord;
/**
* The {@code LoadGenerator} creates a load in Kafka.
......
......@@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.communication.kafka;
import java.util.Properties;
import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
......@@ -10,14 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.functions.Transport;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T> {
public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
......@@ -45,10 +45,8 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
final SchemaRegistryAvroSerdeFactory avroSerdeFactory =
new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
avroSerdeFactory.<T>forKeys().serializer());
IMonitoringRecordSerde.serializer());
}
/**
......@@ -56,7 +54,7 @@ public class KafkaRecordSender<T extends SpecificRecord> implements Transport<T>
*
* @param <T> Type of the records that should later be send.
*/
public static class Builder<T extends SpecificRecord> {
public static class Builder<T extends IMonitoringRecord> {
private final String bootstrapServers;
private final String topic;
......
package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import org.apache.avro.specific.SpecificRecord;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
......@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
*
* @param <T> The type of records the workload generator is dedicated for.
*/
public class KafkaWorkloadGenerator<T extends SpecificRecord>
public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
extends AbstractWorkloadGenerator<T> {
private final KafkaRecordSender<T> recordSender;
......
......@@ -2,7 +2,7 @@ package theodolite.commons.workloadgeneration.generators;
import java.time.Duration;
import java.util.Objects;
import org.apache.avro.specific.SpecificRecord;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
......@@ -14,7 +14,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD
public final class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { // NOPMD
private int instances; // NOPMD
private ZooKeeper zooKeeper; // NOPMD
......@@ -35,7 +35,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { //
*
* @return the builder.
*/
public static <T extends SpecificRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
public static <T extends IMonitoringRecord> KafkaWorkloadGeneratorBuilder<T> builder() {
return new KafkaWorkloadGeneratorBuilder<>();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment