Commit 67652eba authored by Sören Henning

Merge branch 'uc_implementation' into 'master'

Add Implementations of Use Cases

See merge request soerenhenning/scalability-benchmarking!1
parents 166a6ca2 65b2fed6
Pipeline #363 passed
Showing changes with 330 additions and 467 deletions
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka #172.17.0.1 # Replace with docker network
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
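      # Each entry below is created as <name>:<partitions>:<replication-factor>.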
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1,dayofweek:3:1,hourofday:3:1,hourofweek:3:1"
  uc-app:
    image: benediktwetzel/uc1-app:latest
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
  uc-wg:
    image: benediktwetzel/uc1-wg:latest
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      NUM_SENSORS: 1
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka #172.17.0.1 # Replace with docker network
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1,dayofweek:3:1,hourofday:3:1,hourofweek:3:1"
  uc-app:
    image: benediktwetzel/uc2-app:latest
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
  uc-wg:
    image: benediktwetzel/uc2-wg:latest
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      NUM_SENSORS: 1
\ No newline at end of file
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka #172.17.0.1 # Replace with docker network
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1,dayofweek:3:1,hourofday:3:1,hourofweek:3:1"
  uc-app:
    image: benediktwetzel/uc3-app:latest
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_WINDOW_DURATION_MINUTES: 60
  uc-wg:
    image: benediktwetzel/uc3-wg:latest
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      NUM_SENSORS: 1
\ No newline at end of file
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    expose:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka #172.17.0.1 # Replace with docker network
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
      KAFKA_CREATE_TOPICS: "input:3:1,output:3:1,configuration:3:1,aggregation-feedback:3:1,dayofweek:3:1,hourofday:3:1,hourofweek:3:1"
  uc-app:
    image: benediktwetzel/uc2-app:latest #TODO
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
  uc-wg:
    image: benediktwetzel/uc2-wg:latest #TODO
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
 FROM openjdk:11-slim
-ADD build/distributions/exp-bigdata19-bridge.tar /
+ADD build/distributions/uc1-application.tar /
-CMD export JAVA_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL \
-  && /exp-bigdata19-bridge/bin/exp-bigdata19-bridge
\ No newline at end of file
+CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
+  /uc1-application/bin/uc1-application
\ No newline at end of file
@@ -21,7 +21,7 @@ dependencies {
   testCompile 'junit:junit:4.12'
 }
-mainClassName = "titan.ccp.kiekerbridge.expbigdata19.ExperimentorBigData"
+mainClassName = "uc1.application.HistoryService"
 eclipse {
   classpath {
...
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.Objects;
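/**
 * Entry point that dispatches to LoadGenerator, LoadGeneratorExtrem, or LoadCounter,
 * selected via the MODUS environment variable (defaults to LoadCounter).
 */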
public class ExperimentorBigData {
public static void main(final String[] args) throws InterruptedException, IOException {
final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter");
if (modus.equals("LoadGenerator")) {
LoadGenerator.main(args);
} else if (modus.equals("LoadGeneratorExtrem")) {
LoadGeneratorExtrem.main(args);
} else if (modus.equals("LoadCounter")) {
LoadCounter.main(args);
}
}
}
package titan.ccp.kiekerbridge.expbigdata19;
import com.google.common.math.StatsAccumulator;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
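/**
 * Consumes the benchmark's input and output topics and prints, once per second, the observed
 * record counts and the mean consumer-side latency of aggregated records as CSV.
 */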
public class LoadCounter {
public static void main(final String[] args) throws InterruptedException {
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaOutputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output");
final Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaBootstrapServers);
props.setProperty("group.id", "load-counter");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("max.poll.records", "1000000");
props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final Deserializer<AggregatedActivePowerRecord> deserializer =
IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory());
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic));
executor.scheduleAtFixedRate(
() -> {
final long time = System.currentTimeMillis();
final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
long inputCount = 0;
for (final ConsumerRecord<String, byte[]> inputRecord : records
.records(kafkaInputTopic)) {
inputCount++;
}
long outputCount = 0;
final StatsAccumulator statsAccumulator = new StatsAccumulator();
for (final ConsumerRecord<String, byte[]> outputRecord : records
.records(kafkaOutputTopic)) {
outputCount++;
final AggregatedActivePowerRecord record =
deserializer.deserialize(kafkaOutputTopic, outputRecord.value());
final long latency = time - record.getTimestamp();
statsAccumulator.add(latency);
}
final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0;
final long elapsedTime = System.currentTimeMillis() - time;
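          // Output CSV: <topic>,<pollStartMs>,<elapsedMs>,<meanLatencyMs>,<recordCount>;
          // the input line always reports a latency of 0.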
System.out
.println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount);
System.out
.println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount);
},
0,
1,
TimeUnit.SECONDS);
}
}
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
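/**
 * Generates one ActivePowerRecord per sensor at a fixed period and writes it to Kafka.
 * The sensor hierarchy ("deep" or "full"), record rate, and Kafka producer settings are
 * configured via environment variables.
 */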
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry =
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
    } else {
      throw new IllegalStateException("Unsupported hierarchy type: " + hierarchy);
    }
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(),
kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
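    // Stagger each sensor's first record by a random offset within one period so the
    // generated load is spread evenly over time instead of arriving in bursts.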
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(
() -> {
kafkaRecordSender.write(new ActivePowerRecord(
sensor,
System.currentTimeMillis(),
value));
},
initialDelay,
periodMs,
TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
package uc1.application;
/**
* Keys to access configuration parameters.
*/
public final class ConfigurationKeys {
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic";
public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic";
public static final String NUM_THREADS = "num.threads";
public static final String COMMIT_INTERVAL_MS = "commit.interval.ms";
public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
private ConfigurationKeys() {
}
}
package uc1.application;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.kafka.streams.KafkaStreams;
import titan.ccp.common.configuration.Configurations;
import uc1.streamprocessing.KafkaStreamsBuilder;
/**
* A microservice that manages the history and, therefore, stores and aggregates
* incoming measurements.
*
*/
public class HistoryService {
private final Configuration config = Configurations.create();
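  // When completed, closes the Kafka Streams application; nothing completes it in this
  // version, so the service runs until the JVM exits.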
private final CompletableFuture<Void> stopEvent = new CompletableFuture<>();
/**
 * Start the service.
 */
public void run() {
this.createKafkaStreamsApplication();
}
/**
* Build and start the underlying Kafka Streams application of the service.
*
*/
private void createKafkaStreamsApplication() {
final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
.bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS))
.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC))
.numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS))
.commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS))
.cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build();
this.stopEvent.thenRun(kafkaStreams::close);
kafkaStreams.start();
}
public static void main(final String[] args) {
new HistoryService().run();
}
}
package uc1.streamprocessing;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
private static final String APPLICATION_NAME = "titan-ccp-history";
private static final String APPLICATION_VERSION = "0.0.1";
// private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one for using the default.
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater than 0 or -1.");
}
this.numThreads = numThreads;
return this;
}
/**
 * Sets the Kafka Streams property for the frequency with which to save the position (offsets in
 * source topics) of tasks (commit.interval.ms). Set to zero to commit after every record, for
 * example, when processing records in bulk. Can be minus one to use the default.
 */
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater or equal -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
 * Sets the Kafka Streams property for the maximum number of memory bytes to be used for record
 * caches across all threads (cache.max.bytes.buffering). Set to zero to disable caching so that
 * every record is forwarded immediately, for example, when processing records in bulk. Can be
 * minus one to use the default.
 */
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic);
final Properties properties = PropertiesBuilder
.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
.set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
.build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
}
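For illustration, a minimal sketch of using this builder directly (mirroring HistoryService above; the literal values are the defaults from the application.properties listed below):

final KafkaStreams kafkaStreams = new KafkaStreamsBuilder()
    .bootstrapServers("localhost:9092") // kafka.bootstrap.servers
    .inputTopic("input")                // kafka.input.topic
    .numThreads(1)                      // num.threads; -1 keeps the Kafka Streams default
    .commitIntervalMs(10)               // commit.interval.ms
    .cacheMaxBytesBuffering(-1)         // cache.max.bytes.buffering; -1 keeps the default
    .build();
kafkaStreams.start();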
package uc1.streamprocessing;
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.ActivePowerRecordFactory;
/**
* Builds Kafka Stream Topology for the History microservice.
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final Gson gson;
private final StreamsBuilder builder = new StreamsBuilder();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic) {
this.inputTopic = inputTopic;
this.gson = new Gson();
}
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
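    // Read ActivePowerRecords from the input topic, map each record to its JSON
    // representation, and write it to the log.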
this.builder
.stream(this.inputTopic,
Consumed.with(Serdes.String(), IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.mapValues(v -> this.gson.toJson(v)).foreach((k, v) -> LOGGER.info("Key: " + k + " Value: " + v));
return this.builder.build();
}
}
kafka.bootstrap.servers=localhost:9092
kafka.input.topic=input
kafka.output.topic=output
num.threads=1
commit.interval.ms=10
cache.max.bytes.buffering=-1
 FROM openjdk:11-slim
-ADD build/distributions/exp-bigdata19-bridge.tar /
+ADD build/distributions/uc1-workload-generator.tar /
-CMD export JAVA_OPTS=-Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL \
-  && /exp-bigdata19-bridge/bin/exp-bigdata19-bridge
\ No newline at end of file
+CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \
+  /uc1-workload-generator/bin/uc1-workload-generator
\ No newline at end of file
@@ -21,7 +21,7 @@ dependencies {
   testCompile 'junit:junit:4.12'
 }
-mainClassName = "titan.ccp.kiekerbridge.expbigdata19.ExperimentorBigData"
+mainClassName = "uc1.workloadGenerator.LoadGenerator"
 eclipse {
   classpath {
...
-package titan.ccp.kiekerbridge;
+package kafkaSender;
 import java.util.Properties;
 import java.util.function.Function;
...