Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • she/theodolite
1 result
Show changes
Showing
with 224 additions and 147 deletions
...@@ -15,13 +15,15 @@ spec: ...@@ -15,13 +15,15 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: uc1-application - name: uc1-application
image: "soerenhenning/uc1-app:latest" image: "theodolite/theodolite-uc1-kstreams-app:latest"
ports: ports:
- containerPort: 5555 - containerPort: 5555
name: jmx name: jmx
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS - name: JAVA_OPTS
...@@ -50,4 +52,4 @@ spec: ...@@ -50,4 +52,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
apiVersion: apps/v1 apiVersion: apps/v1
kind: StatefulSet kind: Deployment
metadata: metadata:
name: titan-ccp-load-generator name: titan-ccp-load-generator
spec: spec:
selector: selector:
matchLabels: matchLabels:
app: titan-ccp-load-generator app: titan-ccp-load-generator
serviceName: titan-ccp-load-generator
replicas: {{INSTANCES}} replicas: {{INSTANCES}}
template: template:
metadata: metadata:
...@@ -16,10 +15,16 @@ spec: ...@@ -16,10 +15,16 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: soerenhenning/uc1-wg:latest image: theodolite/theodolite-uc1-workload-generator:latest
env: env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
- name: POD_NAME - name: POD_NAME
...@@ -28,4 +33,3 @@ spec: ...@@ -28,4 +33,3 @@ spec:
fieldPath: metadata.name fieldPath: metadata.name
- name: INSTANCES - name: INSTANCES
value: "{{INSTANCES}}" value: "{{INSTANCES}}"
\ No newline at end of file
...@@ -15,17 +15,21 @@ spec: ...@@ -15,17 +15,21 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: uc2-application - name: uc2-application
image: "benediktwetzel/uc2-app:latest" image: "theodolite/theodolite-uc2-kstreams-app:latest"
ports: ports:
- containerPort: 5555 - containerPort: 5555
name: jmx name: jmx
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}" value: "{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}"
- name: JAVA_OPTS - name: JAVA_OPTS
value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555" value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555"
- name: LOG_LEVEL
value: "INFO"
resources: resources:
limits: limits:
memory: "{{MEMORY_LIMIT}}" memory: "{{MEMORY_LIMIT}}"
...@@ -50,4 +54,4 @@ spec: ...@@ -50,4 +54,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -6,7 +6,7 @@ spec: ...@@ -6,7 +6,7 @@ spec:
selector: selector:
matchLabels: matchLabels:
app: titan-ccp-load-generator app: titan-ccp-load-generator
replicas: 1 replicas: {{INSTANCES}}
template: template:
metadata: metadata:
labels: labels:
...@@ -15,14 +15,23 @@ spec: ...@@ -15,14 +15,23 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: benediktwetzel/uc2-wg:latest image: theodolite/theodolite-uc2-workload-generator:latest
env: env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: HIERARCHY - name: SCHEMA_REGISTRY_URL
value: "full" value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "4" value: "4"
- name: NUM_NESTED_GROUPS - name: NUM_NESTED_GROUPS
value: "{{NUM_NESTED_GROUPS}}" value: "{{NUM_NESTED_GROUPS}}"
- name: POD_NAME
\ No newline at end of file valueFrom:
fieldRef:
fieldPath: metadata.name
- name: INSTANCES
value: "{{INSTANCES}}"
...@@ -15,13 +15,15 @@ spec: ...@@ -15,13 +15,15 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: uc3-application - name: uc3-application
image: "soerenhenning/uc3-app:latest" image: "theodolite/theodolite-uc3-kstreams-app:latest"
ports: ports:
- containerPort: 5555 - containerPort: 5555
name: jmx name: jmx
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: KAFKA_WINDOW_DURATION_MINUTES - name: KAFKA_WINDOW_DURATION_MINUTES
value: "1" value: "1"
- name: COMMIT_INTERVAL_MS - name: COMMIT_INTERVAL_MS
...@@ -52,4 +54,4 @@ spec: ...@@ -52,4 +54,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
apiVersion: apps/v1 apiVersion: apps/v1
kind: StatefulSet kind: Deployment
metadata: metadata:
name: titan-ccp-load-generator name: titan-ccp-load-generator
spec: spec:
selector: selector:
matchLabels: matchLabels:
app: titan-ccp-load-generator app: titan-ccp-load-generator
serviceName: titan-ccp-load-generator
replicas: {{INSTANCES}} replicas: {{INSTANCES}}
template: template:
metadata: metadata:
...@@ -16,10 +15,16 @@ spec: ...@@ -16,10 +15,16 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: soerenhenning/uc3-wg:latest image: theodolite/theodolite-uc3-workload-generator:latest
env: env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
- name: POD_NAME - name: POD_NAME
...@@ -28,4 +33,3 @@ spec: ...@@ -28,4 +33,3 @@ spec:
fieldPath: metadata.name fieldPath: metadata.name
- name: INSTANCES - name: INSTANCES
value: "{{INSTANCES}}" value: "{{INSTANCES}}"
\ No newline at end of file
...@@ -15,13 +15,15 @@ spec: ...@@ -15,13 +15,15 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: uc4-application - name: uc4-application
image: "soerenhenning/uc4-app:latest" image: "theodolite/theodolite-uc4-kstreams-app:latest"
ports: ports:
- containerPort: 5555 - containerPort: 5555
name: jmx name: jmx
env: env:
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: AGGREGATION_DURATION_DAYS - name: AGGREGATION_DURATION_DAYS
value: "3" #AGGREGATION_DURATION_DAYS value: "3" #AGGREGATION_DURATION_DAYS
- name: AGGREGATION_DURATION_ADVANCE - name: AGGREGATION_DURATION_ADVANCE
...@@ -54,4 +56,4 @@ spec: ...@@ -54,4 +56,4 @@ spec:
volumes: volumes:
- name: jmx-config - name: jmx-config
configMap: configMap:
name: aggregation-jmx-configmap name: aggregation-jmx-configmap
\ No newline at end of file
...@@ -6,7 +6,7 @@ spec: ...@@ -6,7 +6,7 @@ spec:
selector: selector:
matchLabels: matchLabels:
app: titan-ccp-load-generator app: titan-ccp-load-generator
replicas: 1 replicas: {{INSTANCES}}
template: template:
metadata: metadata:
labels: labels:
...@@ -15,10 +15,21 @@ spec: ...@@ -15,10 +15,21 @@ spec:
terminationGracePeriodSeconds: 0 terminationGracePeriodSeconds: 0
containers: containers:
- name: workload-generator - name: workload-generator
image: soerenhenning/uc4-wg:latest image: theodolite/theodolite-uc4-workload-generator:latest
env: env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS - name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092" value: "my-confluent-cp-kafka:9092"
- name: SCHEMA_REGISTRY_URL
value: "http://my-confluent-cp-schema-registry:8081"
- name: NUM_SENSORS - name: NUM_SENSORS
value: "{{NUM_SENSORS}}" value: "{{NUM_SENSORS}}"
- name: POD_NAME
\ No newline at end of file valueFrom:
fieldRef:
fieldPath: metadata.name
- name: INSTANCES
value: "{{INSTANCES}}"
rootProject.name = 'scalability-benchmarking' rootProject.name = 'scalability-benchmarking'
include 'workload-generator-commons'
include 'application-kafkastreams-commons' include 'application-kafkastreams-commons'
include 'uc1-workload-generator' include 'uc1-workload-generator'
......
package theodolite.uc1.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
private ConfigurationKeys() {}
}
...@@ -3,8 +3,9 @@ package theodolite.uc1.application; ...@@ -3,8 +3,9 @@ package theodolite.uc1.application;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -13,7 +14,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -13,7 +14,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class HistoryService { public class HistoryService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
...@@ -40,6 +41,7 @@ public class HistoryService { ...@@ -40,6 +41,7 @@ public class HistoryService {
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.build(); .build();
this.stopEvent.thenRun(kafkaStreams::close); this.stopEvent.thenRun(kafkaStreams::close);
......
...@@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology; ...@@ -7,8 +7,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.models.records.ActivePowerRecordFactory; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Builds Kafka Stream Topology for the History microservice. * Builds Kafka Stream Topology for the History microservice.
...@@ -18,14 +18,19 @@ public class TopologyBuilder { ...@@ -18,14 +18,19 @@ public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic; private final String inputTopic;
private final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory;
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final StreamsBuilder builder = new StreamsBuilder(); private final StreamsBuilder builder = new StreamsBuilder();
/** /**
* Create a new {@link TopologyBuilder} using the given topics. * Create a new {@link TopologyBuilder} using the given topics.
*/ */
public TopologyBuilder(final String inputTopic) { public TopologyBuilder(final String inputTopic,
final SchemaRegistryAvroSerdeFactory srAvroSerdeFactory) {
this.inputTopic = inputTopic; this.inputTopic = inputTopic;
this.srAvroSerdeFactory = srAvroSerdeFactory;
} }
/** /**
...@@ -35,7 +40,7 @@ public class TopologyBuilder { ...@@ -35,7 +40,7 @@ public class TopologyBuilder {
this.builder this.builder
.stream(this.inputTopic, Consumed.with( .stream(this.inputTopic, Consumed.with(
Serdes.String(), Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) this.srAvroSerdeFactory.<ActivePowerRecord>forValues()))
.mapValues(v -> this.gson.toJson(v)) .mapValues(v -> this.gson.toJson(v))
.foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v)); .foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
......
...@@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing; ...@@ -3,6 +3,7 @@ package theodolite.uc1.streamprocessing;
import java.util.Objects; import java.util.Objects;
import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.Topology;
import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import theodolite.commons.kafkastreams.KafkaStreamsBuilder;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
/** /**
* Builder for the Kafka Streams configuration. * Builder for the Kafka Streams configuration.
...@@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { ...@@ -18,6 +19,7 @@ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder {
@Override @Override
protected Topology buildTopology() { protected Topology buildTopology() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
return new TopologyBuilder(this.inputTopic).build(); return new TopologyBuilder(this.inputTopic,
new SchemaRegistryAvroSerdeFactory(this.schemaRegistryUrl)).build();
} }
} }
...@@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092 ...@@ -5,6 +5,8 @@ kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input kafka.input.topic=input
kafka.output.topic=output kafka.output.topic=output
schema.registry.url=http://localhost:8091
num.threads=1 num.threads=1
commit.interval.ms=100 commit.interval.ms=100
cache.max.bytes.buffering=-1 cache.max.bytes.buffering=-1
package theodolite.uc1.workloadgenerator; package theodolite.uc1.workloadgenerator;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import theodolite.kafkasender.KafkaRecordSender; import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import titan.ccp.models.records.ActivePowerRecord; import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
public class LoadGenerator { /**
* Load Generator for UC1.
*/
public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final int WL_MAX_RECORDS = 150_000; private static final long MAX_DURATION_IN_DAYS = 30L;
private LoadGenerator() {}
/**
* Entry point.
*/
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
LOGGER.info("Start workload generator for use case UC1."); LOGGER.info("Start workload generator for use case UC1.");
// get environment variables
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors = final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10")); Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int instanceId = getInstanceId();
final int periodMs = final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); final double value =
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); Double.parseDouble(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
"4"));
final String kafkaBootstrapServers = final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String schemaRegistryUrl =
Objects.requireNonNullElse(System.getenv("SCHEMA_REGISTRY_URL"), "http://localhost:8091");
final String kafkaInputTopic = final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int instances =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1"));
final int idStart = instanceId * WL_MAX_RECORDS; // create kafka record sender
final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
final List<String> sensors = IntStream.range(idStart, idEnd)
.mapToObj(i -> "s_" + i)
.collect(Collectors.toList());
final Properties kafkaProperties = new Properties(); final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges); // kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers,
kafkaInputTopic,
r -> r.getIdentifier(),
r -> r.getTimestamp(),
kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) { final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender =
final int initialDelay = random.nextInt(periodMs); new KafkaRecordSender.Builder<ActivePowerRecord>(
executor.scheduleAtFixedRate(() -> { kafkaBootstrapServers,
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value)); kafkaInputTopic,
}, initialDelay, periodMs, TimeUnit.MILLISECONDS); schemaRegistryUrl)
} .keyAccessor(r -> r.getIdentifier())
.timestampAccessor(r -> r.getTimestamp())
.defaultProperties(kafkaProperties)
.build();
System.out.println("Wait for termination..."); // create workload generator
executor.awaitTermination(30, TimeUnit.DAYS); final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
System.out.println("Will terminate now"); KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
workloadGenerator.start();
} }
private static int getInstanceId() {
final String podName = System.getenv("POD_NAME");
if (podName == null) {
return 0;
} else {
return Pattern.compile("-")
.splitAsStream(podName)
.reduce((p, x) -> x)
.map(Integer::parseInt)
.orElse(0);
}
}
} }
...@@ -4,8 +4,9 @@ import java.time.Duration; ...@@ -4,8 +4,9 @@ import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import theodolite.commons.kafkastreams.ConfigurationKeys;
import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder;
import titan.ccp.common.configuration.Configurations; import titan.ccp.common.configuration.ServiceConfigurations;
/** /**
* A microservice that manages the history and, therefore, stores and aggregates incoming * A microservice that manages the history and, therefore, stores and aggregates incoming
...@@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations; ...@@ -14,7 +15,7 @@ import titan.ccp.common.configuration.Configurations;
*/ */
public class AggregationService { public class AggregationService {
private final Configuration config = Configurations.create(); private final Configuration config = ServiceConfigurations.createWithDefaults();
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
...@@ -39,16 +40,18 @@ public class AggregationService { ...@@ -39,16 +40,18 @@ public class AggregationService {
final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder();
uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC))
.outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC))
.configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC))
.windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS)))
.gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS)));
// Configuration of the stream application // Configuration of the stream application
final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder
.applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME))
.applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION))
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING))
......
package theodolite.uc2.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String APPLICATION_NAME = "application.name";
public static final String APPLICATION_VERSION = "application.version";
public static final String CONFIGURATION_KAFKA_TOPIC = "configuration.kafka.topic";
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String WINDOW_SIZE_MS = "window.size.ms";
public static final String WINDOW_GRACE_MS = "window.grace.ms";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
private ConfigurationKeys() {}
}
...@@ -5,6 +5,7 @@ import java.util.Optional; ...@@ -5,6 +5,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
...@@ -13,40 +14,27 @@ import titan.ccp.configuration.events.Event; ...@@ -13,40 +14,27 @@ import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.SensorRegistry; import titan.ccp.model.sensorregistry.SensorRegistry;
/** /**
* Factory class configuration required by {@link ChildParentsTransformer}. * Supplier class for a {@link ChildParentsTransformer}.
*/ */
public class ChildParentsTransformerFactory { public class ChildParentsTransformerSupplier implements
TransformerSupplier<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> {
private static final String STORE_NAME = "CHILD-PARENTS-TRANSFORM-STATE"; private static final String STORE_NAME = "CHILD-PARENTS-TRANSFORM-STATE";
/** @Override
* Returns a {@link TransformerSupplier} for {@link ChildParentsTransformer}. public Transformer<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> get() { // NOCS
*/ return new ChildParentsTransformer(STORE_NAME);
public TransformerSupplier<Event, SensorRegistry, Iterable<KeyValue<String, Optional<Set<String>>>>> getTransformerSupplier() { // NOCS
return new TransformerSupplier<>() {
@Override
public ChildParentsTransformer get() {
return new ChildParentsTransformer(STORE_NAME);
}
};
} }
/** @Override
* Returns a {@link StoreBuilder} for {@link ChildParentsTransformer}. public Set<StoreBuilder<?>> stores() {
*/ final StoreBuilder<KeyValueStore<String, Set<String>>> store = Stores.keyValueStoreBuilder(
public StoreBuilder<KeyValueStore<String, Set<String>>> getStoreBuilder() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_NAME), Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(), Serdes.String(),
ParentsSerde.serde()) ParentsSerde.serde())
.withLoggingEnabled(Map.of()); .withLoggingEnabled(Map.of());
}
/** return Set.of(store);
* Returns the store name for {@link ChildParentsTransformer}.
*/
public String getStoreName() {
return STORE_NAME;
} }
} }
...@@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue; ...@@ -9,7 +9,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents * Transforms the join result of an {@link ActivePowerRecord} and the corresponding sensor parents
......
...@@ -4,47 +4,35 @@ import java.util.Map; ...@@ -4,47 +4,35 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import titan.ccp.models.records.ActivePowerRecord; import titan.ccp.model.records.ActivePowerRecord;
/** /**
* Factory class configuration required by {@link JointFlatTransformerFactory}. * Supplier class for {@link JointFlatTransformerSupplier}.
*/ */
public class JointFlatTransformerFactory { public class JointFlatTransformerSupplier implements
TransformerSupplier<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> { // NOCS
private static final String STORE_NAME = "JOINT-FLAT-MAP-TRANSFORM-STATE"; private static final String STORE_NAME = "JOINT-FLAT-MAP-TRANSFORM-STATE";
/** @Override
* Returns a {@link TransformerSupplier} for {@link JointFlatTransformer}. public Transformer<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> get() { // NOCS
*/ return new JointFlatTransformer(STORE_NAME);
public TransformerSupplier<String, JointRecordParents, Iterable<KeyValue<SensorParentKey, ActivePowerRecord>>> getTransformerSupplier() { // NOCS
return new TransformerSupplier<>() {
@Override
public JointFlatTransformer get() {
return new JointFlatTransformer(STORE_NAME);
}
};
} }
/** @Override
* Returns a {@link StoreBuilder} for {@link JointFlatTransformer}. public Set<StoreBuilder<?>> stores() {
*/ final StoreBuilder<KeyValueStore<String, Set<String>>> store = Stores.keyValueStoreBuilder(
public StoreBuilder<KeyValueStore<String, Set<String>>> getStoreBuilder() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_NAME), Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(), Serdes.String(),
ParentsSerde.serde()) ParentsSerde.serde())
.withLoggingEnabled(Map.of()); .withLoggingEnabled(Map.of());
}
/** return Set.of(store);
* Returns the store name for {@link JointFlatTransformer}.
*/
public String getStoreName() {
return STORE_NAME;
} }
} }