diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java index 9744117c5247452e23e252a97197cfa14fda296c..d4652813cf9a06488e400fe37ddf16a63fd4bab6 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Duration.java @@ -1,10 +1,10 @@ package theodolite.commons.workloadgeneration.dimensions; import java.util.concurrent.TimeUnit; -import theodolite.commons.workloadgeneration.generators.WorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator; /** - * Wrapper class for the definition of the duration for the {@link WorkloadGenerator}. + * Wrapper class for the definition of the duration for the {@link AbstractWorkloadGenerator}. */ public class Duration { diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java index 686fbe2c3f7c5b1121bdfd06e0472ff9d5a6118c..2eaa1d487f67ae8325a3622a7ae6c4529fbb1cd6 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/KeySpace.java @@ -1,10 +1,10 @@ package theodolite.commons.workloadgeneration.dimensions; -import theodolite.commons.workloadgeneration.generators.WorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator; /** * Wrapper class for the definition of the Keys that should be used by the - * {@link WorkloadGenerator}. + * {@link AbstractWorkloadGenerator}. */ public class KeySpace { diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Period.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Period.java index e96546ab904e8677bd586aeccc122cd88b449a38..62d3a216c5d0eeee1f86624db84bbcf2dd4ee80c 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Period.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/dimensions/Period.java @@ -1,10 +1,10 @@ package theodolite.commons.workloadgeneration.dimensions; import java.util.concurrent.TimeUnit; -import theodolite.commons.workloadgeneration.generators.WorkloadGenerator; +import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator; /** - * Wrapper class for the definition of period to use for the {@link WorkloadGenerator}. + * Wrapper class for the definition of period to use for the {@link AbstractWorkloadGenerator}. */ public class Period { diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..abb0b7eadd134ab4bbcbc7fda202efb123a24a73 --- /dev/null +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/AbstractWorkloadGenerator.java @@ -0,0 +1,128 @@ +package theodolite.commons.workloadgeneration.generators; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import kieker.common.record.IMonitoringRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; +import theodolite.commons.workloadgeneration.dimensions.Duration; +import theodolite.commons.workloadgeneration.dimensions.KeySpace; +import theodolite.commons.workloadgeneration.dimensions.Period; +import theodolite.commons.workloadgeneration.functions.BeforeAction; +import theodolite.commons.workloadgeneration.functions.MessageGenerator; +import theodolite.commons.workloadgeneration.functions.Transport; +import theodolite.commons.workloadgeneration.misc.Worker; +import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; +import theodolite.commons.workloadgeneration.misc.WorkloadEntity; +import theodolite.commons.workloadgeneration.misc.ZooKeeper; + +public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord> + implements WorkloadGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class); + + private final ZooKeeper zooKeeper; + + private final KeySpace keySpace; + + private final int threads; + + private final Period period; + + private final Duration duration; + + private final BeforeAction beforeAction; + + private final BiFunction<WorkloadDefinition, Worker, List<WorkloadEntity<T>>> workloadSelector; + + private final MessageGenerator<T> generatorFunction; + + private final Transport<T> transport; + + private WorkloadDistributor workloadDistributor; + + private final ScheduledExecutorService executor; + + /** + * Start the workload generation. The generation terminates automatically after the specified + * {@code duration}. + */ + @Override + public void start() { + this.workloadDistributor.start(); + } + + @Override + public void stop() { + this.workloadDistributor.stop(); + } + + public AbstractWorkloadGenerator( + final ZooKeeper zooKeeper, + final KeySpace keySpace, + final int threads, + final Period period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final Transport<T> transport) { + this.zooKeeper = zooKeeper; + this.period = period; + this.threads = threads; + this.keySpace = keySpace; + this.duration = duration; + this.beforeAction = beforeAction; + this.generatorFunction = generatorFunction; + this.workloadSelector = (workloadDefinition, worker) -> { + final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); + + for (int i = + workloadDefinition.getKeySpace().getMin() + worker.getId(); i <= workloadDefinition + .getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) { + final String id = workloadDefinition.getKeySpace().getPrefix() + i; + workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); + } + + return workloadEntities; + }; + this.transport = transport; + + this.executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + final int periodMs = period.getPeriod(); + + final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> { + + final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, worker); + + LOGGER.info("Beginning of Experiment..."); + LOGGER.info("Experiment is going to be executed for the specified duration..."); + + entities.forEach(entity -> { + final T message = entity.generateMessage(); + final long initialDelay = random.nextInt(periodMs); + this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay, + periodMs, period.getTimeUnit()); + }); + + try { + this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); + LOGGER.info("Terminating now..."); + this.stop(); + } catch (final InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + + this.workloadDistributor = + new WorkloadDistributor(this.zooKeeper, this.keySpace, this.beforeAction, workerAction); + } +} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/IWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/IWorkloadGenerator.java deleted file mode 100644 index 2cdef55545d5e3dfa7450937a0a78021706ad1b0..0000000000000000000000000000000000000000 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/IWorkloadGenerator.java +++ /dev/null @@ -1,18 +0,0 @@ -package theodolite.commons.workloadgeneration.generators; - -/** - * Base methods for workload generators. - */ -public interface IWorkloadGenerator { - - /** - * Start the workload generation. - */ - void start(); - - /** - * Stop the workload generation. - */ - void stop(); - -} diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java index 96d70f09492bc8c8cd9b25cf5ddb0157c9650927..3c9955c6097d870d3069a3568afcfcb6582aa8c2 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/KafkaWorkloadGenerator.java @@ -12,7 +12,7 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper; /** * Workload generator for generating load for the kafka messaging system. */ -public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends WorkloadGenerator<T> { +public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends AbstractWorkloadGenerator<T> { private final KafkaRecordSender<T> recordSender; diff --git a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java index 412a59b8afd75547d14154d58ece6f717ec5e207..b121ac157b84d64818d9fdfc90589d49fd933752 100644 --- a/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java +++ b/workload-generator-commons/src/main/java/theodolite/commons/workloadgeneration/generators/WorkloadGenerator.java @@ -1,127 +1,18 @@ package theodolite.commons.workloadgeneration.generators; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import kieker.common.record.IMonitoringRecord; -import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor; -import theodolite.commons.workloadgeneration.dimensions.Duration; -import theodolite.commons.workloadgeneration.dimensions.KeySpace; -import theodolite.commons.workloadgeneration.dimensions.Period; -import theodolite.commons.workloadgeneration.functions.BeforeAction; -import theodolite.commons.workloadgeneration.functions.MessageGenerator; -import theodolite.commons.workloadgeneration.functions.Transport; -import theodolite.commons.workloadgeneration.misc.Worker; -import theodolite.commons.workloadgeneration.misc.WorkloadDefinition; -import theodolite.commons.workloadgeneration.misc.WorkloadEntity; -import theodolite.commons.workloadgeneration.misc.ZooKeeper; - -public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements IWorkloadGenerator { - - private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadGenerator.class); - - private final ZooKeeper zooKeeper; - - private final KeySpace keySpace; - - private final int threads; - - private final Period period; - - private final Duration duration; - - private final BeforeAction beforeAction; - - private final BiFunction<WorkloadDefinition, Worker, List<WorkloadEntity<T>>> workloadSelector; - - private final MessageGenerator<T> generatorFunction; - - private final Transport<T> transport; - - private WorkloadDistributor workloadDistributor; - - private final ScheduledExecutorService executor; +/** + * Base methods for workload generators. + */ +public interface WorkloadGenerator { /** - * Start the workload generation. The generation terminates automatically after the specified - * {@code duration}. + * Start the workload generation. */ - @Override - public void start() { - this.workloadDistributor.start(); - } + void start(); - @Override - public void stop() { - this.workloadDistributor.stop(); - } - - public WorkloadGenerator( - final ZooKeeper zooKeeper, - final KeySpace keySpace, - final int threads, - final Period period, - final Duration duration, - final BeforeAction beforeAction, - final MessageGenerator<T> generatorFunction, - final Transport<T> transport) { - this.zooKeeper = zooKeeper; - this.period = period; - this.threads = threads; - this.keySpace = keySpace; - this.duration = duration; - this.beforeAction = beforeAction; - this.generatorFunction = generatorFunction; - this.workloadSelector = (workloadDefinition, worker) -> { - final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); - - for (int i = - workloadDefinition.getKeySpace().getMin() + worker.getId(); i <= workloadDefinition - .getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) { - final String id = workloadDefinition.getKeySpace().getPrefix() + i; - workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); - } - - return workloadEntities; - }; - this.transport = transport; - - this.executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - final int periodMs = period.getPeriod(); - - final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> { - - final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, worker); - - LOGGER.info("Beginning of Experiment..."); - LOGGER.info("Experiment is going to be executed for the specified duration..."); - - entities.forEach(entity -> { - final T message = entity.generateMessage(); - final long initialDelay = random.nextInt(periodMs); - this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay, - periodMs, period.getTimeUnit()); - }); - - try { - this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); - LOGGER.info("Terminating now..."); - this.stop(); - } catch (final InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }; + /** + * Stop the workload generation. + */ + void stop(); - this.workloadDistributor = - new WorkloadDistributor(this.zooKeeper, this.keySpace, this.beforeAction, workerAction); - } }