Commit 7e779666 authored by Benedikt Wetzel

add minimal WG implementation

parent 76eea338
1 merge request: !1 Add Implementations of Use Cases
Showing changed files with 91 additions and 370 deletions
-package titan.ccp.kiekerbridge;
+package kafkaSender;
import java.util.Properties;
import java.util.function.Function;
......
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.Objects;
public class ExperimentorBigData {
public static void main(final String[] args) throws InterruptedException, IOException {
final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter");
if (modus.equals("LoadGenerator")) {
LoadGenerator.main(args);
} else if (modus.equals("LoadGeneratorExtrem")) {
LoadGeneratorExtrem.main(args);
} else if (modus.equals("LoadCounter")) {
LoadCounter.main(args);
}
}
}
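
ExperimentorBigData dispatches on the MODUS environment variable to pick one of the three entry points. A minimal launch sketch; the jar name experimentor.jar and the classpath are illustrative assumptions, not part of this commit:

import java.io.IOException;

public class ModusLauncher {
  public static void main(final String[] args) throws IOException {
    // Start the dispatcher in a child JVM with MODUS set; inheritIO forwards its output.
    final ProcessBuilder pb = new ProcessBuilder("java", "-cp", "experimentor.jar",
        "titan.ccp.kiekerbridge.expbigdata19.ExperimentorBigData");
    pb.environment().put("MODUS", "LoadGenerator"); // or "LoadGeneratorExtrem" / "LoadCounter"
    pb.inheritIO().start();
  }
}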
package titan.ccp.kiekerbridge.expbigdata19;
import com.google.common.math.StatsAccumulator;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.AggregatedActivePowerRecord;
import titan.ccp.models.records.AggregatedActivePowerRecordFactory;
public class LoadCounter {
public static void main(final String[] args) throws InterruptedException {
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaOutputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output");
final Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaBootstrapServers);
props.setProperty("group.id", "load-counter");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("max.poll.records", "1000000");
props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final Deserializer<AggregatedActivePowerRecord> deserializer =
IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory());
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic));
executor.scheduleAtFixedRate(
() -> {
final long time = System.currentTimeMillis();
final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
long inputCount = 0;
for (final ConsumerRecord<String, byte[]> inputRecord : records
.records(kafkaInputTopic)) {
inputCount++;
}
long outputCount = 0;
final StatsAccumulator statsAccumulator = new StatsAccumulator();
for (final ConsumerRecord<String, byte[]> outputRecord : records
.records(kafkaOutputTopic)) {
outputCount++;
final AggregatedActivePowerRecord record =
deserializer.deserialize(kafkaOutputTopic, outputRecord.value());
final long latency = time - record.getTimestamp();
statsAccumulator.add(latency);
}
final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0;
final long elapsedTime = System.currentTimeMillis() - time;
System.out
.println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount);
System.out
.println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount);
},
0,
1,
TimeUnit.SECONDS);
}
}
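
LoadCounter prints one CSV line per topic each second: role, wall-clock time at poll start, time spent polling and counting, mean latency (always 0 for the input topic), and record count. A small parsing sketch; the field names are my own labels, not defined in this commit:

public class LoadCounterLineParser {
  public static void main(final String[] args) {
    // Example line in the format printed above: role,timeMs,elapsedMs,latencyMs,count.
    final String line = "output,1561970000000,12,42.5,1000"; // illustrative values
    final String[] f = line.split(",");
    final String role = f[0]; // "input" or "output"
    final long timeMs = Long.parseLong(f[1]); // wall-clock time when the poll started
    final long elapsedMs = Long.parseLong(f[2]); // time spent polling and counting
    final double meanLatencyMs = Double.parseDouble(f[3]);
    final long count = Long.parseLong(f[4]);
    System.out.printf("%s: %d records, mean latency %.1f ms (t=%d, took %d ms)%n",
        role, count, meanLatencyMs, timeMs, elapsedMs);
  }
}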
package titan.ccp.kiekerbridge.expbigdata19;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final int numNestedGroups =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final int numSensor =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1"));
final int periodMs =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry =
Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads =
Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers =
Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092");
final String kafkaInputTopic =
Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
}
for (int s = 0; s < numSensor; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException("Unknown HIERARCHY value: " + hierarchy);
}
final List<String> sensors =
sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(
kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(),
kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) {
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(
() -> {
kafkaRecordSender.write(new ActivePowerRecord(
sensor,
System.currentTimeMillis(),
value));
},
initialDelay,
periodMs,
TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl, final int maxLvl, int nextId) {
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
} else {
final MutableAggregatedSensor newParent =
parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId);
nextId++;
nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId);
}
}
return nextId;
}
}
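
The two HIERARCHY modes imply very different registry sizes: "deep" yields a single chain of aggregated groups with NUM_SENSORS machine sensors at the bottom, while "full" yields a complete tree with fan-out NUM_SENSORS down to depth NUM_NESTED_GROUPS. A quick sanity check of that arithmetic, derived from the code above (the concrete values are examples):

public class HierarchySizes {
  public static void main(final String[] args) {
    final int numSensor = 4; // NUM_SENSORS (example value)
    final int numNestedGroups = 3; // NUM_NESTED_GROUPS (example value)
    // "deep": groups form a single chain, so the machine-sensor count stays NUM_SENSORS.
    final long deepLeaves = numSensor;
    // "full": every group has numSensor children down to depth numNestedGroups.
    final long fullLeaves = (long) Math.pow(numSensor, numNestedGroups); // 4^3 = 64
    System.out.println("deep: " + deepLeaves + " sensors, full: " + fullLeaves + " sensors");
  }
}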
package uc1.streamprocessing;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import titan.ccp.common.kafka.streams.PropertiesBuilder;
/**
* Builder for the Kafka Streams configuration.
*/
public class KafkaStreamsBuilder {
private static final String APPLICATION_NAME = "titan-ccp-history";
private static final String APPLICATION_VERSION = "0.0.1";
// private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class);
private String bootstrapServers; // NOPMD
private String inputTopic; // NOPMD
private int numThreads = -1; // NOPMD
private int commitIntervalMs = -1; // NOPMD
private int cacheMaxBytesBuff = -1; // NOPMD
public KafkaStreamsBuilder inputTopic(final String inputTopic) {
this.inputTopic = inputTopic;
return this;
}
public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
return this;
}
/**
* Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus
* one to use the default.
*/
public KafkaStreamsBuilder numThreads(final int numThreads) {
if (numThreads < -1 || numThreads == 0) {
throw new IllegalArgumentException("Number of threads must be greater than 0, or -1 to use the default.");
}
this.numThreads = numThreads;
return this;
}
/**
* Sets the Kafka Streams property for the frequency with which to save the position (offsets in
* source topics) of tasks (commit.interval.ms). Must be zero to commit after every processed
* record, for example, when processing records in bulk. Can be minus one to use the default.
*/
public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) {
if (commitIntervalMs < -1) {
throw new IllegalArgumentException("Commit interval must be greater than or equal to -1.");
}
this.commitIntervalMs = commitIntervalMs;
return this;
}
/**
* Sets the Kafka Streams property for the maximum amount of memory in bytes to be used for
* record caches across all threads (cache.max.bytes.buffering). Must be zero to process every
* record immediately, for example, when processing records in bulk. Can be minus one to use
* the default.
*/
public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) {
if (cacheMaxBytesBuffering < -1) {
throw new IllegalArgumentException("Cache max bytes buffering must be greater than or equal to -1.");
}
this.cacheMaxBytesBuff = cacheMaxBytesBuffering;
return this;
}
/**
* Builds the {@link KafkaStreams} instance.
*/
public KafkaStreams build() {
Objects.requireNonNull(this.inputTopic, "Input topic has not been set.");
Objects.requireNonNull(this.bootstrapServers, "Bootstrap servers have not been set.");
// TODO log parameters
final TopologyBuilder topologyBuilder = new TopologyBuilder(
this.inputTopic);
final Properties properties = PropertiesBuilder
.bootstrapServers(this.bootstrapServers)
.applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter
.set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0)
.set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0)
.set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0)
.set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
.build();
return new KafkaStreams(topologyBuilder.build(), properties);
}
}
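
For reference, a sketch of how the builder might be wired up; the concrete values below are illustrative assumptions, not taken from this commit:

package uc1.streamprocessing;

import org.apache.kafka.streams.KafkaStreams;

public class HistoryServiceSketch {
  public static void main(final String[] args) {
    final KafkaStreams streams = new KafkaStreamsBuilder()
        .bootstrapServers("localhost:9092")
        .inputTopic("input")
        .numThreads(4) // > 0, or -1 for the Kafka Streams default
        .commitIntervalMs(100) // >= 0, or -1 for the default
        .cacheMaxBytesBuffering(-1) // -1 keeps the default
        .build();
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // close cleanly on exit
  }
}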
package uc1.streamprocessing;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.ActivePowerRecordFactory;
/**
* Builds Kafka Stream Topology for the History microservice.
*/
public class TopologyBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final StreamsBuilder builder = new StreamsBuilder();
/**
* Create a new {@link TopologyBuilder} using the given topics.
*/
public TopologyBuilder(final String inputTopic) {
this.inputTopic = inputTopic;
}
/**
* Build the {@link Topology} for the History microservice.
*/
public Topology build() {
this.builder
.stream(this.inputTopic, Consumed.with(
Serdes.String(),
IMonitoringRecordSerde.serde(new ActivePowerRecordFactory())))
.mapValues(record -> record.getValueInW())
.foreach((key, activePowerInW) -> LOGGER.info("Key: {}, Value: {}", key, activePowerInW));
return this.builder.build();
}
}
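
Since this topology only logs each record's active power value, it can be exercised without a broker. A minimal smoke-test sketch, assuming the Kafka Streams test utilities (TopologyTestDriver from kafka-streams-test-utils 2.4+) are on the classpath:

package uc1.streamprocessing;

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
import titan.ccp.models.records.ActivePowerRecord;
import titan.ccp.models.records.ActivePowerRecordFactory;

public class TopologySmokeTest {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-smoke-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
    try (TopologyTestDriver driver =
        new TopologyTestDriver(new TopologyBuilder("input").build(), props)) {
      final TestInputTopic<String, ActivePowerRecord> input = driver.createInputTopic("input",
          Serdes.String().serializer(),
          IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()).serializer());
      // The foreach step logs "Key: ... Value: ..." for this record.
      input.pipeInput("sensor_0", new ActivePowerRecord("sensor_0", System.currentTimeMillis(), 10.0));
    }
  }
}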
-package titan.ccp.kiekerbridge.expbigdata19;
+package uc3.workloadGenerator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
......
package uc3.workloadGenerator;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.models.records.ActivePowerRecord;
public class LoadGenerator {
public static void main(final String[] args) throws InterruptedException, IOException {
// configuration via environment variables (defaults as in the uc1 load generator)
final int numSensor = Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "10"));
final int periodMs = Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000"));
final int value = Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10"));
final boolean sendRegistry = Boolean
.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true"));
final int threads = Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4"));
final String kafkaBootstrapServers = Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"localhost:9092");
final String kafkaInputTopic = Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input");
final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE");
final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS");
final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY");
// create sensorRegistry
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 0);
final List<String> sensors = sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier())
.collect(Collectors.toList());
if (sendRegistry) {
final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close();
System.out.println("Configuration sent.");
System.out.println("Now wait 30 seconds");
Thread.sleep(30_000);
System.out.println("And woke up again :)");
}
final Properties kafkaProperties = new Properties();
// kafkaProperties.put("acks", this.acknowledges);
kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize);
kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs);
kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory);
final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>(kafkaBootstrapServers,
kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), kafkaProperties);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
for (final String sensor : sensors) {
System.out.println("working");
final int initialDelay = random.nextInt(periodMs);
executor.scheduleAtFixedRate(() -> {
kafkaRecordSender.write(new ActivePowerRecord(sensor, System.currentTimeMillis(), value));
}, initialDelay, periodMs, TimeUnit.MILLISECONDS);
}
System.out.println("Wait for termination...");
executor.awaitTermination(30, TimeUnit.DAYS);
System.out.println("Will terminate now");
}
private static void addChildren(final MutableAggregatedSensor parent, final int numChildren, int nextId) {
for (int c = 0; c < numChildren; c++) {
parent.addChildMachineSensor("s_" + nextId);
nextId++;
}
}
}
-package titan.ccp.kiekerbridge.expbigdata19;
+package uc3.workloadGenerator;
import java.io.IOException;
import java.lang.management.ManagementFactory;
@@ -9,9 +9,9 @@ import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import kafkaSender.KafkaRecordSender;
import org.apache.kafka.clients.producer.ProducerConfig;
import titan.ccp.configuration.events.Event;
-import titan.ccp.kiekerbridge.KafkaRecordSender;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
import titan.ccp.model.sensorregistry.SensorRegistry;
......