diff --git a/uc1-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java b/uc1-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java deleted file mode 100644 index b46128c8ebfd52aeeb7127777bb6530761f35181..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java +++ /dev/null @@ -1,84 +0,0 @@ -package titan.ccp.kiekerbridge; - -import java.util.Properties; -import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; - - -/** - * Sends monitoring records to Kafka. - * - * @param <T> {@link IMonitoringRecord} to send - */ -public class KafkaRecordSender<T extends IMonitoringRecord> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; - - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - - /** - * Create a new {@link KafkaRecordSender}. - */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); - } - - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); - } - - public void terminate() { - this.producer.close(); - } - -} diff --git a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java b/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java deleted file mode 100644 index d5f55a4ab7ca265b241e880363975070e9952c45..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java +++ /dev/null @@ -1,50 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import titan.ccp.configuration.events.Event; -import titan.ccp.configuration.events.EventSerde; - -public class ConfigPublisher { - - private final String topic; - - private final Producer<Event, String> producer; - - public ConfigPublisher(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, new Properties()); - } - - public ConfigPublisher(final String bootstrapServers, final String topic, - final Properties defaultProperties) { - this.topic = topic; - - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB - properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB - - this.producer = - new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer()); - } - - public void publish(final Event event, final String value) { - final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value); - try { - this.producer.send(record).get(); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalArgumentException(e); - } - } - - public void close() { - this.producer.close(); - } - -} diff --git a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java b/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java deleted file mode 100644 index a50bbd942fccf5f8899414fe8cb7b82ad6953f87..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java +++ /dev/null @@ -1,21 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.util.Objects; - -public class ExperimentorBigData { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String modus = Objects.requireNonNullElse(System.getenv("MODUS"), "LoadCounter"); - - if (modus.equals("LoadGenerator")) { - LoadGenerator.main(args); - } else if (modus.equals("LoadGeneratorExtrem")) { - LoadGeneratorExtrem.main(args); - } else if (modus.equals("LoadCounter")) { - LoadCounter.main(args); - } - - } -} diff --git a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java b/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java deleted file mode 100644 index 798f3014446605afab2cf20f3232896baab02802..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java +++ /dev/null @@ -1,84 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import com.google.common.math.StatsAccumulator; -import java.time.Duration; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.Deserializer; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; -import titan.ccp.models.records.AggregatedActivePowerRecord; -import titan.ccp.models.records.AggregatedActivePowerRecordFactory; - -public class LoadCounter { - - public static void main(final String[] args) throws InterruptedException { - - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaOutputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_OUTPUT_TOPIC"), "output"); - - final Properties props = new Properties(); - props.setProperty("bootstrap.servers", kafkaBootstrapServers); - props.setProperty("group.id", "load-counter"); - props.setProperty("enable.auto.commit", "true"); - props.setProperty("auto.commit.interval.ms", "1000"); - props.setProperty("max.poll.records", "1000000"); - props.setProperty("max.partition.fetch.bytes", "134217728"); // 128 MB - props.setProperty("key.deserializer", - "org.apache.kafka.common.serialization.StringDeserializer"); - props.setProperty("value.deserializer", - "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - final Deserializer<AggregatedActivePowerRecord> deserializer = - IMonitoringRecordSerde.deserializer(new AggregatedActivePowerRecordFactory()); - - final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); - consumer.subscribe(List.of(kafkaInputTopic, kafkaOutputTopic)); - - executor.scheduleAtFixedRate( - () -> { - final long time = System.currentTimeMillis(); - final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500)); - - long inputCount = 0; - for (final ConsumerRecord<String, byte[]> inputRecord : records - .records(kafkaInputTopic)) { - inputCount++; - } - - long outputCount = 0; - final StatsAccumulator statsAccumulator = new StatsAccumulator(); - for (final ConsumerRecord<String, byte[]> outputRecord : records - .records(kafkaOutputTopic)) { - outputCount++; - final AggregatedActivePowerRecord record = - deserializer.deserialize(kafkaOutputTopic, outputRecord.value()); - final long latency = time - record.getTimestamp(); - statsAccumulator.add(latency); - } - - final double latency = statsAccumulator.count() > 0 ? statsAccumulator.mean() : 0.0; - - final long elapsedTime = System.currentTimeMillis() - time; - System.out - .println("input," + time + ',' + elapsedTime + ',' + 0 + ',' + inputCount); - System.out - .println("output," + time + ',' + elapsedTime + ',' + latency + ',' + outputCount); - }, - 0, - 1, - TimeUnit.SECONDS); - } - -} diff --git a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java b/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java deleted file mode 100644 index 97a7c84f872f3ab676128d903ae121c376bf7608..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java +++ /dev/null @@ -1,124 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.kafka.clients.producer.ProducerConfig; -import titan.ccp.configuration.events.Event; -import titan.ccp.kiekerbridge.KafkaRecordSender; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; - -public class LoadGenerator { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int periodMs = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PERIOD_MS"), "1000")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender = new KafkaRecordSender<>( - kafkaBootstrapServers, kafkaInputTopic, r -> r.getIdentifier(), r -> r.getTimestamp(), - kafkaProperties); - - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - for (final String sensor : sensors) { - final int initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate( - () -> { - kafkaRecordSender.write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - }, - initialDelay, - periodMs, - TimeUnit.MILLISECONDS); - } - - System.out.println("Wait for termination..."); - executor.awaitTermination(30, TimeUnit.DAYS); - System.out.println("Will terminate now"); - - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - -} diff --git a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java b/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java deleted file mode 100644 index 5bfb6ad488e90f39ded2b9e4cb57d10099f1c538..0000000000000000000000000000000000000000 --- a/uc1-application/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java +++ /dev/null @@ -1,165 +0,0 @@ -package titan.ccp.kiekerbridge.expbigdata19; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.kafka.clients.producer.ProducerConfig; -import titan.ccp.configuration.events.Event; -import titan.ccp.kiekerbridge.KafkaRecordSender; -import titan.ccp.model.sensorregistry.MutableAggregatedSensor; -import titan.ccp.model.sensorregistry.MutableSensorRegistry; -import titan.ccp.model.sensorregistry.SensorRegistry; -import titan.ccp.models.records.ActivePowerRecord; - -public class LoadGeneratorExtrem { - - public static void main(final String[] args) throws InterruptedException, IOException { - - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); - final int numNestedGroups = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); - final int numSensor = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("NUM_SENSORS"), "1")); - final int value = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("VALUE"), "10")); - final boolean sendRegistry = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("SEND_REGISTRY"), "true")); - final boolean doNothing = - Boolean.parseBoolean(Objects.requireNonNullElse(System.getenv("DO_NOTHING"), "false")); - final int threads = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("THREADS"), "4")); - final int producers = - Integer.parseInt(Objects.requireNonNullElse(System.getenv("PRODUCERS"), "1")); - final String kafkaBootstrapServers = - Objects.requireNonNullElse(System.getenv("KAFKA_BOOTSTRAP_SERVERS"), "localhost:9092"); - final String kafkaInputTopic = - Objects.requireNonNullElse(System.getenv("KAFKA_INPUT_TOPIC"), "input"); - final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); - final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); - final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); - - final SensorRegistry sensorRegistry = - buildSensorRegistry(hierarchy, numNestedGroups, numSensor); - - if (sendRegistry) { - final ConfigPublisher configPublisher = - new ConfigPublisher(kafkaBootstrapServers, "configuration"); - configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); - configPublisher.close(); - System.out.println("Configuration sent."); - - System.out.println("Now wait 30 seconds"); - Thread.sleep(30_000); - System.out.println("And woke up again :)"); - } - - final Properties kafkaProperties = new Properties(); - // kafkaProperties.put("acks", this.acknowledges); - kafkaProperties.compute(ProducerConfig.BATCH_SIZE_CONFIG, (k, v) -> kafkaBatchSize); - kafkaProperties.compute(ProducerConfig.LINGER_MS_CONFIG, (k, v) -> kafkaLingerMs); - kafkaProperties.compute(ProducerConfig.BUFFER_MEMORY_CONFIG, (k, v) -> kafkaBufferMemory); - final List<KafkaRecordSender<ActivePowerRecord>> kafkaRecordSenders = Stream - .<KafkaRecordSender<ActivePowerRecord>>generate( - () -> new KafkaRecordSender<>( - kafkaBootstrapServers, - kafkaInputTopic, - r -> r.getIdentifier(), - r -> r.getTimestamp(), - kafkaProperties)) - .limit(producers) - .collect(Collectors.toList()); - - final List<String> sensors = - sensorRegistry.getMachineSensors().stream().map(s -> s.getIdentifier()) - .collect(Collectors.toList()); - - for (int i = 0; i < threads; i++) { - final int threadId = i; - new Thread(() -> { - while (true) { - for (final String sensor : sensors) { - if (!doNothing) { - kafkaRecordSenders.get(threadId % producers).write(new ActivePowerRecord( - sensor, - System.currentTimeMillis(), - value)); - } - } - } - }).start(); - } - - while (true) { - printCpuUsagePerThread(); - } - - // System.out.println("Wait for termination..."); - // Thread.sleep(30 * 24 * 60 * 60 * 1000L); - // System.out.println("Will terminate now"); - } - - private static void printCpuUsagePerThread() throws InterruptedException { - final ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); - final List<Thread> threads = new ArrayList<>(Thread.getAllStackTraces().keySet()); - - final long start = System.nanoTime(); - final long[] startCpuTimes = new long[threads.size()]; - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - startCpuTimes[i] = tmxb.getThreadCpuTime(thread.getId()); - } - - Thread.sleep(5000); - - for (int i = 0; i < threads.size(); i++) { - final Thread thread = threads.get(i); - final long cpuTime = tmxb.getThreadCpuTime(thread.getId()) - startCpuTimes[i]; - final long dur = System.nanoTime() - start; - final double util = (double) cpuTime / dur; - System.out.println( - "Thread " + thread.getName() + ": " + String.format(java.util.Locale.US, "%.4f", util)); - } - } - - private static SensorRegistry buildSensorRegistry(final String hierarchy, - final int numNestedGroups, final int numSensor) { - final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { - MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); - for (int lvl = 1; lvl < numNestedGroups; lvl++) { - lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); - } - for (int s = 0; s < numSensor; s++) { - lastSensor.addChildMachineSensor("sensor_" + s); - } - } else if (hierarchy.equals("full")) { - addChildren(sensorRegistry.getTopLevelSensor(), numSensor, 1, numNestedGroups, 0); - } else { - throw new IllegalStateException(); - } - return sensorRegistry; - } - - private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, final int maxLvl, int nextId) { - for (int c = 0; c < numChildren; c++) { - if (lvl == maxLvl) { - parent.addChildMachineSensor("s_" + nextId); - nextId++; - } else { - final MutableAggregatedSensor newParent = - parent.addChildAggregatedSensor("g_" + lvl + '_' + nextId); - nextId++; - nextId = addChildren(newParent, numChildren, lvl + 1, maxLvl, nextId); - } - } - return nextId; - } - -} diff --git a/uc1-application/src/main/java/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/uc1/application/ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..965881321cc55f8a602ee7292b3e35ddf1ae29d9 --- /dev/null +++ b/uc1-application/src/main/java/uc1/application/ConfigurationKeys.java @@ -0,0 +1,38 @@ +package uc1.application; + +/** + * Keys to access configuration parameters. + */ +public final class ConfigurationKeys { + + public static final String CASSANDRA_HOST = "cassandra.host"; + + public static final String CASSANDRA_PORT = "cassandra.port"; + + public static final String CASSANDRA_KEYSPACE = "cassandra.keyspace"; + + public static final String CASSANDRA_INIT_TIMEOUT_MS = "cassandra.init.timeout.ms"; + + public static final String WEBSERVER_ENABLE = "webserver.enable"; + + public static final String WEBSERVER_PORT = "webserver.port"; + + public static final String WEBSERVER_CORS = "webserver.cors"; + + public static final String WEBSERVER_GZIP = "webserver.gzip"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + public static final String NUM_THREADS = "num.threads"; + + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + + private ConfigurationKeys() {} + +} diff --git a/uc1-application/src/main/java/uc1/application/HistoryService.java b/uc1-application/src/main/java/uc1/application/HistoryService.java new file mode 100644 index 0000000000000000000000000000000000000000..149648d33fb46083af4e06db9d4a7d2aec748b5a --- /dev/null +++ b/uc1-application/src/main/java/uc1/application/HistoryService.java @@ -0,0 +1,49 @@ +package uc1.application; + +import java.util.concurrent.CompletableFuture; +import org.apache.commons.configuration2.Configuration; +import org.apache.kafka.streams.KafkaStreams; +import titan.ccp.common.configuration.Configurations; +import uc1.streamprocessing.KafkaStreamsBuilder; + +/** + * A microservice that manages the history and, therefore, stores and aggregates + * incoming measurements. + * + */ +public class HistoryService { + + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + * + * @return {@link CompletableFuture} which is completed when the service is + * successfully started. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + /** + * Build and start the underlying Kafka Streams application of the service. + * + */ + private void createKafkaStreamsApplication() { + final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + public static void main(final String[] args) { + new HistoryService().run(); + } + +} diff --git a/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..706cf79022b2485b349bfe7ae144145dda013d20 --- /dev/null +++ b/uc1-application/src/main/java/uc1/streamprocessing/KafkaStreamsBuilder.java @@ -0,0 +1,92 @@ +package uc1.streamprocessing; + +import java.util.Objects; +import java.util.Properties; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import titan.ccp.common.kafka.streams.PropertiesBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class KafkaStreamsBuilder { + + private static final String APPLICATION_NAME = "titan-ccp-history"; + private static final String APPLICATION_VERSION = "0.0.1"; + + // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); + + private String bootstrapServers; // NOPMD + private String inputTopic; // NOPMD + private int numThreads = -1; // NOPMD + private int commitIntervalMs = -1; // NOPMD + private int cacheMaxBytesBuff = -1; // NOPMD + + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + return this; + } + + /** + * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus + * one for using the default. + */ + public KafkaStreamsBuilder numThreads(final int numThreads) { + if (numThreads < -1 || numThreads == 0) { + throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); + } + this.numThreads = numThreads; + return this; + } + + /** + * Sets the Kafka Streams property for the frequency with which to save the position (offsets in + * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for + * example, when processing bulks of records. Can be minus one for using the default. + */ + public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { + if (commitIntervalMs < -1) { + throw new IllegalArgumentException("Commit interval must be greater or equal -1."); + } + this.commitIntervalMs = commitIntervalMs; + return this; + } + + /** + * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches + * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for + * example, when processing bulks of records. Can be minus one for using the default. + */ + public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { + if (cacheMaxBytesBuffering < -1) { + throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); + } + this.cacheMaxBytesBuff = cacheMaxBytesBuffering; + return this; + } + + /** + * Builds the {@link KafkaStreams} instance. + */ + public KafkaStreams build() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + // TODO log parameters + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic); + final Properties properties = PropertiesBuilder + .bootstrapServers(this.bootstrapServers) + .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter + .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) + .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) + .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) + .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") + .build(); + return new KafkaStreams(topologyBuilder.build(), properties); + } + +} diff --git a/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..140c592f4e33334fbced6a80b82173c00d19eb25 --- /dev/null +++ b/uc1-application/src/main/java/uc1/streamprocessing/TopologyBuilder.java @@ -0,0 +1,45 @@ +package uc1.streamprocessing; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; +import titan.ccp.models.records.ActivePowerRecordFactory; + +/** + * Builds Kafka Stream Topology for the History microservice. + */ +public class TopologyBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + + private final String inputTopic; + + private final StreamsBuilder builder = new StreamsBuilder(); + + /** + * Create a new {@link TopologyBuilder} using the given topics. + */ + public TopologyBuilder(final String inputTopic) { + this.inputTopic = inputTopic; + } + + /** + * Build the {@link Topology} for the History microservice. + */ + public Topology build() { + + this.builder + .stream(this.inputTopic, Consumed.with( + Serdes.String(), + IMonitoringRecordSerde.serde(new ActivePowerRecordFactory()))) + .mapValues(value -> value.getValueInW()) + .foreach((key, measurement) -> LOGGER + .info("Key: " + key + " Value: " + measurement)); + + return this.builder.build(); + } +}