diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index 842a668c1bc10fd63db738362b3babe3e2c63293..a7b27dfdb25760f0b96c930c9705c2eed0402442 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -77,15 +77,15 @@ public final class LoadGenerator { // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setGeneratorFunction( + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index 2249d066a1af173266e55f83cd384b60c82e5b3b..b2dfae12a0bd207b490086d8ca0767d5a6b9cb1d 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -18,6 +18,8 @@ import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; @@ -30,8 +32,9 @@ import titan.ccp.model.sensorregistry.SensorRegistry; */ public class TopologyBuilder { + private static final int LATENCY_OUTPOUT_THRESHOLD = 1000; - // private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; private final String outputTopic; @@ -186,20 +189,22 @@ public class TopologyBuilder { final long latency = time - v.getTimestamp(); this.latencyStats.add(latency); if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) { - System.out.println("latency," - + time + ',' - + this.latencyStats.mean() + ',' - + (this.latencyStats.count() > 0 - ? this.latencyStats.populationStandardDeviation() - : Double.NaN) - + ',' - + (this.latencyStats.count() > 1 - ? this.latencyStats.sampleStandardDeviation() - : Double.NaN) - + ',' - + this.latencyStats.min() + ',' - + this.latencyStats.max() + ',' - + this.latencyStats.count()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("latency," + + time + ',' + + this.latencyStats.mean() + ',' + + (this.latencyStats.count() > 0 + ? this.latencyStats.populationStandardDeviation() + : Double.NaN) + + ',' + + (this.latencyStats.count() > 1 + ? this.latencyStats.sampleStandardDeviation() + : Double.NaN) + + ',' + + this.latencyStats.min() + ',' + + this.latencyStats.max() + ',' + + this.latencyStats.count()); + } this.latencyStats = new StatsAccumulator(); this.lastTime = time; } diff --git a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java index 49ed674bc4442f01de1cf51e4510f2079524933d..54e8c460e642d53bb013ef6888570d6fc36ff614 100644 --- a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java @@ -3,7 +3,6 @@ package theodolite.uc2.streamprocessing; import java.util.Optional; import java.util.Set; import org.junit.Test; -import theodolite.uc2.streamprocessing.OptionalParentsSerde; public class OptionalParentsSerdeTest { diff --git a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java index 15872798698ceffcdbaddb689d4179afd7d67a01..f12604d6a19ca36e9c151210005c910b37908307 100644 --- a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java @@ -2,7 +2,6 @@ package theodolite.uc2.streamprocessing; import java.util.Set; import org.junit.Test; -import theodolite.uc2.streamprocessing.ParentsSerde; public class ParentsSerdeTest { diff --git a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java index 7d9fe3a6eb83b82d85913f212fe9a930f194b220..7ca99bcb79baeb5f95a8270b99a559f2f108867e 100644 --- a/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java @@ -1,8 +1,6 @@ package theodolite.uc2.streamprocessing; import org.junit.Test; -import theodolite.uc2.streamprocessing.SensorParentKey; -import theodolite.uc2.streamprocessing.SensorParentKeySerde; public class SensorParentKeySerdeTest { diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java index c8b3a1846254603c8690bf395c24c6d6f9fb2166..ad24e8e4bc8f86b7ed4d5dc2822622f8da22d6d1 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java @@ -10,8 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer; import titan.ccp.configuration.events.Event; import titan.ccp.configuration.events.EventSerde; +/** + * Class to publish a configuration to Kafka. + * + */ public class ConfigPublisher { + private static final String MEMORY_CONFIG = "134217728"; // 128 MB + private final String topic; private final Producer<Event, String> producer; @@ -20,6 +26,13 @@ public class ConfigPublisher { this(bootstrapServers, topic, new Properties()); } + /** + * Creates a new {@link ConfigPublisher} object. + * + * @param bootstrapServers Zoo Keeper server. + * @param topic where to write the configuration. + * @param defaultProperties default properties. + */ public ConfigPublisher(final String bootstrapServers, final String topic, final Properties defaultProperties) { this.topic = topic; @@ -27,13 +40,19 @@ public class ConfigPublisher { final Properties properties = new Properties(); properties.putAll(defaultProperties); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB - properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB + properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MEMORY_CONFIG); + properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, MEMORY_CONFIG); this.producer = new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer()); } + /** + * Publish an event with given value to the kafka topic. + * + * @param event Which {@link Event} happened. + * @param value Configuration value. + */ public void publish(final Event event, final String value) { final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value); try { diff --git a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index eadb4ccd293929d0d46383813e1e98dffebc47a5..a33fba0ea5688a2673b193d45a57693da56b1db4 100644 --- a/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -18,18 +18,37 @@ import titan.ccp.model.records.ActivePowerRecord; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; -public class LoadGenerator { +/** + * The {@code LoadGenerator} creates a load in Kafka. + */ +public final class LoadGenerator { + + private static final int SLEEP_PERIOD = 30_000; private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + // Constants + private static final String DEEP = "deep"; private static final long MAX_DURATION_IN_DAYS = 30L; + // Make this a utility class, because all methods are static. + private LoadGenerator() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + * @param args CLI arguments + * @throws InterruptedException Interrupt happened + * @throws IOException happened. + */ public static void main(final String[] args) throws InterruptedException, IOException { // uc2 LOGGER.info("Start workload generator for use case UC2."); // get environment variables - final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep"); + final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), DEEP); final int numNestedGroups = Integer .parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1")); final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost"); @@ -81,12 +100,12 @@ public class LoadGenerator { // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setBeforeAction(() -> { + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .beforeAction(() -> { if (sendRegistry) { final ConfigPublisher configPublisher = new ConfigPublisher(kafkaBootstrapServers, "configuration"); @@ -96,18 +115,18 @@ public class LoadGenerator { LOGGER.info("Now wait 30 seconds"); try { - Thread.sleep(30_000); + Thread.sleep(SLEEP_PERIOD); } catch (final InterruptedException e) { // TODO Auto-generated catch block - e.printStackTrace(); + LOGGER.error(e.getMessage(), e); } LOGGER.info("And woke up again :)"); } }) - .setGeneratorFunction( + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start @@ -119,7 +138,7 @@ public class LoadGenerator { final int numNestedGroups, final int numSensors) { final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0"); - if (hierarchy.equals("deep")) { + if (DEEP.equals(hierarchy)) { MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor(); for (int lvl = 1; lvl < numNestedGroups; lvl++) { lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl); @@ -127,7 +146,7 @@ public class LoadGenerator { for (int s = 0; s < numSensors; s++) { lastSensor.addChildMachineSensor("sensor_" + s); } - } else if (hierarchy.equals("full")) { + } else if ("full".equals(hierarchy)) { addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0); } else { throw new IllegalStateException(); @@ -136,8 +155,8 @@ public class LoadGenerator { } private static int addChildren(final MutableAggregatedSensor parent, final int numChildren, - final int lvl, - final int maxLvl, int nextId) { + final int lvl, final int maxLvl, final int startId) { + int nextId = startId; for (int c = 0; c < numChildren; c++) { if (lvl == maxLvl) { parent.addChildMachineSensor("s_" + nextId); diff --git a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 5c7441845d83c9b36e0af3de09d2d688b2f8cbaf..85f6a94036c53b48973ba2200212fc8e5dfd663d 100644 --- a/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -15,12 +15,28 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBu import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.model.records.ActivePowerRecord; -public class LoadGenerator { +/** + * The {@code LoadGenerator} creates a load in Kafka. + */ +public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + // constants private static final long MAX_DURATION_IN_DAYS = 30L; + // Make this a utility class, because all methods are static. + private LoadGenerator() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + * @param args CLI arguments + * @throws InterruptedException Interrupt happened + * @throws IOException happened. + */ public static void main(final String[] args) throws InterruptedException, IOException { // uc2 LOGGER.info("Start workload generator for use case UC3."); @@ -68,15 +84,15 @@ public class LoadGenerator { // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setGeneratorFunction( + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start diff --git a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index 14e9e589dc5997b71b2d1437f1a7e8a78936a2bf..ff551e7ef423633137d122dfed7d6e03d362e7ff 100644 --- a/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -15,12 +15,28 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBu import theodolite.commons.workloadgeneration.misc.ZooKeeper; import titan.ccp.model.records.ActivePowerRecord; -public class LoadGenerator { +/** + * The {@code LoadGenerator} creates a load in Kafka. + */ +public final class LoadGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); + // constants private static final long MAX_DURATION_IN_DAYS = 30L; + // Make this a utility class, because all methods are static. + private LoadGenerator() { + throw new UnsupportedOperationException(); + } + + /** + * Main method. + * + * @param args CLI arguments + * @throws InterruptedException Interrupt happened + * @throws IOException happened. + */ public static void main(final String[] args) throws InterruptedException, IOException { // uc4 LOGGER.info("Start workload generator for use case UC4."); @@ -69,15 +85,15 @@ public class LoadGenerator { // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() - .setInstances(instances) - .setKeySpace(new KeySpace("s_", numSensors)) - .setThreads(threads) - .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) - .setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) - .setGeneratorFunction( + .instances(instances) + .keySpace(new KeySpace("s_", numSensors)) + .threads(threads) + .period(Duration.of(periodMs, ChronoUnit.MILLIS)) + .duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS)) + .generatorFunction( sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value)) - .setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) - .setKafkaRecordSender(kafkaRecordSender) + .zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort)) + .kafkaRecordSender(kafkaRecordSender) .build(); // start diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java index 6ad61ae9ced4eda35b6828677efc267cb56aaf19..2249abcbcb1071cf880b2ee80f5d41f2b3dab463 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java @@ -44,10 +44,10 @@ public class WorkloadDistributor { private final BiConsumer<WorkloadDefinition, Integer> workerAction; private final int instances; - private final ZooKeeper zooKeeper; + private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable private final CuratorFramework client; - private boolean workloadGenerationStarted = false; + private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false /** * Create a new workload distributor. @@ -79,8 +79,8 @@ public class WorkloadDistributor { try { this.client.blockUntilConnected(); } catch (final InterruptedException e) { - LOGGER.error("", e); - throw new IllegalStateException(); + LOGGER.error(e.getMessage(), e); + throw new IllegalStateException(e); } this.counter = @@ -142,9 +142,9 @@ public class WorkloadDistributor { if (!this.workloadGenerationStarted) { LOGGER.warn("No workload definition retrieved for 20 s. Terminating now.."); } - } catch (final Exception e) { - LOGGER.error("", e); - throw new IllegalStateException("Error when starting the distribution of the workload."); + } catch (final Exception e) { // NOPMD need to catch exception because of external framework + LOGGER.error(e.getMessage(), e); + throw new IllegalStateException("Error when starting the distribution of the workload.", e); } } @@ -154,7 +154,9 @@ public class WorkloadDistributor { * @param workerId the ID of this worker * @throws Exception when an error occurs */ - private synchronized void startWorkloadGeneration(final int workerId) throws Exception { + // NOPMD because exception thrown from used framework + private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD + if (!this.workloadGenerationStarted) { this.workloadGenerationStarted = true; @@ -181,9 +183,9 @@ public class WorkloadDistributor { if (event.getType() == EventType.NodeChildrenChanged) { try { WorkloadDistributor.this.startWorkloadGeneration(workerId); - } catch (final Exception e) { - LOGGER.error("", e); - throw new IllegalStateException("Error starting workload generation."); + } catch (final Exception e) { // NOPMD external framework throws exception + LOGGER.error(e.getMessage(), e); + throw new IllegalStateException("Error starting workload generation.", e); } } } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index 8b03c224ea132015225e54dfcb493b836920807e..82bb5a951087cd9958a2e33247174e10fe4f3335 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -30,17 +30,14 @@ public abstract class AbstractWorkloadGenerator<T> private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); - private final int instances; - private final ZooKeeper zooKeeper; - private final KeySpace keySpace; - private final int threads; - private final Duration period; - private final Duration duration; - private final BeforeAction beforeAction; + private final int instances; // NOPMD keep instance variable instead of local variable + private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable + private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable + private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector; private final MessageGenerator<T> generatorFunction; private final Transport<T> transport; - private WorkloadDistributor workloadDistributor; + private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local private final ScheduledExecutorService executor; /** @@ -68,10 +65,7 @@ public abstract class AbstractWorkloadGenerator<T> final Transport<T> transport) { this.instances = instances; this.zooKeeper = zooKeeper; - this.period = period; - this.threads = threads; this.keySpace = keySpace; - this.duration = duration; this.beforeAction = beforeAction; this.generatorFunction = generatorFunction; this.workloadSelector = (workloadDefinition, workerId) -> { @@ -93,7 +87,7 @@ public abstract class AbstractWorkloadGenerator<T> final int periodMs = (int) period.toMillis(); - LOGGER.info("Period: " + periodMs); + LOGGER.info("Period: " + periodMs); // NOPMD no computational intensive logger call final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> { @@ -120,7 +114,7 @@ public abstract class AbstractWorkloadGenerator<T> this.stop(); } catch (final InterruptedException e) { LOGGER.error("", e); - throw new IllegalStateException("Error when terminating the workload generation."); + throw new IllegalStateException("Error when terminating the workload generation.", e); } }; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java index 246e83041a268806a1b61dca44f916da096d5c74..ec4504b016a22d06d8c8595b9ee9ee783379f527 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -14,25 +14,17 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; * * @param <T> the record for which the builder is dedicated for. */ -public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { - - private int instances; - - private ZooKeeper zooKeeper; - - private KeySpace keySpace; - - private int threads; - - private Duration period; - - private Duration duration; - - private BeforeAction beforeAction; - - private MessageGenerator<T> generatorFunction; - - private KafkaRecordSender<T> kafkaRecordSender; +public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD + + private int instances; // NOPMD + private ZooKeeper zooKeeper; // NOPMD + private KeySpace keySpace; // NOPMD + private int threads; // NOPMD + private Duration period; // NOPMD + private Duration duration; // NOPMD + private BeforeAction beforeAction; // NOPMD + private MessageGenerator<T> generatorFunction; // NOPMD + private KafkaRecordSender<T> kafkaRecordSender; // NOPMD private KafkaWorkloadGeneratorBuilder() { @@ -53,7 +45,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param instances the number of instances. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) { + public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) { this.instances = instances; return this; } @@ -64,7 +56,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param zooKeeper a reference to the ZooKeeper instance. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setZooKeeper(final ZooKeeper zooKeeper) { + public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; return this; } @@ -75,7 +67,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param beforeAction the {@link BeforeAction}. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setBeforeAction(final BeforeAction beforeAction) { + public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) { this.beforeAction = beforeAction; return this; } @@ -86,7 +78,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param keySpace the {@link KeySpace}. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setKeySpace(final KeySpace keySpace) { + public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) { this.keySpace = keySpace; return this; } @@ -97,7 +89,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param threads the number of threads. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) { + public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) { this.threads = threads; return this; } @@ -108,7 +100,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param period the {@link Period} * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Duration period) { + public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) { this.period = period; return this; } @@ -119,7 +111,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param duration the {@link Duration}. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setDuration(final Duration duration) { + public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) { this.duration = duration; return this; } @@ -130,7 +122,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param generatorFunction the generator function. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setGeneratorFunction( + public KafkaWorkloadGeneratorBuilder<T> generatorFunction( final MessageGenerator<T> generatorFunction) { this.generatorFunction = generatorFunction; return this; @@ -142,7 +134,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { * @param kafkaRecordSender the record sender to use. * @return the builder. */ - public KafkaWorkloadGeneratorBuilder<T> setKafkaRecordSender( + public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender( final KafkaRecordSender<T> kafkaRecordSender) { this.kafkaRecordSender = kafkaRecordSender; return this;