Skip to content
Snippets Groups Projects
Commit 351b9ec6 authored by Sören Henning's avatar Sören Henning
Browse files

Merge branch 'feature/app-wg-with-avro' into 'master'

Use Titan CC Avro Records in UC App and Workload Generator

Closes #1, #10, #16, #18, and #34

See merge request !28
parents 5777c722 6a79ddca
No related branches found
No related tags found
1 merge request!28Use Titan CC Avro Records in UC App and Workload Generator
Pipeline #814 passed
Showing
with 130 additions and 83 deletions
package theodolite.uc4.application; package theodolite.commons.kafkastreams;
/** /**
* Keys to access configuration parameters. * Keys to access configuration parameters.
*/ */
public final class ConfigurationKeys { public final class ConfigurationKeys {
// Common keys
public static final String APPLICATION_NAME = "application.name"; public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version"; public static final String APPLICATION_VERSION = "application.version";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Additional topics
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days"; public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days"; // UC2
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String NUM_THREADS = "num.threads"; public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; // UC3
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; // UC4
public static final String AGGREGATION_DURATION_DAYS = "aggregation.duration.days";
public static final String AGGREGATION_ADVANCE_DAYS = "aggregation.advance.days";
public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes";
private ConfigurationKeys() {} private ConfigurationKeys() {}
......
...@@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; ...@@ -13,6 +13,8 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder;
public abstract class KafkaStreamsBuilder { public abstract class KafkaStreamsBuilder {
// Kafkastreams application specific // Kafkastreams application specific
protected String schemaRegistryUrl; // NOPMD for use in subclass
private String applicationName; // NOPMD private String applicationName; // NOPMD
private String applicationVersion; // NOPMD private String applicationVersion; // NOPMD
private String bootstrapServers; // NOPMD private String bootstrapServers; // NOPMD
...@@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder { ...@@ -55,6 +57,17 @@ public abstract class KafkaStreamsBuilder {
return this; return this;
} }
/**
 * Sets the URL for the schema registry.
 *
 * @param url The URL of the schema registry.
 * @return this builder, for method chaining.
 */
public KafkaStreamsBuilder schemaRegistry(final String url) {
this.schemaRegistryUrl = url;
return this;
}
/** /**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default. * one for using the default.
...@@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder { ...@@ -131,9 +144,10 @@ public abstract class KafkaStreamsBuilder {
*/ */
public KafkaStreams build() { public KafkaStreams build() {
// Check for required attributes for building properties. // Check for required attributes for building properties.
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.applicationName, "Application name has not been set."); Objects.requireNonNull(this.applicationName, "Application name has not been set.");
Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); Objects.requireNonNull(this.applicationVersion, "Application version has not been set.");
Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set.");
Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set.");
// Create the Kafka streams instance. // Create the Kafka streams instance.
return new KafkaStreams(this.buildTopology(), this.buildProperties()); return new KafkaStreams(this.buildTopology(), this.buildProperties());
......
...@@ -12,9 +12,10 @@ buildscript { ...@@ -12,9 +12,10 @@ buildscript {
// Variables used to distinct different subprojects // Variables used to distinct different subprojects
def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')}
def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')}
def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')}
def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')}
// Plugins // Plugins
allprojects { allprojects {
apply plugin: 'eclipse' apply plugin: 'eclipse'
...@@ -51,22 +52,38 @@ allprojects { ...@@ -51,22 +52,38 @@ allprojects {
maven { maven {
url "https://oss.sonatype.org/content/repositories/snapshots/" url "https://oss.sonatype.org/content/repositories/snapshots/"
} }
maven {
url 'https://packages.confluent.io/maven/'
}
} }
} }
// Dependencies for all use cases // Dependencies for all use case applications
configure(useCaseProjects) { configure(useCaseApplications) {
dependencies { dependencies {
// These dependencies are exported to consumers, that is to say found on their compile classpath.
api('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true }
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.apache.kafka:kafka-clients:2.1.0' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre' implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.6.1'
implementation project(':application-kafkastreams-commons') implementation project(':application-kafkastreams-commons')
// Use JUnit test framework
testImplementation 'junit:junit:4.12'
}
}
// Dependencies for all use case generators
configure(useCaseGenerators) {
dependencies {
// These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
implementation 'com.google.guava:guava:24.1-jre'
implementation 'org.jctools:jctools-core:2.1.1'
implementation 'org.slf4j:slf4j-simple:1.6.1'
// These dependencies are used for the workload-generator-common // These dependencies are used for the workload-generator-common
implementation project(':workload-generator-commons') implementation project(':workload-generator-commons')
...@@ -78,14 +95,10 @@ configure(useCaseProjects) { ...@@ -78,14 +95,10 @@ configure(useCaseProjects) {
// Dependencies for all commons // Dependencies for all commons
configure(commonProjects) { configure(commonProjects) {
dependencies { dependencies {
// These dependencies is exported to consumers, that is to say found on their compile classpath.
api 'org.apache.kafka:kafka-clients:2.4.0'
api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true }
api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT'
// These dependencies are used internally, and not exposed to consumers on their own compile classpath. // These dependencies are used internally, and not exposed to consumers on their own compile classpath.
implementation 'org.slf4j:slf4j-simple:1.6.1' implementation 'org.slf4j:slf4j-simple:1.6.1'
implementation('org.industrial-devops:titan-ccp-common:0.0.4-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true }
implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true }
// Use JUnit test framework // Use JUnit test framework
testImplementation 'junit:junit:4.12' testImplementation 'junit:junit:4.12'
......
...@@ -22,6 +22,8 @@ spec: ...@@ -22,6 +22,8 @@ spec:
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS - name: JAVA_OPTS
...@@ -50,4 +52,4 @@ spec: ...@@ -50,4 +52,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -24,6 +24,8 @@ spec: ...@@ -24,6 +24,8 @@ spec:
value: "2181" value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
- name: POD_NAME - name: POD_NAME
...@@ -32,4 +34,3 @@ spec: ...@@ -32,4 +34,3 @@ spec:
fieldPath: metadata.name fieldPath: metadata.name
- name: INSTANCES - name: INSTANCES
value: "{{INSTANCES}}" value: "{{INSTANCES}}"
\ No newline at end of file
...@@ -22,6 +22,8 @@ spec: ...@@ -22,6 +22,8 @@ spec:
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS - name: JAVA_OPTS
...@@ -50,4 +52,4 @@ spec: ...@@ -50,4 +52,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -23,10 +23,11 @@ spec: ...@@ -23,10 +23,11 @@ spec:
value: "2181" value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: HIERARCHY - name: HIERARCHY
value: "full" value: "full"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "4" value: "4"
- name: NUM_NESTED_GROUPS - name: NUM_NESTED_GROUPS
value: "{{NUM_NESTED_GROUPS}}" value: "{{NUM_NESTED_GROUPS}}"
\ No newline at end of file
...@@ -22,6 +22,8 @@ spec: ...@@ -22,6 +22,8 @@ spec:
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: KAFKA_WINDOW_DURATION_MINUTES - name: KAFKA_WINDOW_DURATION_MINUTES
value: "1" value: "1"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
...@@ -52,4 +54,4 @@ spec: ...@@ -52,4 +54,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -24,6 +24,8 @@ spec: ...@@ -24,6 +24,8 @@ spec:
value: "2181" value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
- name: POD_NAME - name: POD_NAME
...@@ -32,4 +34,3 @@ spec: ...@@ -32,4 +34,3 @@ spec:
fieldPath: metadata.name fieldPath: metadata.name
- name: INSTANCES - name: INSTANCES
value: "{{INSTANCES}}" value: "{{INSTANCES}}"
\ No newline at end of file
...@@ -22,6 +22,8 @@ spec: ...@@ -22,6 +22,8 @@ spec:
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: AGGREGATION_DURATION_DAYS - name: AGGREGATION_DURATION_DAYS
value: "3" #AGGREGATION_DURATION_DAYS value: "3" #AGGREGATION_DURATION_DAYS
- name: AGGREGATION_DURATION_ADVANCE - name: AGGREGATION_DURATION_ADVANCE
...@@ -54,4 +56,4 @@ spec: ...@@ -54,4 +56,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -23,6 +23,7 @@ spec: ...@@ -23,6 +23,7 @@ spec:
value: "2181" value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
\ No newline at end of file
package theodolite.uc1.application;
/**
 * Keys to access configuration parameters.
 *
 * <p>Each constant is the lookup key of one externally supplied configuration
 * property. This is a constants holder; it is never instantiated.
 */
public final class ConfigurationKeys {
// Key for the application's name.
public static final String APPLICATION_NAME = "application.name";
// Key for the application's version.
public static final String APPLICATION_VERSION = "application.version";
// Key for the number of processing threads.
public static final String NUM_THREADS = "num.threads";
// Key for the Kafka Streams commit interval in milliseconds.
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
// Key for the maximum number of bytes used for record caches (buffering).
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
// Key for the list of Kafka bootstrap servers.
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
// Key for the Kafka topic this application consumes from.
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
// Private constructor: prevents instantiation of this constants-only class.
private ConfigurationKeys() {}
}
...@@ -3,8 +3,9 @@ package theodolite.uc1.application; ...@@ -3,8 +3,9 @@ package theodolite.uc1.application;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -13,7 +14,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -13,7 +14,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class HistoryService { public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
...@@ -40,6 +41,7 @@ public class HistoryService { ...@@ -40,6 +41,7 @@ public class HistoryService {
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.build(); .build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
......
...@@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology; ...@@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
...@@ -18,14 +18,19 @@ public class TopologyBuilder { ...@@ -18,14 +18,19 @@ public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic; private final String inputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
/** /**
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*/ */
public TopologyBuilder(final String inputTopic) { public TopologyBuilder(final String inputTopic,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory;
} }
/** /**
...@@ -35,7 +40,7 @@ public class TopologyBuilder { ...@@ -35,7 +40,7 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
......
...@@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing; ...@@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing;
import java.util.Objects; import java.util.Objects;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Builder for the Kafka Streams configuration. * Builder for the Kafka Streams configuration.
...@@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic).build(); return new TopologyBuilder(this.inputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build();
} }
} }
...@@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 ...@@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
schema.registry.url=http://localhost:8091
num.threads=1 num.threads=1
commit.interval.ms=100 commit.interval.ms=100
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
...@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace; ...@@ -13,7 +13,7 @@ import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder; import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper; import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Load Generator for UC1. * Load Generator for UC1.
...@@ -41,11 +41,14 @@ public final class LoadGenerator { ...@@ -41,11 +41,14 @@ public final class LoadGenerator {
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs = final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final double value =
Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
"4")); "4"));
final String kafkaBootstrapServers = final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String schemaRegistryUrl =
Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
final String kafkaInputTopic = final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
...@@ -60,25 +63,29 @@ public final class LoadGenerator { ...@@ -60,25 +63,29 @@ public final class LoadGenerator {
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers, final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
kafkaInputTopic, new KafkaRecordSender.Builder<ActivePowerRecord>(
r -> r.getIdentifier(), kafkaBootstrapServers,
r -> r.getTimestamp(), kafkaInputTopic,
kafkaProperties); schemaRegistryUrl)
.keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.defaultProperties(kafkaProperties)
.build();
// create workload generator // create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances) .instances(instances)
.setKeySpace(new KeySpace("s_", numSensors)) .keySpace(new KeySpace("s_", numSensors))
.setThreads(threads) .threads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) .period(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction( .generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender) .kafkaRecordSender(kafkaRecordSender)
.build(); .build();
// start // start
......
...@@ -4,8 +4,9 @@ import java.time.Duration; ...@@ -4,8 +4,9 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class AggregationService { public class AggregationService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
...@@ -49,6 +50,7 @@ public class AggregationService { ...@@ -49,6 +50,7 @@ public class AggregationService {
.applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
.applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
...@@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue; ...@@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents * Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents
......
...@@ -8,7 +8,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier; ...@@ -8,7 +8,7 @@ import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Factory class configuration required by {@link JointFlatTransformerFactory}. * Factory class configuration required by {@link JointFlatTransformerFactory}.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment