Skip to content
Snippets Groups Projects
Commit 8c49b983 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Replace custom Period by java.time.Duration

parent 00d69d4f
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
Showing
with 11 additions and 56 deletions
......@@ -5,13 +5,11 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -69,7 +67,7 @@ public final class LoadGenerator {
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
......
......@@ -5,13 +5,11 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -73,7 +71,7 @@ public class LoadGenerator {
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setBeforeAction(() -> {
if (sendRegistry) {
......
......@@ -5,13 +5,11 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -61,7 +59,7 @@ public class LoadGenerator {
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
......
......@@ -5,13 +5,11 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGenerator;
import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBuilder;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -61,7 +59,7 @@ public class LoadGenerator {
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(new Period(periodMs, TimeUnit.MILLISECONDS))
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
......
package theodolite.commons.workloadgeneration.dimensions;
import java.util.concurrent.TimeUnit;
import theodolite.commons.workloadgeneration.generators.AbstractWorkloadGenerator;
/**
* Wrapper class for the definition of period to use for the {@link AbstractWorkloadGenerator}.
*/
public class Period {
private final int period;
private final TimeUnit timeUnit;
/**
* Define a new period.
*
* @param period the period
* @param timeUnit the time unit that applies to the specified {@code period}
*/
public Period(final int period, final TimeUnit timeUnit) {
super();
this.period = period;
this.timeUnit = timeUnit;
}
public int getPeriod() {
return this.period;
}
public TimeUnit getTimeUnit() {
return this.timeUnit;
}
}
......@@ -14,7 +14,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import theodolite.commons.workloadgeneration.communication.zookeeper.WorkloadDistributor;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.functions.Transport;
......@@ -34,7 +33,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
private final int threads;
private final Period period;
private final Duration period;
private final Duration duration;
......@@ -68,7 +67,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
final Period period,
final Duration period,
final Duration duration,
final BeforeAction beforeAction,
final MessageGenerator<T> generatorFunction,
......@@ -97,7 +96,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
this.executor = Executors.newScheduledThreadPool(threads);
final Random random = new Random();
final int periodMs = period.getPeriod();
final int periodMs = period.getNano() / 1_000_000;
final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> {
......@@ -110,7 +109,7 @@ public abstract class AbstractWorkloadGenerator<T extends IMonitoringRecord>
final T message = entity.generateMessage();
final long initialDelay = random.nextInt(periodMs);
this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay,
periodMs, period.getTimeUnit());
periodMs, TimeUnit.MILLISECONDS);
});
......
......@@ -4,7 +4,6 @@ import java.time.Duration;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -36,7 +35,7 @@ public class KafkaWorkloadGenerator<T extends IMonitoringRecord>
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final int threads,
final Period period,
final Duration period,
final Duration duration,
final BeforeAction beforeAction,
final MessageGenerator<T> generatorFunction,
......
......@@ -5,7 +5,6 @@ import java.util.Objects;
import kieker.common.record.IMonitoringRecord;
import theodolite.commons.workloadgeneration.communication.kafka.KafkaRecordSender;
import theodolite.commons.workloadgeneration.dimensions.KeySpace;
import theodolite.commons.workloadgeneration.dimensions.Period;
import theodolite.commons.workloadgeneration.functions.BeforeAction;
import theodolite.commons.workloadgeneration.functions.MessageGenerator;
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
......@@ -18,7 +17,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
private int threads;
private Period period;
private Duration period;
private Duration duration;
......@@ -91,7 +90,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
* @param period the {@link Period}
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Period period) {
public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Duration period) {
this.period = period;
return this;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment