Skip to content
Snippets Groups Projects
Commit 94a3999e authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Implement uc1 wg with new wg

parent 3cd0b988
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
package common.misc.copy;
import common.functions.MessageGenerator;
import common.messages.OutputMessage;
import kieker.common.record.IMonitoringRecord;
public class WorkloadEntity<T extends IMonitoringRecord> {
private final String key;
private final MessageGenerator<T> generator;
public WorkloadEntity(final String key, final MessageGenerator<T> generator) {
this.key = key;
this.generator = generator;
}
public OutputMessage<T> generateMessage() {
return this.generator.generateMessage(this.key);
}
}
......@@ -2,7 +2,6 @@ package communication.kafka;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
......@@ -11,6 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import common.functions.Transport;
import common.messages.OutputMessage;
import kieker.common.record.IMonitoringRecord;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
......@@ -84,7 +84,8 @@ public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport
}
@Override
public void transport(OutputMessage<T> message) {
public void transport(final OutputMessage<T> message) {
System.out.println(message.getKey());
this.write(message.getValue());
}
......
package communication.kafka.copy;
import java.util.Properties;
import java.util.function.Function;
import kieker.common.record.IMonitoringRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import common.functions.Transport;
import common.messages.OutputMessage;
import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde;
/**
* Sends monitoring records to Kafka.
*
* @param <T> {@link IMonitoringRecord} to send
*/
public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class);
private final String topic;
private final Function<T, String> keyAccessor;
private final Function<T, Long> timestampAccessor;
private final Producer<String, T> producer;
public KafkaRecordSender(final String bootstrapServers, final String topic) {
this(bootstrapServers, topic, x -> "", x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor) {
this(bootstrapServers, topic, keyAccessor, x -> null, new Properties());
}
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) {
this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties());
}
/**
* Create a new {@link KafkaRecordSender}.
*/
public KafkaRecordSender(final String bootstrapServers, final String topic,
final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor,
final Properties defaultProperties) {
this.topic = topic;
this.keyAccessor = keyAccessor;
this.timestampAccessor = timestampAccessor;
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put("bootstrap.servers", bootstrapServers);
// properties.put("acks", this.acknowledges);
// properties.put("batch.size", this.batchSize);
// properties.put("linger.ms", this.lingerMs);
// properties.put("buffer.memory", this.bufferMemory);
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
IMonitoringRecordSerde.serializer());
}
/**
* Write the passed monitoring record to Kafka.
*/
public void write(final T monitoringRecord) {
final ProducerRecord<String, T> record =
new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord),
this.keyAccessor.apply(monitoringRecord), monitoringRecord);
LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record);
this.producer.send(record);
}
public void terminate() {
this.producer.close();
}
@Override
public void transport(OutputMessage<T> message) {
this.write(message.getValue());
}
}
......@@ -7,32 +7,37 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import common.dimensions.KeySpace;
import common.functions.BeforeAction;
import common.misc.Worker;
import common.misc.WorkloadDefinition;
import common.misc.ZooKeeper;
/*
* The central class responsible for distributing the workload through all workload generators.
*/
public class WorkloadDistributor {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkloadDistributor.class);
private static final String NAMESPACE = "workload-generation";
private static final String COUNTER_PATH = "/counter";
private static final String WORKLOAD_PATH = "/workload";
private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition";
private final DistributedAtomicInteger counter;
private final KeySpace keySpace;
private final BeforeAction beforeAction;
private final BiConsumer<WorkloadDefinition, Worker> workerAction;
private final CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(3, 1000));
private final ZooKeeper zooKeeper;
private final CuratorFramework client;
/**
* Create a new workload distributor.
......@@ -41,13 +46,22 @@ public class WorkloadDistributor {
* @param beforeAction the before action for the workload generation.
* @param workerAction the action to perform by the workers.
*/
public WorkloadDistributor(final KeySpace keySpace, final BeforeAction beforeAction,
public WorkloadDistributor(
final ZooKeeper zooKeeper,
final KeySpace keySpace,
final BeforeAction beforeAction,
final BiConsumer<WorkloadDefinition, Worker> workerAction) {
this.zooKeeper = zooKeeper;
this.keySpace = keySpace;
this.beforeAction = beforeAction;
this.workerAction = workerAction;
this.client = CuratorFrameworkFactory.builder()
.namespace(NAMESPACE)
.connectString(this.zooKeeper.getHost() + ":" + this.zooKeeper.getPort())
.retryPolicy(new ExponentialBackoffRetry(2000, 5))
.build();
this.client.start();
try {
......@@ -58,7 +72,8 @@ public class WorkloadDistributor {
}
this.counter =
new DistributedAtomicInteger(this.client, COUNTER_PATH, new RetryNTimes(3, 1000));
new DistributedAtomicInteger(this.client, COUNTER_PATH,
new ExponentialBackoffRetry(2000, 5));
}
/**
......@@ -78,7 +93,7 @@ public class WorkloadDistributor {
this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH);
if (worker.getId() == 0) {
System.out.println("is master with id " + worker.getId());
LOGGER.info("This instance is master with id {}", worker.getId());
this.beforeAction.run();
......@@ -89,7 +104,7 @@ public class WorkloadDistributor {
final int numberOfWorkers = this.counter.get().postValue();
System.out.printf("Number of Workers: %d\n", numberOfWorkers);
LOGGER.info("Number of Workers: {}", numberOfWorkers);
final WorkloadDefinition declaration =
new WorkloadDefinition(this.keySpace, numberOfWorkers);
......@@ -98,7 +113,7 @@ public class WorkloadDistributor {
declaration.toString().getBytes(StandardCharsets.UTF_8));
} else {
System.out.println("is worker with id " + worker.getId());
LOGGER.info("This instance is worker with id {}", worker.getId());
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
}
......@@ -143,7 +158,6 @@ public class WorkloadDistributor {
*/
public void stop() {
this.client.close();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment