diff --git a/.settings/org.eclipse.jdt.ui.prefs b/.settings/org.eclipse.jdt.ui.prefs index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..4e04e2891754324a6e1bf55348b6a38f592bb301 100644 --- a/.settings/org.eclipse.jdt.ui.prefs +++ b/.settings/org.eclipse.jdt.ui.prefs @@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true sp_cleanup.qualify_static_method_accesses_with_declaring_class=false sp_cleanup.remove_private_constructors=true sp_cleanup.remove_redundant_modifiers=false -sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_semicolons=true sp_cleanup.remove_redundant_type_arguments=true sp_cleanup.remove_trailing_whitespaces=true sp_cleanup.remove_trailing_whitespaces_all=true diff --git a/uc1-application/.settings/org.eclipse.jdt.ui.prefs b/uc1-application/.settings/org.eclipse.jdt.ui.prefs index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..4e04e2891754324a6e1bf55348b6a38f592bb301 100644 --- a/uc1-application/.settings/org.eclipse.jdt.ui.prefs +++ b/uc1-application/.settings/org.eclipse.jdt.ui.prefs @@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true sp_cleanup.qualify_static_method_accesses_with_declaring_class=false sp_cleanup.remove_private_constructors=true sp_cleanup.remove_redundant_modifiers=false -sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_semicolons=true sp_cleanup.remove_redundant_type_arguments=true sp_cleanup.remove_trailing_whitespaces=true sp_cleanup.remove_trailing_whitespaces_all=true diff --git a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index e352d8d59dc5470913a4e8300c49f62b613d7cd7..e9de4c42e8a74082a6fd80c78429fe56daafc15b 100644 --- a/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -48,6 +48,8 @@ public final class LoadGenerator { final String kafkaBatchSize = System.getenv("KAFKA_BATCH_SIZE"); final String kafkaLingerMs = System.getenv("KAFKA_LINGER_MS"); final String kafkaBufferMemory = System.getenv("KAFKA_BUFFER_MEMORY"); + final int instances = + Integer.parseInt(Objects.requireNonNullElse(System.getenv("INSTANCES"), "1")); // create kafka record sender final Properties kafkaProperties = new Properties(); @@ -65,6 +67,7 @@ public final class LoadGenerator { // create workload generator final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator = KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder() + .setInstances(instances) .setKeySpace(new KeySpace("s_", numSensors)) .setThreads(threads) .setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS)) diff --git a/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs b/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs index 98b5ca8064a352aacfe2aebd13fbd0a87735fc3e..4e04e2891754324a6e1bf55348b6a38f592bb301 100644 --- a/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs +++ b/workload-generator-commons/.settings/org.eclipse.jdt.ui.prefs @@ -101,7 +101,7 @@ sp_cleanup.qualify_static_member_accesses_with_declaring_class=true sp_cleanup.qualify_static_method_accesses_with_declaring_class=false sp_cleanup.remove_private_constructors=true sp_cleanup.remove_redundant_modifiers=false -sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_semicolons=true sp_cleanup.remove_redundant_type_arguments=true sp_cleanup.remove_trailing_whitespaces=true sp_cleanup.remove_trailing_whitespaces_all=true diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java index e6665f4debc888bff89e5b2087b7e2259d52a022..ce7d78d777b165393905eedcb1adbe71b51f417f 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/communication/zookeeper/WorkloadDistributor.java @@ -11,6 +11,7 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import theodolite.commons.workloadgeneration.dimensions.KeySpace; @@ -29,15 +30,17 @@ public class WorkloadDistributor { private static final String COUNTER_PATH = "/counter"; private static final String WORKLOAD_PATH = "/workload"; private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; - private final DistributedAtomicInteger counter; private final KeySpace keySpace; private final BeforeAction beforeAction; private final BiConsumer<WorkloadDefinition, Integer> workerAction; + private final int instances; private final ZooKeeper zooKeeper; private final CuratorFramework client; + private boolean workloadGenerationStarted = true; + /** * Create a new workload distributor. * @@ -46,10 +49,12 @@ public class WorkloadDistributor { * @param workerAction the action to perform by the workers. */ public WorkloadDistributor( + final int instances, final ZooKeeper zooKeeper, final KeySpace keySpace, final BeforeAction beforeAction, final BiConsumer<WorkloadDefinition, Integer> workerAction) { + this.instances = instances; this.zooKeeper = zooKeeper; this.keySpace = keySpace; this.beforeAction = beforeAction; @@ -89,7 +94,7 @@ public class WorkloadDistributor { final CuratorWatcher watcher = this.buildWatcher(workerId); - this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); + this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_PATH); if (workerId == 0) { LOGGER.info("This instance is master with id {}", workerId); @@ -99,14 +104,10 @@ public class WorkloadDistributor { // register worker action, as master acts also as worker this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); - Thread.sleep(10000); // wait for all workers to participate in the leader election - - final int numberOfWorkers = this.counter.get().postValue(); - - LOGGER.info("Number of Workers: {}", numberOfWorkers); + LOGGER.info("Number of Workers: {}", this.instances); final WorkloadDefinition definition = - new WorkloadDefinition(this.keySpace, numberOfWorkers); + new WorkloadDefinition(this.keySpace, this.instances); this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH, definition.toString().getBytes(StandardCharsets.UTF_8)); @@ -115,12 +116,39 @@ public class WorkloadDistributor { LOGGER.info("This instance is worker with id {}", workerId); this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + + final Stat exists = + this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); + + if (exists != null) { + this.startWorkloadGeneration(workerId); + } } - Thread.sleep(20000); // wait until the workload definition is retrieved + Thread.sleep(20_000); + } catch (final Exception e) { LOGGER.error("", e); - throw new IllegalStateException("Error when starting thze distribution of the workload."); + throw new IllegalStateException("Error when starting the distribution of the workload."); + } + } + + /** + * Start the workload generation. This methods body does only get executed once. + * + * @param workerId the ID of this worker + * @throws Exception + */ + private synchronized void startWorkloadGeneration(final int workerId) throws Exception { + if (!this.workloadGenerationStarted) { + this.workloadGenerationStarted = true; + + final byte[] bytes = + this.client.getData().forPath(WORKLOAD_DEFINITION_PATH); + final WorkloadDefinition definition = + WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8)); + + this.workerAction.accept(definition, workerId); } } @@ -134,20 +162,13 @@ public class WorkloadDistributor { return new CuratorWatcher() { @Override - public void process(final WatchedEvent event) throws Exception { + public void process(final WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged) { - final byte[] bytes = - WorkloadDistributor.this.client.getData().forPath(WORKLOAD_DEFINITION_PATH); - final WorkloadDefinition definition = - WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8)); - - if (workerId > definition.getNumberOfWorkers() - 1) { - LOGGER.warn("Worker with id {} was to slow and is therefore in idle state", - workerId); - WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0), - workerId); // this worker generates no workload - } else { - WorkloadDistributor.this.workerAction.accept(definition, workerId); + try { + WorkloadDistributor.this.startWorkloadGeneration(workerId); + } catch (final Exception e) { + LOGGER.error("", e); + throw new IllegalStateException("Error starting workload generation."); } } } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java index 17aa9871ddfa952b65fa908fd6008af004b0c938..fc58b778afa6c0a9f82b58be25aeb761aa752b2a 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -26,6 +26,8 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); + private final int instances; + private final ZooKeeper zooKeeper; private final KeySpace keySpace; @@ -63,6 +65,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> } public AbstractWorkloadGenerator( + final int instances, final ZooKeeper zooKeeper, final KeySpace keySpace, final int threads, @@ -71,6 +74,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> final BeforeAction beforeAction, final MessageGenerator<T> generatorFunction, final Transport<T> transport) { + this.instances = instances; this.zooKeeper = zooKeeper; this.period = period; this.threads = threads; @@ -123,6 +127,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> }; this.workloadDistributor = - new WorkloadDistributor(this.zooKeeper, this.keySpace, this.beforeAction, workerAction); + new WorkloadDistributor(this.instances, this.zooKeeper, this.keySpace, this.beforeAction, + workerAction); } } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java index 010ce837a1b72155b52e96350dbdd65354f365fb..90fecf01c0b537114d229a1701ee6761f688e1b1 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -32,6 +32,7 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord> * @param recordSender the record sender which is used to send the generated messages to kafka. */ public KafkaWorkloadGenerator( + final int instances, final ZooKeeper zooKeeper, final KeySpace keySpace, final int threads, @@ -40,7 +41,8 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord> final BeforeAction beforeAction, final MessageGenerator<T> generatorFunction, final KafkaRecordSender<T> recordSender) { - super(zooKeeper, keySpace, threads, period, duration, beforeAction, generatorFunction, + super(instances, zooKeeper, keySpace, threads, period, duration, beforeAction, + generatorFunction, recordSender); this.recordSender = recordSender; } diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java index 1609808a3178fb6607c95c924d35a6cc4d8ab153..7a435fca69f50c485c1e46531cf006119ca4cfe1 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGeneratorBuilder.java @@ -11,6 +11,8 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { + private int instances; + private ZooKeeper zooKeeper; private KeySpace keySpace; @@ -40,6 +42,17 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { return new KafkaWorkloadGeneratorBuilder<>(); } + /** + * Set the number of instances. + * + * @param zooKeeper a reference to the ZooKeeper instance. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) { + this.instances = instances; + return this; + } + /** * Set the ZooKeeper reference. * @@ -145,6 +158,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { * @return the built instance of the {@link KafkaWorkloadGenerator}. */ public KafkaWorkloadGenerator<T> build() { + Objects.requireNonNull(this.instances, "Please specify the number of instances."); Objects.requireNonNull(this.zooKeeper, "Please specify the ZooKeeper instance."); this.threads = Objects.requireNonNullElse(this.threads, 1); Objects.requireNonNull(this.keySpace, "Please specify the key space."); @@ -156,6 +170,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); return new KafkaWorkloadGenerator<>( + this.instances, this.zooKeeper, this.keySpace, this.threads,