diff --git a/docs/running-benchmarks.md b/docs/running-benchmarks.md index 0a76316c0515233f9445b363f941d60ab7aa0e06..5051cb5b685deb17212c1489c585e75262bf9da5 100644 --- a/docs/running-benchmarks.md +++ b/docs/running-benchmarks.md @@ -130,7 +130,7 @@ If [persisting results](installation#persisting-results) is enabled in Theodolit For installations without persistence, but also as an alternative for installations with persistence, we provide a second option to access results: Theodolite comes with a *results access sidecar*. It allows to copy all benchmark results from the Theodolite pod to your current working directory on your host machine with the following command: ```sh -kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):/results . -c results-access +kubectl cp $(kubectl get pod -l app=theodolite -o jsonpath="{.items[0].metadata.name}"):results . -c results-access ``` ## Analyzing Benchmark Results diff --git a/docs/theodolite-benchmarks/load-generator.md b/docs/theodolite-benchmarks/load-generator.md index e0d35332e97452d971ee91290d167d4e5087f796..5ae10d16a50aaa16a76975d8127ef379508b1a37 100644 --- a/docs/theodolite-benchmarks/load-generator.md +++ b/docs/theodolite-benchmarks/load-generator.md @@ -55,6 +55,7 @@ The prebuilt container images can be configured with the following environment v | `KAFKA_LINGER_MS` | Value for the Kafka producer configuration: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms). Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`linger.ms`](https://kafka.apache.org/documentation/#producerconfigs_linger.ms) | | `KAFKA_BUFFER_MEMORY` | Value for the Kafka producer configuration: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) Only used if Kafka is set as `TARGET`. | see Kafka producer config: [`buffer.memory`](https://kafka.apache.org/documentation/#producerconfigs_buffer.memory) | | `HTTP_URL` | The URL the load generator should post messages to. Only used if HTTP is set as `TARGET`. | | +| `HTTP_ASYNC` | Whether the load generator should send HTTP messages asynchronously. Only used if HTTP is set as `TARGET`. | `false` | | `PUBSUB_INPUT_TOPIC` | The Google Cloud Pub/Sub topic to write messages to. Only used if Pub/Sub is set as `TARGET`. | input | | `PUBSUB_PROJECT` | The Google Cloud this Pub/Sub topic is associated with. Only used if Pub/Sub is set as `TARGET`. | | | `PUBSUB_EMULATOR_HOST` | A Pub/Sub emulator host. Only used if Pub/Sub is set as `TARGET`. | | @@ -62,6 +63,7 @@ The prebuilt container images can be configured with the following environment v | `PERIOD_MS` | The time in milliseconds between generating two messages for the same sensor. With our Theodolite benchmarks, we apply an [open workload model](https://www.usenix.org/legacy/event/nsdi06/tech/full_papers/schroeder/schroeder.pdf) in which new messages are generated at a fixed rate, without considering the think time of the target server nor the time required for generating a message. | 1000 | | `VALUE` | The constant `valueInW` of an `ActivePowerRecord`. | 10 | | `THREADS` | Number of worker threads used to generate the load. | 4 | +| `DISABLE_DNS_CACHING` | Set to `true` to disable DNS caching by the underlying JVM. You might want to do so when generating load via HTTP that should be sent to different target instances. | `false` | Please note that there are some additional configuration options for benchmark [UC4's load generator](hhttps://github.com/cau-se/theodolite/blob/master/theodolite-benchmarks/uc4-load-generator/src/main/java/rocks/theodolite/benchmarks/uc4/loadgenerator/LoadGenerator.java). diff --git a/helm/templates/kafka/kafka-cluster.yaml b/helm/templates/kafka/kafka-cluster.yaml index 0d7eccfd279c62f7d996a8e3e41a55a5ebdd4e96..29cf038f12aa6ee38b21697b8d79b5aea378c7d8 100644 --- a/helm/templates/kafka/kafka-cluster.yaml +++ b/helm/templates/kafka/kafka-cluster.yaml @@ -30,8 +30,6 @@ spec: configMapKeyRef: name: {{ template "theodolite.fullname" . }}-kafka-metrics key: kafka-metrics-config.yml - - kafkaExporter: {} zookeeper: {{- with .Values.strimzi.zookeeper.replicas }} @@ -39,5 +37,14 @@ spec: {{- toYaml . | nindent 6 }} {{- end }} storage: - type: ephemeral + type: ephemeral + + kafkaExporter: {} + + {{- if .Values.strimzi.topicOperator.enabled }} + entityOperator: + topicOperator: {} + {{- end }} + + {{- end }} \ No newline at end of file diff --git a/helm/values.yaml b/helm/values.yaml index f3f6093d1e66432495f8acfc6db1e7e269330c4e..0f1d4790b6d03d42f6ccc65a5156eb70999867e6 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -173,7 +173,6 @@ strimzi: jvmOptions: "-Xmx": "512M" "-Xms": "512M" - zookeeper: replicas: 3 zooEntrance: @@ -181,6 +180,8 @@ strimzi: zookeeperClient: enabled: true nodeSelector: {} + topicOperator: + enabled: false ### diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java index 30735a561302ff3fc2dce564aa98d1d1657164a1..efb7db61cc4c81ec2d1ffd49141d6d70a23dacaa 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/ConfigurationKeys.java @@ -23,6 +23,8 @@ public final class ConfigurationKeys { public static final String THREADS = "THREADS"; + public static final String DISABLE_DNS_CACHING = "DISABLE_DNS_CACHING"; + public static final String TARGET = "TARGET"; public static final String KAFKA_BOOTSTRAP_SERVERS = "KAFKA_BOOTSTRAP_SERVERS"; @@ -39,12 +41,15 @@ public final class ConfigurationKeys { public static final String HTTP_URL = "HTTP_URL"; + public static final String HTTP_ASYNC = "HTTP_ASYNC"; + public static final String PUBSUB_INPUT_TOPIC = "PUBSUB_INPUT_TOPIC"; public static final String PUBSUB_PROJECT = "PUBSUB_PROJECT"; public static final String PUBSUB_EMULATOR_HOST = "PUBSUB_EMULATOR_HOST"; + private ConfigurationKeys() {} } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java index 7123cfe540b8a9010e6ae9fffc25dade5161eec9..29ede821eefe171f377d58fce8d98eee28e8a277 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/EnvVarLoadGeneratorFactory.java @@ -11,9 +11,19 @@ import titan.ccp.model.records.ActivePowerRecord; class EnvVarLoadGeneratorFactory { + public static final boolean DISABLE_DNS_CACHING_DEFAULT = false; + private static final Logger LOGGER = LoggerFactory.getLogger(EnvVarLoadGeneratorFactory.class); public LoadGenerator create(final LoadGenerator loadGeneratorTemplate) { + + final boolean disableDnsCaching = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.DISABLE_DNS_CACHING), + Boolean.toString(DISABLE_DNS_CACHING_DEFAULT))); + if (disableDnsCaching) { + this.disableDnsCaching(); + } + final int numSensors = Integer.parseInt(Objects.requireNonNullElse( System.getenv(ConfigurationKeys.NUM_SENSORS), Integer.toString(LoadGenerator.NUMBER_OF_KEYS_DEFAULT))); @@ -32,9 +42,9 @@ class EnvVarLoadGeneratorFactory { .setLoadDefinition(new WorkloadDefinition( new KeySpace(LoadGenerator.SENSOR_PREFIX_DEFAULT, numSensors), Duration.ofMillis(periodMs))) - .setGeneratorConfig(new LoadGeneratorConfig( + .setGeneratorConfig(new LoadGeneratorConfig(GeneratorAction.from( TitanRecordGenerator.forConstantValue(value), - this.buildRecordSender())) + this.buildRecordSender()))) .withThreads(threads); } @@ -109,8 +119,11 @@ class EnvVarLoadGeneratorFactory { Objects.requireNonNullElse( System.getenv(ConfigurationKeys.HTTP_URL), LoadGenerator.HTTP_URI_DEFAULT)); - recordSender = new HttpRecordSender<>(url); - LOGGER.info("Use HTTP server as target with url '{}'.", url); + final boolean async = Boolean.parseBoolean(Objects.requireNonNullElse( + System.getenv(ConfigurationKeys.HTTP_ASYNC), + Boolean.toString(LoadGenerator.HTTP_ASYNC_DEFAULT))); + recordSender = new HttpRecordSender<>(url, async); + LOGGER.info("Use HTTP server as target with URL '{}' and asynchronously: '{}'.", url, async); } else if (target == LoadGeneratorTarget.PUBSUB) { final String project = System.getenv(ConfigurationKeys.PUBSUB_PROJECT); final String inputTopic = Objects.requireNonNullElse( @@ -135,4 +148,9 @@ class EnvVarLoadGeneratorFactory { return recordSender; } + private void disableDnsCaching() { + LOGGER.info("Disable DNS caching."); + java.security.Security.setProperty("networkaddress.cache.ttl", "0"); + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java index e8a03fc8776d0cb07fc9623df93688db7455a042..fb3bf1c9f802d2af2a0eae72bc58c9e609a6b624 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorAction.java @@ -5,14 +5,18 @@ package rocks.theodolite.benchmarks.loadgenerator; * it. */ @FunctionalInterface -interface GeneratorAction { +public interface GeneratorAction { void generate(final String key); + default void shutdown() { + // Nothing to do per default + } + public static <T> GeneratorAction from( final RecordGenerator<? extends T> generator, final RecordSender<? super T> sender) { - return key -> sender.send(generator.generate(key)); + return new GeneratorActionImpl<>(generator, sender); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1d177b7193ee8df8b4d65546cbbcbb6f49a95488 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/GeneratorActionImpl.java @@ -0,0 +1,27 @@ +package rocks.theodolite.benchmarks.loadgenerator; + +class GeneratorActionImpl<T> implements GeneratorAction { + + private final RecordGenerator<? extends T> generator; + + private final RecordSender<? super T> sender; + + public GeneratorActionImpl( + final RecordGenerator<? extends T> generator, + final RecordSender<? super T> sender) { + this.generator = generator; + this.sender = sender; + } + + @Override + public void shutdown() { + this.generator.close(); + this.sender.close(); + } + + @Override + public void generate(final String key) { + this.sender.send(this.generator.generate(key)); + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java index 7d298c6275e55ebfbdb7bd2a5ba71264a7769dec..3b222a332fc76159aedcf82a1753363c7b1e414e 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunner.java @@ -36,7 +36,7 @@ public class HazelcastRunner { } /** - * Start the workload generation and blocks until the workload generation is stopped again. + * Start the load generation and blocks until the load generation is stopped again. */ public void runBlocking() { while (!this.stopAction.isDone()) { @@ -52,19 +52,24 @@ public class HazelcastRunner { } public void restart() { - this.stopRunnerState(); + this.stopRunnerStateAsync(); } + /** + * Stop generating load and clean up the entire state. + */ public void stop() { this.stopAction.complete(null); - this.stopRunnerState(); + this.stopRunnerStateAsync().join(); + this.hzInstance.shutdown(); } - private void stopRunnerState() { + private CompletableFuture<Void> stopRunnerStateAsync() { synchronized (this) { if (this.runnerState != null) { - this.runnerState.stopAsync(); + return this.runnerState.stopAsync(); } + return CompletableFuture.completedFuture(null); } } @@ -94,7 +99,9 @@ public class HazelcastRunner { .getJoin(); joinConfig.getMulticastConfig().setEnabled(false); if (cluster.hasBootstrapServer()) { - joinConfig.getTcpIpConfig().addMember(cluster.getBootstrapServer()); + joinConfig.getTcpIpConfig() + .setEnabled(true) + .addMember(cluster.getBootstrapServer()); } else if (cluster.hasKubernetesDnsName()) { joinConfig.getKubernetesConfig() .setEnabled(true) diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java index 79b86b6eecd980b8d90acab682687b6f8804d1b6..81a6db7329309a8c2b09fb3308cb496c5420a206 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HazelcastRunnerStateInstance.java @@ -27,6 +27,7 @@ public class HazelcastRunnerStateInstance { private static final Duration TASK_ASSIGNMENT_WAIT_DURATION = Duration.ofMillis(500); private final CompletableFuture<Void> stopAction = new CompletableFuture<>(); + private final CompletableFuture<Void> stopFinished = new CompletableFuture<>(); private LoadGeneratorExecution loadGeneratorExecution; private final LoadGeneratorConfig loadGeneratorConfig; @@ -61,10 +62,12 @@ public class HazelcastRunnerStateInstance { } this.stopAction.join(); this.stopLoadGeneration(); + this.stopFinished.complete(null); } - public void stopAsync() { + public CompletableFuture<Void> stopAsync() { this.stopAction.complete(null); + return this.stopFinished; } private void tryPerformBeforeAction() { diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java index f9c0d3830e955ca88864d61e8bb74dc0974c1f26..124f11a979f3afad5507db86c68e9eeb42c64eb6 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/HttpRecordSender.java @@ -7,6 +7,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandler; import java.net.http.HttpResponse.BodyHandlers; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -23,6 +24,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< private static final int HTTP_OK = 200; + private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(1); + private static final Logger LOGGER = LoggerFactory.getLogger(HttpRecordSender.class); private final Gson gson = new Gson(); @@ -41,7 +44,17 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< * @param uri the {@link URI} records should be sent to */ public HttpRecordSender(final URI uri) { - this(uri, true, List.of(HTTP_OK)); + this(uri, false, List.of(HTTP_OK)); + } + + /** + * Create a new {@link HttpRecordSender}. + * + * @param uri the {@link URI} records should be sent to + * @param async whether HTTP requests should be sent asynchronous + */ + public HttpRecordSender(final URI uri, final boolean async) { + this(uri, async, List.of(HTTP_OK)); } /** @@ -63,6 +76,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< final String json = this.gson.toJson(message); final HttpRequest request = HttpRequest.newBuilder() .uri(this.uri) + .timeout(CONNECTION_TIMEOUT) .POST(HttpRequest.BodyPublishers.ofString(json)) .build(); final BodyHandler<Void> bodyHandler = BodyHandlers.discarding(); @@ -81,7 +95,7 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< response.statusCode()); } }); - if (this.async) { + if (this.isSync()) { try { result.get(); } catch (InterruptedException | ExecutionException e) { @@ -90,4 +104,8 @@ public class HttpRecordSender<T extends SpecificRecord> implements RecordSender< } } + private boolean isSync() { + return !this.async; + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java index 45731e8bce79264252c55a61b9efa5245f610c3b..56b1946ad78d888fe6e5140fdc373bb2cd3a4ed4 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSender.java @@ -4,115 +4,72 @@ import java.util.Properties; import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; /** - * Sends monitoring records to Kafka. + * Sends records to Kafka. * - * @param <T> {@link SpecificRecord} to send + * @param <T> Record type to send. */ -public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender<T> { +public interface KafkaRecordSender<T> extends RecordSender<T> { - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; + @Override + public void close(); /** - * Create a new {@link KafkaRecordSender}. + * Creates a builder object for a {@link KafkaRecordSender} based on a Kafka {@link Serializer}. + * + * @param bootstrapServers The server to for accessing Kafka. + * @param topic The topic where to write. + * @param serializer The {@link Serializer} for mapping a value to keys. */ - private KafkaRecordSender(final Builder<T> builder) { - this.topic = builder.topic; - this.keyAccessor = builder.keyAccessor; - this.timestampAccessor = builder.timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(builder.defaultProperties); - properties.put("bootstrap.servers", builder.bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - final SchemaRegistryAvroSerdeFactory avroSerdeFactory = - new SchemaRegistryAvroSerdeFactory(builder.schemaRegistryUrl); - this.producer = new KafkaProducer<>( - properties, - new StringSerializer(), - avroSerdeFactory.<T>forKeys().serializer()); + public static <T> Builder<T> builderWithSerializer( + final String bootstrapServers, + final String topic, + final Serializer<T> serializer) { + return new Builder<>(bootstrapServers, topic, serializer); } /** - * Write the passed monitoring record to Kafka. + * Creates a Builder object for a {@link KafkaRecordSender} based on a Confluent Schema Registry + * URL. + * + * @param bootstrapServers The Server to for accessing Kafka. + * @param topic The topic where to write. + * @param schemaRegistryUrl URL to the schema registry for avro. */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - try { - this.producer.send(record); - } catch (final SerializationException e) { - LOGGER.warn( - "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS - e); - } - } - - public void terminate() { - this.producer.close(); - } - - @Override - public void send(final T message) { - this.write(message); - } - - public static <T extends SpecificRecord> Builder<T> builder( + public static <T extends SpecificRecord> Builder<T> builderWithSchemaRegistry( final String bootstrapServers, final String topic, final String schemaRegistryUrl) { - return new Builder<>(bootstrapServers, topic, schemaRegistryUrl); + final SchemaRegistryAvroSerdeFactory avroSerdeFactory = + new SchemaRegistryAvroSerdeFactory(schemaRegistryUrl); + return new Builder<>(bootstrapServers, topic, avroSerdeFactory.<T>forValues().serializer()); } /** - * Builder class to build a new {@link KafkaRecordSender}. + * Builder class to build a new {@link KafkaRecordSenderImpl}. * * @param <T> Type of the records that should later be send. */ - public static class Builder<T extends SpecificRecord> { + public static class Builder<T> { private final String bootstrapServers; private final String topic; - private final String schemaRegistryUrl; + private final Serializer<T> serializer; private Function<T, String> keyAccessor = x -> ""; // NOPMD private Function<T, Long> timestampAccessor = x -> null; // NOPMD private Properties defaultProperties = new Properties(); // NOPMD - /** - * Creates a Builder object for a {@link KafkaRecordSender}. - * - * @param bootstrapServers The Server to for accessing Kafka. - * @param topic The topic where to write. - * @param schemaRegistryUrl URL to the schema registry for avro. - */ private Builder(final String bootstrapServers, final String topic, - final String schemaRegistryUrl) { + final Serializer<T> serializer) { this.bootstrapServers = bootstrapServers; this.topic = topic; - this.schemaRegistryUrl = schemaRegistryUrl; + this.serializer = serializer; } public Builder<T> keyAccessor(final Function<T, String> keyAccessor) { @@ -130,9 +87,51 @@ public class KafkaRecordSender<T extends SpecificRecord> implements RecordSender return this; } + /** + * Create a {@link KafkaRecordSender} from this builder. + */ public KafkaRecordSender<T> build() { - return new KafkaRecordSender<>(this); + final Properties properties = new Properties(); + properties.putAll(this.defaultProperties); + properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); + // properties.put("acks", this.acknowledges); + // properties.put("batch.size", this.batchSize); + // properties.put("linger.ms", this.lingerMs); + // properties.put("buffer.memory", this.bufferMemory); + + return new KafkaRecordSenderImpl<>( + new KafkaProducer<>( + properties, + new StringSerializer(), + this.serializer), + new DefaultRecordFactory<>(), + this.topic, + this.keyAccessor, + this.timestampAccessor); } + + private static class DefaultRecordFactory<T> implements KafkaRecordFactory<T, String, T> { + + @Override + public ProducerRecord<String, T> create(final String topic, final String key, final T value, + final long timestamp) { + return new ProducerRecord<>(topic, null, timestamp, key, value); + } + + } + } + + /** + * Create Kafka {@link ProducerRecord}s from a topic, a key, a value and a timestamp. + * + * @param <T> type the records should be created from. + * @param <K> key type of the {@link ProducerRecord}s. + * @param <V> value type of the {@link ProducerRecord}s. + */ + public static interface KafkaRecordFactory<T, K, V> { + + ProducerRecord<K, V> create(String topic, String key, T value, long timestamp); + } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..02a4d206b0a2414d5f12f5348f6c1bfc56852281 --- /dev/null +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KafkaRecordSenderImpl.java @@ -0,0 +1,70 @@ +package rocks.theodolite.benchmarks.loadgenerator; + +import java.util.function.Function; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends records to Kafka. + * + * @param <T> Record type to send. + * @param <K> Internal key type for Kafka records. + * @param <V> Internal value type for Kafka records. + */ +/* default */ class KafkaRecordSenderImpl<T, K, V> implements KafkaRecordSender<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSenderImpl.class); + + private final String topic; + + private final Function<T, String> keyAccessor; + + private final Function<T, Long> timestampAccessor; + + private final Producer<K, V> producer; + + private final KafkaRecordFactory<T, K, V> recordFactory; + + /** + * Create a new {@link KafkaRecordSenderImpl}. + */ + protected KafkaRecordSenderImpl( + final Producer<K, V> producer, + final KafkaRecordFactory<T, K, V> recordFactory, + final String topic, + final Function<T, String> keyAccessor, + final Function<T, Long> timestampAccessor) { + this.topic = topic; + this.producer = producer; + this.recordFactory = recordFactory; + this.keyAccessor = keyAccessor; + this.timestampAccessor = timestampAccessor; + } + + @Override + public void close() { + this.producer.close(); + } + + @Override + public void send(final T message) { + final ProducerRecord<K, V> record = this.recordFactory.create( + this.topic, + this.keyAccessor.apply(message), + message, + this.timestampAccessor.apply(message)); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + try { + this.producer.send(record); + } catch (final SerializationException e) { + LOGGER.warn( + "Record could not be serialized and thus not sent to Kafka due to exception. Skipping this record.", // NOCS + e); + } + } + +} diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KeySpace.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KeySpace.java index ed5d9a4982b8b075c11dd7eadfcf2a78edc837ee..82dac7b42dc6e16406f32d30f0dc7a372d7af43d 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KeySpace.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/KeySpace.java @@ -10,7 +10,7 @@ import java.util.stream.IntStream; */ public class KeySpace implements Serializable { - private static final long serialVersionUID = 7343135392720315515L; // NOPMD + private static final long serialVersionUID = 7343135392720315516L; // NOPMD private final String prefix; private final int min; @@ -18,11 +18,11 @@ public class KeySpace implements Serializable { /** * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of - * each key will be determined by a number of the interval ({@code min}, {@code max}). + * each key will be determined by a number of the interval [{@code min}, {@code max}). * * @param prefix the prefix to use for all keys * @param min the lower bound (inclusive) to start counting from - * @param max the upper bound (inclusive) to count to + * @param max the upper bound (exclusive) to count to */ public KeySpace(final String prefix, final int min, final int max) { this.prefix = prefix; @@ -31,7 +31,7 @@ public class KeySpace implements Serializable { } public KeySpace(final String prefix, final int numberOfKeys) { - this(prefix, 0, numberOfKeys - 1); + this(prefix, 0, numberOfKeys); } public String getPrefix() { @@ -52,21 +52,21 @@ public class KeySpace implements Serializable { * Get the amount of keys in this {@link KeySpace}. */ public int getCount() { - return this.getMax() - this.getMin() + 1; + return this.max - this.min; } /** * Get all keys in this {@link KeySpace}. */ public Collection<String> getKeys() { - return IntStream.rangeClosed(this.min, this.max) + return IntStream.range(this.min, this.max) .mapToObj(id -> this.prefix + id) .collect(Collectors.toUnmodifiableList()); } @Override public String toString() { - return this.prefix + '[' + this.min + '-' + this.max + ']'; + return this.prefix + '[' + this.min + '-' + this.max + ')'; } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java index 6866b39e51570299d05795557b66997dc934c035..27edb97efc335400acf1d6244db0ce384ee20f59 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGenerator.java @@ -17,6 +17,7 @@ public final class LoadGenerator { public static final LoadGeneratorTarget TARGET_DEFAULT = LoadGeneratorTarget.KAFKA; // Target: HTTP public static final String HTTP_URI_DEFAULT = "http://localhost:8080"; + public static final boolean HTTP_ASYNC_DEFAULT = false; // Target: Kafka public static final String SCHEMA_REGISTRY_URL_DEFAULT = "http://localhost:8081"; public static final String KAFKA_TOPIC_DEFAULT = "input"; // NOCS @@ -78,6 +79,7 @@ public final class LoadGenerator { this.clusterConfig, this.generatorConfig, this.loadDefinition); + Runtime.getRuntime().addShutdownHook(new Thread(() -> runner.stop())); runner.runBlocking(); } @@ -91,11 +93,12 @@ public final class LoadGenerator { new KeySpace(SENSOR_PREFIX_DEFAULT, NUMBER_OF_KEYS_DEFAULT), Duration.ofMillis(PERIOD_MS_DEFAULT))) .setGeneratorConfig(new LoadGeneratorConfig( - TitanRecordGenerator.forConstantValue(VALUE_DEFAULT), - TitanKafkaSenderFactory.forKafkaConfig( - KAFKA_BOOTSTRAP_SERVERS_DEFAULT, - KAFKA_TOPIC_DEFAULT, - SCHEMA_REGISTRY_URL_DEFAULT))); + GeneratorAction.from( + TitanRecordGenerator.forConstantValue(VALUE_DEFAULT), + TitanKafkaSenderFactory.forKafkaConfig( + KAFKA_BOOTSTRAP_SERVERS_DEFAULT, + KAFKA_TOPIC_DEFAULT, + SCHEMA_REGISTRY_URL_DEFAULT)))); } /** diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java index 97ed0b8fce6a18050e2c5846da1c590e891ed80b..e854138b38613ba614c871febcb80cf9c6b059ef 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/LoadGeneratorConfig.java @@ -5,21 +5,16 @@ package rocks.theodolite.benchmarks.loadgenerator; */ public class LoadGeneratorConfig { - private final GeneratorAction messageGenerator; + private final GeneratorAction generatorAction; private BeforeAction beforeAction = BeforeAction.doNothing(); private int threads = 1; - public <T> LoadGeneratorConfig( - final RecordGenerator<? extends T> generator, - final RecordSender<? super T> sender) { - this.messageGenerator = GeneratorAction.from(generator, sender); + public LoadGeneratorConfig(final GeneratorAction generatorAction) { + this.generatorAction = generatorAction; } - public <T> LoadGeneratorConfig( - final RecordGenerator<? extends T> generator, - final RecordSender<? super T> sender, - final int threads) { - this(generator, sender); + public LoadGeneratorConfig(final GeneratorAction generatorAction, final int threads) { + this(generatorAction); this.threads = threads; } @@ -37,7 +32,7 @@ public class LoadGeneratorConfig { public LoadGeneratorExecution buildLoadGeneratorExecution( final WorkloadDefinition workloadDefinition) { - return new LoadGeneratorExecution(workloadDefinition, this.messageGenerator, this.threads); + return new LoadGeneratorExecution(workloadDefinition, this.generatorAction, this.threads); } } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java index 97c4533dc4b8904f8ae9a5c46c3459216e86b5ca..ecba6961245651c7420d89c5da9bd1f993972188 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/PubSubRecordSender.java @@ -55,7 +55,8 @@ public class PubSubRecordSender<T> implements RecordSender<T> { /** * Terminate this {@link PubSubRecordSender} and shutdown the underlying {@link Publisher}. */ - public void terminate() { + @Override + public void close() { this.publisher.shutdown(); try { this.publisher.awaitTermination(SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java index 0b64ace46a9e04f013f843ecd08dd6fcdf5eed9d..05e127eb019cf877cc5df73e09a6f053ef793fc3 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordGenerator.java @@ -1,5 +1,7 @@ package rocks.theodolite.benchmarks.loadgenerator; +import java.io.Closeable; + /** * This interface describes a function that takes meta information from a string key and produces an * object of type T. @@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator; * @param <T> the type of the objects that will be generated by the function. */ @FunctionalInterface -public interface RecordGenerator<T> { +public interface RecordGenerator<T> extends Closeable { T generate(final String key); + @Override + default void close() { + // Nothing to do per default + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java index 71732b88d2cf3f119140474c387f78b92a9521f8..f1f1bef980f01da4a23b49440be71ba552c13905 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/RecordSender.java @@ -1,5 +1,7 @@ package rocks.theodolite.benchmarks.loadgenerator; +import java.io.Closeable; + /** * This interface describes a function that consumes a message {@code T}. This function is dedicated * to be used to transport individual messages to the messaging system. @@ -7,8 +9,13 @@ package rocks.theodolite.benchmarks.loadgenerator; * @param <T> the type of records to send as messages. */ @FunctionalInterface -public interface RecordSender<T> { +public interface RecordSender<T> extends Closeable { void send(final T message); + @Override + default void close() { + // Nothing to do per default + } + } diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java index 063bbaaab4a24d9dd2d90ef744672e03ac852b8b..ee7d416513439a5d0ba7bad7bcdb09e1baf5e4c7 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/TitanKafkaSenderFactory.java @@ -31,7 +31,7 @@ public final class TitanKafkaSenderFactory { final String schemaRegistryUrl, final Properties properties) { return KafkaRecordSender - .<ActivePowerRecord>builder( + .<ActivePowerRecord>builderWithSchemaRegistry( bootstrapServers, topic, schemaRegistryUrl) diff --git a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinition.java b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinition.java index 458997cce20c6e759c6772107100208dedcb0c3f..1e7e6e5974323b545dd29b5db341405ffbd32eb3 100644 --- a/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinition.java +++ b/theodolite-benchmarks/load-generator-commons/src/main/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinition.java @@ -42,16 +42,16 @@ public class WorkloadDefinition implements Serializable { * distributing its {@link KeySpace} (almost) equally among all {@link WorkloadDefinition}s. */ public Set<WorkloadDefinition> divide(final int parts) { - final int effParts = Math.min(parts, this.keySpace.getCount()); - final int minSize = this.keySpace.getCount() / effParts; - final int largerParts = this.keySpace.getCount() % effParts; - return IntStream.range(0, effParts) + // final int effParts = Math.min(parts, this.keySpace.getCount()); + final int minSize = this.keySpace.getCount() / parts; + final int largerParts = this.keySpace.getCount() % parts; + return IntStream.range(0, parts) .mapToObj(part -> { final int thisSize = part < largerParts ? minSize + 1 : minSize; final int largePartsBefore = Math.min(largerParts, part); final int smallPartsBefore = part - largePartsBefore; final int start = largePartsBefore * (minSize + 1) + smallPartsBefore * minSize; - final int end = start + thisSize - 1; + final int end = start + thisSize; return new KeySpace( this.keySpace.getPrefix(), start, diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/KeySpaceTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/KeySpaceTest.java index 829f26ba43d20285661cf2684d48f09102640402..cbd230433d2345cf00212cf6f68463d07c5ef765 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/KeySpaceTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/KeySpaceTest.java @@ -7,14 +7,14 @@ public class KeySpaceTest { @Test public void testCountFixedRangeFromZero() { - final KeySpace keySpace = new KeySpace("prefix", 0, 9); + final KeySpace keySpace = new KeySpace("prefix", 0, 10); final int count = keySpace.getCount(); Assert.assertEquals(10, count); } @Test public void testCountFixedRangeNotFromZero() { - final KeySpace keySpace = new KeySpace("prefix", 4, 11); + final KeySpace keySpace = new KeySpace("prefix", 4, 12); final int count = keySpace.getCount(); Assert.assertEquals(8, count); } diff --git a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinitionTest.java b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinitionTest.java index fb10f36e08ec32f3137193112b5f84a9ad9ff5c6..b670a73c6fe423e42e02c7d25d67311a5858c1af 100644 --- a/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinitionTest.java +++ b/theodolite-benchmarks/load-generator-commons/src/test/java/rocks/theodolite/benchmarks/loadgenerator/WorkloadDefinitionTest.java @@ -5,6 +5,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.Assert; import org.junit.Test; @@ -47,15 +48,27 @@ public class WorkloadDefinitionTest { final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream() .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin())) .collect(Collectors.toList()); + final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0); - Assert.assertEquals(0, subworkload1.getKeySpace().getMin()); - Assert.assertEquals(33, subworkload1.getKeySpace().getMax()); + final List<String> expectedKeySubworkload1 = IntStream + .rangeClosed(0, 33) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload1, subworkload1.getKeySpace().getKeys()); + final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1); - Assert.assertEquals(34, subworkload2.getKeySpace().getMin()); - Assert.assertEquals(66, subworkload2.getKeySpace().getMax()); + final List<String> expectedKeySubworkload2 = IntStream + .rangeClosed(34, 66) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload2, subworkload2.getKeySpace().getKeys()); + final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2); - Assert.assertEquals(67, subworkload3.getKeySpace().getMin()); - Assert.assertEquals(99, subworkload3.getKeySpace().getMax()); + final List<String> expectedKeySubworkload3 = IntStream + .rangeClosed(67, 99) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload3, subworkload3.getKeySpace().getKeys()); } @Test @@ -71,27 +84,109 @@ public class WorkloadDefinitionTest { final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream() .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin())) .collect(Collectors.toList()); + final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0); - Assert.assertEquals(0, subworkload1.getKeySpace().getMin()); - Assert.assertEquals(1, subworkload1.getKeySpace().getMax()); + final List<String> expectedKeySubworkload1 = IntStream + .rangeClosed(0, 1) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload1, subworkload1.getKeySpace().getKeys()); + final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1); - Assert.assertEquals(2, subworkload2.getKeySpace().getMin()); - Assert.assertEquals(3, subworkload2.getKeySpace().getMax()); + final List<String> expectedKeySubworkload2 = IntStream + .rangeClosed(2, 3) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload2, subworkload2.getKeySpace().getKeys()); + final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2); - Assert.assertEquals(4, subworkload3.getKeySpace().getMin()); - Assert.assertEquals(5, subworkload3.getKeySpace().getMax()); + final List<String> expectedKeySubworkload3 = IntStream + .rangeClosed(4, 5) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload3, subworkload3.getKeySpace().getKeys()); + final WorkloadDefinition subworkload4 = orderedSubworkloads.get(3); - Assert.assertEquals(6, subworkload4.getKeySpace().getMin()); - Assert.assertEquals(6, subworkload4.getKeySpace().getMax()); + final List<String> expectedKeySubworkload4 = IntStream + .rangeClosed(6, 6) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload4, subworkload4.getKeySpace().getKeys()); + final WorkloadDefinition subworkload5 = orderedSubworkloads.get(4); - Assert.assertEquals(7, subworkload5.getKeySpace().getMin()); - Assert.assertEquals(7, subworkload5.getKeySpace().getMax()); + final List<String> expectedKeySubworkload5 = IntStream + .rangeClosed(7, 7) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload5, subworkload5.getKeySpace().getKeys()); + final WorkloadDefinition subworkload6 = orderedSubworkloads.get(5); - Assert.assertEquals(8, subworkload6.getKeySpace().getMin()); - Assert.assertEquals(8, subworkload6.getKeySpace().getMax()); + final List<String> expectedKeySubworkload6 = IntStream + .rangeClosed(8, 8) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload6, subworkload6.getKeySpace().getKeys()); + final WorkloadDefinition subworkload7 = orderedSubworkloads.get(6); - Assert.assertEquals(9, subworkload7.getKeySpace().getMin()); - Assert.assertEquals(9, subworkload7.getKeySpace().getMax()); + final List<String> expectedKeySubworkload7 = IntStream + .rangeClosed(9, 9) + .mapToObj(id -> "prefix" + id) + .collect(Collectors.toList()); + Assert.assertEquals(expectedKeySubworkload7, subworkload7.getKeySpace().getKeys()); + } + + @Test + public void testDivideWithOneEmpty() { + final KeySpace keySpace = new KeySpace("prefix", 2); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(3); + Assert.assertEquals(3, subworkloads.size()); + for (final WorkloadDefinition subworkload : subworkloads) { + Assert.assertEquals("prefix", subworkload.getKeySpace().getPrefix()); + Assert.assertEquals(Duration.ofSeconds(1), subworkload.getPeriod()); + } + final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream() + .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin())) + .collect(Collectors.toList()); + + final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0); + final List<String> expectedKeySubworkload1 = List.of("prefix0"); + Assert.assertEquals(expectedKeySubworkload1, subworkload1.getKeySpace().getKeys()); + + final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1); + final List<String> expectedKeySubworkload2 = List.of("prefix1"); + Assert.assertEquals(expectedKeySubworkload2, subworkload2.getKeySpace().getKeys()); + + final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2); + final List<String> expectedKeySubworkload3 = List.of(); + Assert.assertEquals(expectedKeySubworkload3, subworkload3.getKeySpace().getKeys()); + } + + @Test + public void testDivideWithTwoEmpty() { + final KeySpace keySpace = new KeySpace("prefix", 1); + final WorkloadDefinition workload = new WorkloadDefinition(keySpace, Duration.ofSeconds(1)); + final Set<WorkloadDefinition> subworkloads = workload.divide(3); + Assert.assertEquals(3, subworkloads.size()); + for (final WorkloadDefinition subworkload : subworkloads) { + Assert.assertEquals("prefix", subworkload.getKeySpace().getPrefix()); + Assert.assertEquals(Duration.ofSeconds(1), subworkload.getPeriod()); + } + final List<WorkloadDefinition> orderedSubworkloads = subworkloads.stream() + .sorted(Comparator.comparingInt(l -> l.getKeySpace().getMin())) + .collect(Collectors.toList()); + + final WorkloadDefinition subworkload1 = orderedSubworkloads.get(0); + final List<String> expectedKeySubworkload1 = List.of("prefix0"); + Assert.assertEquals(expectedKeySubworkload1, subworkload1.getKeySpace().getKeys()); + + final WorkloadDefinition subworkload2 = orderedSubworkloads.get(1); + final List<String> expectedKeySubworkload2 = List.of(); + Assert.assertEquals(expectedKeySubworkload2, subworkload2.getKeySpace().getKeys()); + + final WorkloadDefinition subworkload3 = orderedSubworkloads.get(2); + final List<String> expectedKeySubworkload3 = List.of(); + Assert.assertEquals(expectedKeySubworkload3, subworkload3.getKeySpace().getKeys()); } } diff --git a/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/application/HistoryService.java b/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/HistoryService.java similarity index 88% rename from theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/application/HistoryService.java rename to theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/HistoryService.java index fde5d0e5313e417b6160032a83ec920f2f624f8c..0a2a1bec7c3515f903905efeb07e717a46e329ea 100644 --- a/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/application/HistoryService.java +++ b/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/HistoryService.java @@ -1,9 +1,8 @@ -package rocks.theodolite.benchmarks.uc1.kstreams.application; +package rocks.theodolite.benchmarks.uc1.kstreams; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import rocks.theodolite.benchmarks.uc1.kstreams.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; /** diff --git a/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/streamprocessing/TopologyBuilder.java b/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/TopologyBuilder.java similarity index 96% rename from theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/streamprocessing/TopologyBuilder.java rename to theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/TopologyBuilder.java index 043f30540a0a3f14267906b924f8335f28625ad7..944e449c4693dc7c234844c97567d7f9f048cf3b 100644 --- a/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/streamprocessing/TopologyBuilder.java +++ b/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/TopologyBuilder.java @@ -1,4 +1,4 @@ -package rocks.theodolite.benchmarks.uc1.kstreams.streamprocessing; +package rocks.theodolite.benchmarks.uc1.kstreams; import java.util.Properties; import org.apache.kafka.common.serialization.Serdes; diff --git a/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/streamprocessing/Uc1KafkaStreamsBuilder.java b/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/Uc1KafkaStreamsBuilder.java similarity index 92% rename from theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/streamprocessing/Uc1KafkaStreamsBuilder.java rename to theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/Uc1KafkaStreamsBuilder.java index 8b8b7cfd7bace16539d54841bbbf97840b008b91..a1e9c4d78d0f340273fb3db944ba96913c8d0b13 100644 --- a/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/theodolite-benchmarks/uc1-kstreams/src/main/java/rocks/theodolite/benchmarks/uc1/kstreams/Uc1KafkaStreamsBuilder.java @@ -1,4 +1,4 @@ -package rocks.theodolite.benchmarks.uc1.kstreams.streamprocessing; +package rocks.theodolite.benchmarks.uc1.kstreams; import java.util.Objects; import java.util.Properties;