Skip to content
Snippets Groups Projects
Commit 3c289fab authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Use avro instead of kieker in uc1

Replace the kieker records for reading and writing to/from kafka
with the avro records from Titan.
parent 3db33520
No related branches found
No related tags found
2 merge requests!28Use Titan CC Avro Records in UC App and Workload Generator,!13Migrate to new Titan CC records
Showing
with 87 additions and 34 deletions
...@@ -12,9 +12,10 @@ buildscript { ...@@ -12,9 +12,10 @@ buildscript {
// Variables used to distinct different subprojects // Variables used to distinct different subprojects
def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')}
def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')}
def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')}
def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')}
// Plugins // Plugins
allprojects { allprojects {
apply plugin: 'eclipse' apply plugin: 'eclipse'
...@@ -51,19 +52,18 @@ allprojects { ...@@ -51,19 +52,18 @@ allprojects {
maven { maven {
url "https://oss.sonatype.org/content/repositories/snapshots/" url "https://oss.sonatype.org/content/repositories/snapshots/"
} }
maven {
url 'https://packages.confluent.io/maven/'
}
} }
} }
// Dependencies for all use cases // Dependencies for all use case applications
configure(useCaseProjects) { configure(useCaseApplications) {
dependencies { dependencies {
// These dependencies is exported to consumers, that is to say found on their compile classpath.
api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true }
api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT'
api 'net.sourceforge.teetime:teetime:3.0'
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.apache.kafka:kafka-clients:2.1.0' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre' implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.6.1'
...@@ -74,6 +74,25 @@ configure(useCaseProjects) { ...@@ -74,6 +74,25 @@ configure(useCaseProjects) {
} }
} }
// Dependencies for all use case generators
configure(useCaseGenerators) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
// implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1'
// maintain build of generators
implementation 'net.kieker-monitoring:kieker:1.14-SNAPSHOT'
implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true }
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
}
// Dependencies for all commons // Dependencies for all commons
configure(commonProjects) { configure(commonProjects) {
dependencies { dependencies {
...@@ -82,7 +101,8 @@ configure(commonProjects) { ...@@ -82,7 +101,8 @@ configure(commonProjects) {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.6.1'
implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
// Use JUnit test framework // Use JUnit test framework
testImplementation 'junit:junit:4.12' testImplementation 'junit:junit:4.12'
......
...@@ -22,6 +22,8 @@ spec: ...@@ -22,6 +22,8 @@ spec:
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS - name: JAVA_OPTS
...@@ -50,4 +52,4 @@ spec: ...@@ -50,4 +52,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -16,10 +16,12 @@ spec: ...@@ -16,10 +16,12 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: soerenhenning/uc1-wg:latest image: soerenhenning/uc1-wg:latest
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
- name: POD_NAME - name: POD_NAME
...@@ -28,4 +30,3 @@ spec: ...@@ -28,4 +30,3 @@ spec:
fieldPath: metadata.name fieldPath: metadata.name
- name: INSTANCES - name: INSTANCES
value: "{{INSTANCES}}" value: "{{INSTANCES}}"
\ No newline at end of file
...@@ -17,6 +17,8 @@ public final class ConfigurationKeys { ...@@ -17,6 +17,8 @@ public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
......
...@@ -4,7 +4,7 @@ import java.util.concurrent.CompletableFuture; ...@@ -4,7 +4,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -13,7 +13,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -13,7 +13,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class HistoryService { public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
...@@ -40,6 +40,7 @@ public class HistoryService { ...@@ -40,6 +40,7 @@ public class HistoryService {
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.build(); .build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
......
...@@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology; ...@@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
...@@ -18,14 +18,19 @@ public class TopologyBuilder { ...@@ -18,14 +18,19 @@ public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic; private final String inputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
/** /**
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*/ */
public TopologyBuilder(final String inputTopic) { public TopologyBuilder(final String inputTopic,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory;
} }
/** /**
...@@ -35,7 +40,7 @@ public class TopologyBuilder { ...@@ -35,7 +40,7 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) this.srAvroSerdeFactory.<ActivePowerRecord>forKeys()))
.mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
......
...@@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing; ...@@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing;
import java.util.Objects; import java.util.Objects;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Builder for the Kafka Streams configuration. * Builder for the Kafka Streams configuration.
...@@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic).build(); return new TopologyBuilder(this.inputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build();
} }
} }
...@@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 ...@@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
schema.registry.url=http://localhost:8091
num.threads=1 num.threads=1
commit.interval.ms=100 commit.interval.ms=100
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
...@@ -2,14 +2,14 @@ package theodolite.kafkasender; ...@@ -2,14 +2,14 @@ package theodolite.kafkasender;
import java.util.Properties; import java.util.Properties;
import java.util.function.Function; import java.util.function.Function;
import kieker.common.record.IMonitoringRecord; import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
...@@ -17,7 +17,7 @@ import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; ...@@ -17,7 +17,7 @@ import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
* *
* @param <T> {@link IMonitoringRecord} to send * @param <T> {@link IMonitoringRecord} to send
*/ */
public class KafkaRecordSender<T extends IMonitoringRecord> { public class KafkaRecordSender<T extends SpecificRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
...@@ -29,24 +29,30 @@ public class KafkaRecordSender<T extends IMonitoringRecord> { ...@@ -29,24 +29,30 @@ public class KafkaRecordSender<T extends IMonitoringRecord> {
private final Producer<String, T> producer; private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) { public KafkaRecordSender(final String bootstrapServers, final String schemaRegistryUrl,
this(bootstrapServers, topic, x -> "", x -> null, new Properties()); final String topic) {
this(bootstrapServers, schemaRegistryUrl, topic, x -> "", x -> null, new Properties());
} }
public KafkaRecordSender(final String bootstrapServers, final String topic, public KafkaRecordSender(final String bootstrapServers, final String schemaRegistryUrl,
final String topic,
final Function<T, String> keyAccessor) { final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); this(bootstrapServers, schemaRegistryUrl, topic, keyAccessor, x -> null, new Properties());
} }
public KafkaRecordSender(final String bootstrapServers, final String topic, public KafkaRecordSender(final String bootstrapServers, final String schemaRegistryUrl,
final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); this(bootstrapServers, schemaRegistryUrl, topic, keyAccessor, timestampAccessor,
new Properties());
} }
/** /**
* Create a new {@link KafkaRecordSender}. * Create a new {@link KafkaRecordSender}.
*/ */
public KafkaRecordSender(final String bootstrapServers, final String topic, public KafkaRecordSender(final String bootstrapServers,
final String schemaRegistryUrl,
final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) { final Properties defaultProperties) {
this.topic = topic; this.topic = topic;
...@@ -61,8 +67,12 @@ public class KafkaRecordSender<T extends IMonitoringRecord> { ...@@ -61,8 +67,12 @@ public class KafkaRecordSender<T extends IMonitoringRecord> {
// properties.put("linger.ms", this.lingerMs); // properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory); // properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(), final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory =
IMonitoringRecordSerde.serializer()); new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl);
this.producer = new KafkaProducer<>(properties,
new StringSerializer(),
srAvroSerdeFactory.<T>forKeys().serializer());
} }
/** /**
......
...@@ -15,7 +15,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; ...@@ -15,7 +15,8 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.kafkasender.KafkaRecordSender; import theodolite.kafkasender.KafkaRecordSender;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.records.ActivePowerRecord.Builder;
public class LoadGenerator { public class LoadGenerator {
...@@ -35,6 +36,8 @@ public class LoadGenerator { ...@@ -35,6 +36,8 @@ public class LoadGenerator {
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers = final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String schemaRegistryUrl =
Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
final String kafkaInputTopic = final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
...@@ -55,6 +58,7 @@ public class LoadGenerator { ...@@ -55,6 +58,7 @@ public class LoadGenerator {
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers, kafkaBootstrapServers,
schemaRegistryUrl,
kafkaInputTopic, kafkaInputTopic,
r -> r.getIdentifier(), r -> r.getIdentifier(),
r -> r.getTimestamp(), r -> r.getTimestamp(),
...@@ -63,10 +67,14 @@ public class LoadGenerator { ...@@ -63,10 +67,14 @@ public class LoadGenerator {
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random(); final Random random = new Random();
final Builder aprBuilder = ActivePowerRecord.newBuilder();
for (final String sensor : sensors) { for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs); final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> { executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); aprBuilder.setIdentifier(sensor).setTimestamp(System.currentTimeMillis())
.setValueInW(value);
kafkaRecordSender.write(aprBuilder.build());
}, initialDelay, periodMs, TimeUnit.MILLISECONDS); }, initialDelay, periodMs, TimeUnit.MILLISECONDS);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment