Skip to content
Snippets Groups Projects
Commit 47e43546 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Refactor workload generators using wg common lib

parent 94a3999e
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
Showing
with 193 additions and 383 deletions
......@@ -79,6 +79,14 @@ do
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
sleep 5s
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
......
......@@ -71,6 +71,14 @@ do
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
sleep 5s
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
......
......@@ -79,6 +79,14 @@ do
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
sleep 5s
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
......
......@@ -74,6 +74,14 @@ do
done
echo "Finish topic deletion, print topics:"
#kubectl exec kafka-client -- bash -c "kafka-topics --zookeeper my-confluent-cp-zookeeper:2181 --list" | sed -n -E '/^(titan-.*|input|output|configuration)( - marked for deletion)?$/p'
# delete zookeeper nodes used for workload generation
echo "Delete ZooKeeper configurations used for workload generation"
kubectl exec zookeeper-client -- bash -c "zookeeper-shell my-confluent-cp-zookeeper:2181 deleteall /workload-generation"
echo "Waiting for deletion"
sleep 5s
echo "Deletion finished"
echo "Exiting script"
KAFKA_LAG_EXPORTER_POD=$(kubectl get pod -l app.kubernetes.io/name=kafka-lag-exporter -o jsonpath="{.items[0].metadata.name}")
......
......@@ -17,6 +17,10 @@ spec:
- name: workload-generator
image: benediktwetzel/uc2-wg:latest
env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: HIERARCHY
......
......@@ -18,6 +18,10 @@ spec:
- name: workload-generator
image: soerenhenning/uc3-wg:latest
env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: NUM_SENSORS
......
......@@ -17,6 +17,10 @@ spec:
- name: workload-generator
image: soerenhenning/uc4-wg:latest
env:
- name: ZK_HOST
value: "my-confluent-cp-zookeeper"
- name: ZK_PORT
value: "2181"
- name: KAFKA_BOOTSTRAP_SERVERS
value: "my-confluent-cp-kafka:9092"
- name: NUM_SENSORS
......
package spesb.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
......@@ -5,7 +5,6 @@ import common.dimensions.KeySpace;
import common.dimensions.Period;
import common.generators.KafkaWorkloadGenerator;
import common.generators.KafkaWorkloadGeneratorBuilder;
import common.messages.OutputMessage;
import common.misc.ZooKeeper;
import communication.kafka.KafkaRecordSender;
import java.io.IOException;
......@@ -22,8 +21,13 @@ public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
public static void main(final String[] args) throws InterruptedException, IOException {
// uc1
LOGGER.info("Start workload generator for use case UC1.");
// get environment variables
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs =
......@@ -31,9 +35,6 @@ public class LoadGenerator {
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"),
"4"));
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
......@@ -42,6 +43,7 @@ public class LoadGenerator {
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
......@@ -54,18 +56,20 @@ public class LoadGenerator {
r -> r.getTimestamp(),
kafkaProperties);
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(100, TimeUnit.SECONDS))
.setGeneratorFunction(sensor -> new OutputMessage<>(sensor,
new ActivePowerRecord(sensor, System.currentTimeMillis(), value)))
.setDuration(new Duration(30, TimeUnit.DAYS))
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.build();
// start
workloadGenerator.start();
}
}
mainClassName = "spesb.uc2.workloadgenerator.LoadGenerator"
dependencies {
compile project(':workload-generator-common')
}
\ No newline at end of file
package spesb.uc2.workloadgenerator;
import common.dimensions.Duration;
import common.dimensions.KeySpace;
import common.dimensions.Period;
import common.generators.KafkaWorkloadGenerator;
import common.generators.KafkaWorkloadGeneratorBuilder;
import common.misc.ZooKeeper;
import communication.kafka.KafkaRecordSender;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spesb.kafkasender.KafkaRecordSender;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
......@@ -23,12 +24,17 @@ public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
public static void main(final String[] args) throws InterruptedException, IOException {
// uc2
LOGGER.info("Start workload generator for use case UC2.");
// get environment variables
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor =
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
......@@ -45,37 +51,11 @@ public class LoadGenerator {
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
// build sensor registry
final MutableSensorRegistry sensorRegistry =
buildSensorRegistry(hierarchy, numNestedGroups, numSensors);
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
......@@ -85,20 +65,60 @@ public class LoadGenerator {
new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(30, TimeUnit.DAYS))
.setBeforeAction(() -> {
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
try {
Thread.sleep(30_000);
} catch (final InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("And woke up again :)");
}
})
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.build();
// start
workloadGenerator.start();
}
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
private static MutableSensorRegistry buildSensorRegistry(
final String hierarchy,
final int numNestedGroups,
final int numSensors) {
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensors; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
return sensorRegistry;
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
......
mainClassName = "spesb.uc3.workloadgenerator.LoadGenerator"
dependencies {
compile project(':workload-generator-common')
}
\ No newline at end of file
package spesb.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
package spesb.uc3.workloadgenerator;
import common.dimensions.Duration;
import common.dimensions.KeySpace;
import common.dimensions.Period;
import common.generators.KafkaWorkloadGenerator;
import common.generators.KafkaWorkloadGeneratorBuilder;
import common.misc.ZooKeeper;
import communication.kafka.KafkaRecordSender;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spesb.kafkasender.KafkaRecordSender;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
......@@ -26,9 +25,12 @@ public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
LOGGER.info("Start workload generator for use case UC3.");
// get environment variables
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int instanceId = getInstanceId();
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
......@@ -42,13 +44,7 @@ public class LoadGenerator {
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final int idStart = instanceId * WL_MAX_RECORDS;
final int idEnd = Math.min((instanceId + 1) * WL_MAX_RECORDS, numSensors);
LOGGER.info("Generating data for sensors with IDs from {} to {} (exclusive).", idStart, idEnd);
final List<String> sensors = IntStream.range(idStart, idEnd)
.mapToObj(i -> "s_" + i)
.collect(Collectors.toList());
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
......@@ -58,35 +54,21 @@ public class LoadGenerator {
new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
LOGGER.info("Start setting up sensors.");
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
LOGGER.info("Finished setting up sensors.");
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(30, TimeUnit.DAYS))
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.build();
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
// start
workloadGenerator.start();
}
private static int getInstanceId() {
final String podName = System.getenv("POD_NAME");
if (podName == null) {
return 0;
} else {
return Pattern.compile("-")
.splitAsStream(podName)
.reduce((p, x) -> x)
.map(Integer::parseInt)
.orElse(0);
}
}
}
mainClassName = "spesb.uc4.workloadgenerator.LoadGenerator"
dependencies {
compile project(':workload-generator-common')
}
\ No newline at end of file
package spesb.kafkasender;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
}
package spesb.uc4.workloadgenerator;
import common.dimensions.Duration;
import common.dimensions.KeySpace;
import common.dimensions.Period;
import common.generators.KafkaWorkloadGenerator;
import common.generators.KafkaWorkloadGeneratorBuilder;
import common.misc.ZooKeeper;
import communication.kafka.KafkaRecordSender;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spesb.kafkasender.KafkaRecordSender;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
......@@ -24,7 +24,11 @@ public class LoadGenerator {
// uc4
LOGGER.info("Start workload generator for use case UC4.");
final int numSensor =
// get environment variables
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
final int zooKeeperPort =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("ZK_PORT"), "2181"));
final int numSensors =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
......@@ -39,6 +43,7 @@ public class LoadGenerator {
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create kafka record sender
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
......@@ -48,23 +53,21 @@ public class LoadGenerator {
new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
final List<String> sensors =
IntStream.range(0, numSensor).mapToObj(i -> "s_" + i).collect(Collectors.toList());
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setDuration(new Duration(30, TimeUnit.DAYS))
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.build();
// start
workloadGenerator.start();
}
}
package common.functions;
import common.messages.OutputMessage;
import kieker.common.record.IMonitoringRecord;
@FunctionalInterface
public interface MessageGenerator<T extends IMonitoringRecord> {
OutputMessage<T> generateMessage(final String key);
T generateMessage(final String key);
}
package common.functions;
import common.messages.OutputMessage;
import kieker.common.record.IMonitoringRecord;
@FunctionalInterface
public interface Transport<T extends IMonitoringRecord> {
public void transport(final OutputMessage<T> message);
void transport(final T message);
}
......@@ -15,7 +15,6 @@ import common.dimensions.Period;
import common.functions.BeforeAction;
import common.functions.MessageGenerator;
import common.functions.Transport;
import common.messages.OutputMessage;
import common.misc.Worker;
import common.misc.WorkloadDefinition;
import common.misc.WorkloadEntity;
......@@ -105,7 +104,7 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
LOGGER.info("Beginning of Experiment...");
LOGGER.info("Experiment is going to be executed for the specified duration...");
entities.forEach(entity -> {
final OutputMessage<T> message = entity.generateMessage();
final T message = entity.generateMessage();
final long initialDelay = random.nextInt(periodMs);
this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay,
periodMs, period.getTimeUnit());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment