Skip to content
Snippets Groups Projects
Commit 12e99080 authored by Björn Vonheiden's avatar Björn Vonheiden
Browse files

Fix static analysis code style violations.

parent 1d3a979f
No related branches found
No related tags found
1 merge request!28Use Titan CC Avro Records in UC App and Workload Generator
Showing
with 175 additions and 116 deletions
......@@ -77,15 +77,15 @@ public final class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction(
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
......
......@@ -18,6 +18,8 @@ import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
......@@ -30,8 +32,9 @@ import titan.ccp.model.sensorregistry.SensorRegistry;
*/
public class TopologyBuilder {
private static final int LATENCY_OUTPOUT_THRESHOLD = 1000;
// private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class);
private final String inputTopic;
private final String outputTopic;
......@@ -186,7 +189,8 @@ public class TopologyBuilder {
final long latency = time - v.getTimestamp();
this.latencyStats.add(latency);
if (time - this.lastTime >= LATENCY_OUTPOUT_THRESHOLD) {
System.out.println("latency,"
if (LOGGER.isInfoEnabled()) {
LOGGER.info("latency,"
+ time + ','
+ this.latencyStats.mean() + ','
+ (this.latencyStats.count() > 0
......@@ -200,6 +204,7 @@ public class TopologyBuilder {
+ this.latencyStats.min() + ','
+ this.latencyStats.max() + ','
+ this.latencyStats.count());
}
this.latencyStats = new StatsAccumulator();
this.lastTime = time;
}
......
......@@ -3,7 +3,6 @@ package theodolite.uc2.streamprocessing;
import java.util.Optional;
import java.util.Set;
import org.junit.Test;
import theodolite.uc2.streamprocessing.OptionalParentsSerde;
public class OptionalParentsSerdeTest {
......
......@@ -2,7 +2,6 @@ package theodolite.uc2.streamprocessing;
import java.util.Set;
import org.junit.Test;
import theodolite.uc2.streamprocessing.ParentsSerde;
public class ParentsSerdeTest {
......
package theodolite.uc2.streamprocessing;
import org.junit.Test;
import theodolite.uc2.streamprocessing.SensorParentKey;
import theodolite.uc2.streamprocessing.SensorParentKeySerde;
public class SensorParentKeySerdeTest {
......
......@@ -10,8 +10,14 @@ import org.apache.kafka.common.serialization.StringSerializer;
import titan.ccp.configuration.events.Event;
import titan.ccp.configuration.events.EventSerde;
/**
* Class to publish a configuration to Kafka.
*
*/
public class ConfigPublisher {
private static final String MEMORY_CONFIG = "134217728"; // 128 MB
private final String topic;
private final Producer<Event, String> producer;
......@@ -20,6 +26,13 @@ public class ConfigPublisher {
this(bootstrapServers, topic, new Properties());
}
/**
* Creates a new {@link ConfigPublisher} object.
*
* @param bootstrapServers Zoo Keeper server.
* @param topic where to write the configuration.
* @param defaultProperties default properties.
*/
public ConfigPublisher(final String bootstrapServers, final String topic,
final Properties defaultProperties) {
this.topic = topic;
......@@ -27,13 +40,19 @@ public class ConfigPublisher {
final Properties properties = new Properties();
properties.putAll(defaultProperties);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "134217728"); // 128 MB
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "134217728"); // 128 MB
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, MEMORY_CONFIG);
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, MEMORY_CONFIG);
this.producer =
new KafkaProducer<>(properties, EventSerde.serializer(), new StringSerializer());
}
/**
* Publish an event with given value to the kafka topic.
*
* @param event Which {@link Event} happened.
* @param value Configuration value.
*/
public void publish(final Event event, final String value) {
final ProducerRecord<Event, String> record = new ProducerRecord<>(this.topic, event, value);
try {
......
......@@ -18,18 +18,37 @@ import titan.ccp.model.records.ActivePowerRecord;
import titan.ccp.model.sensorregistry.MutableAggregatedSensor;
import titan.ccp.model.sensorregistry.MutableSensorRegistry;
public class LoadGenerator {
/**
* The {@code LoadGenerator} creates a load in Kafka.
*/
public final class LoadGenerator {
private static final int SLEEP_PERIOD = 30_000;
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
// Constants
private static final String DEEP = "deep";
private static final long MAX_DURATION_IN_DAYS = 30L;
// Make this a utility class, because all methods are static.
private LoadGenerator() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
* @param args CLI arguments
* @throws InterruptedException Interrupt happened
* @throws IOException happened.
*/
public static void main(final String[] args) throws InterruptedException, IOException {
// uc2
LOGGER.info("Start workload generator for use case UC2.");
// get environment variables
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), "deep");
final String hierarchy = Objects.requireNonNullElse(System.getenv("HIERARCHY"), DEEP);
final int numNestedGroups = Integer
.parseInt(Objects.requireNonNullElse(System.getenv("NUM_NESTED_GROUPS"), "1"));
final String zooKeeperHost = Objects.requireNonNullElse(System.getenv("ZK_HOST"), "localhost");
......@@ -81,12 +100,12 @@ public class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setBeforeAction(() -> {
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.beforeAction(() -> {
if (sendRegistry) {
final ConfigPublisher configPublisher =
new ConfigPublisher(kafkaBootstrapServers, "configuration");
......@@ -96,18 +115,18 @@ public class LoadGenerator {
LOGGER.info("Now wait 30 seconds");
try {
Thread.sleep(30_000);
Thread.sleep(SLEEP_PERIOD);
} catch (final InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("And woke up again :)");
}
})
.setGeneratorFunction(
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
......@@ -119,7 +138,7 @@ public class LoadGenerator {
final int numNestedGroups,
final int numSensors) {
final MutableSensorRegistry sensorRegistry = new MutableSensorRegistry("group_lvl_0");
if (hierarchy.equals("deep")) {
if (DEEP.equals(hierarchy)) {
MutableAggregatedSensor lastSensor = sensorRegistry.getTopLevelSensor();
for (int lvl = 1; lvl < numNestedGroups; lvl++) {
lastSensor = lastSensor.addChildAggregatedSensor("group_lvl_" + lvl);
......@@ -127,7 +146,7 @@ public class LoadGenerator {
for (int s = 0; s < numSensors; s++) {
lastSensor.addChildMachineSensor("sensor_" + s);
}
} else if (hierarchy.equals("full")) {
} else if ("full".equals(hierarchy)) {
addChildren(sensorRegistry.getTopLevelSensor(), numSensors, 1, numNestedGroups, 0);
} else {
throw new IllegalStateException();
......@@ -136,8 +155,8 @@ public class LoadGenerator {
}
private static int addChildren(final MutableAggregatedSensor parent, final int numChildren,
final int lvl,
final int maxLvl, int nextId) {
final int lvl, final int maxLvl, final int startId) {
int nextId = startId;
for (int c = 0; c < numChildren; c++) {
if (lvl == maxLvl) {
parent.addChildMachineSensor("s_" + nextId);
......
......@@ -15,12 +15,28 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBu
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
public class LoadGenerator {
/**
* The {@code LoadGenerator} creates a load in Kafka.
*/
public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
// constants
private static final long MAX_DURATION_IN_DAYS = 30L;
// Make this a utility class, because all methods are static.
private LoadGenerator() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
* @param args CLI arguments
* @throws InterruptedException Interrupt happened
* @throws IOException happened.
*/
public static void main(final String[] args) throws InterruptedException, IOException {
// uc2
LOGGER.info("Start workload generator for use case UC3.");
......@@ -68,15 +84,15 @@ public class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction(
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
......
......@@ -15,12 +15,28 @@ import theodolite.commons.workloadgeneration.generators.KafkaWorkloadGeneratorBu
import theodolite.commons.workloadgeneration.misc.ZooKeeper;
import titan.ccp.model.records.ActivePowerRecord;
public class LoadGenerator {
/**
* The {@code LoadGenerator} creates a load in Kafka.
*/
public final class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
// constants
private static final long MAX_DURATION_IN_DAYS = 30L;
// Make this a utility class, because all methods are static.
private LoadGenerator() {
throw new UnsupportedOperationException();
}
/**
* Main method.
*
* @param args CLI arguments
* @throws InterruptedException Interrupt happened
* @throws IOException happened.
*/
public static void main(final String[] args) throws InterruptedException, IOException {
// uc4
LOGGER.info("Start workload generator for use case UC4.");
......@@ -69,15 +85,15 @@ public class LoadGenerator {
// create workload generator
final KafkaWorkloadGenerator<ActivePowerRecord> workloadGenerator =
KafkaWorkloadGeneratorBuilder.<ActivePowerRecord>builder()
.setInstances(instances)
.setKeySpace(new KeySpace("s_", numSensors))
.setThreads(threads)
.setPeriod(Duration.of(periodMs, ChronoUnit.MILLIS))
.setDuration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.setGeneratorFunction(
.instances(instances)
.keySpace(new KeySpace("s_", numSensors))
.threads(threads)
.period(Duration.of(periodMs, ChronoUnit.MILLIS))
.duration(Duration.of(MAX_DURATION_IN_DAYS, ChronoUnit.DAYS))
.generatorFunction(
sensor -> new ActivePowerRecord(sensor, System.currentTimeMillis(), value))
.setZooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.setKafkaRecordSender(kafkaRecordSender)
.zooKeeper(new ZooKeeper(zooKeeperHost, zooKeeperPort))
.kafkaRecordSender(kafkaRecordSender)
.build();
// start
......
......@@ -44,10 +44,10 @@ public class WorkloadDistributor {
private final BiConsumer<WorkloadDefinition, Integer> workerAction;
private final int instances;
private final ZooKeeper zooKeeper;
private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
private final CuratorFramework client;
private boolean workloadGenerationStarted = false;
private boolean workloadGenerationStarted = false; // NOPMD explicit intention that false
/**
* Create a new workload distributor.
......@@ -79,8 +79,8 @@ public class WorkloadDistributor {
try {
this.client.blockUntilConnected();
} catch (final InterruptedException e) {
LOGGER.error("", e);
throw new IllegalStateException();
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException(e);
}
this.counter =
......@@ -142,9 +142,9 @@ public class WorkloadDistributor {
if (!this.workloadGenerationStarted) {
LOGGER.warn("No workload definition retrieved for 20 s. Terminating now..");
}
} catch (final Exception e) {
LOGGER.error("", e);
throw new IllegalStateException("Error when starting the distribution of the workload.");
} catch (final Exception e) { // NOPMD need to catch exception because of external framework
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException("Error when starting the distribution of the workload.", e);
}
}
......@@ -154,7 +154,9 @@ public class WorkloadDistributor {
* @param workerId the ID of this worker
* @throws Exception when an error occurs
*/
private synchronized void startWorkloadGeneration(final int workerId) throws Exception {
// NOPMD because exception thrown from used framework
private synchronized void startWorkloadGeneration(final int workerId) throws Exception { // NOPMD
if (!this.workloadGenerationStarted) {
this.workloadGenerationStarted = true;
......@@ -181,9 +183,9 @@ public class WorkloadDistributor {
if (event.getType() == EventType.NodeChildrenChanged) {
try {
WorkloadDistributor.this.startWorkloadGeneration(workerId);
} catch (final Exception e) {
LOGGER.error("", e);
throw new IllegalStateException("Error starting workload generation.");
} catch (final Exception e) { // NOPMD external framework throws exception
LOGGER.error(e.getMessage(), e);
throw new IllegalStateException("Error starting workload generation.", e);
}
}
}
......
......@@ -30,17 +30,14 @@ public abstract class AbstractWorkloadGenerator<T>
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorkloadGenerator.class);
private final int instances;
private final ZooKeeper zooKeeper;
private final KeySpace keySpace;
private final int threads;
private final Duration period;
private final Duration duration;
private final BeforeAction beforeAction;
private final int instances; // NOPMD keep instance variable instead of local variable
private final ZooKeeper zooKeeper; // NOPMD keep instance variable instead of local variable
private final KeySpace keySpace;// NOPMD keep instance variable instead of local variable
private final BeforeAction beforeAction; // NOPMD keep instance variable instead of local variable
private final BiFunction<WorkloadDefinition, Integer, List<WorkloadEntity<T>>> workloadSelector;
private final MessageGenerator<T> generatorFunction;
private final Transport<T> transport;
private WorkloadDistributor workloadDistributor;
private WorkloadDistributor workloadDistributor; // NOPMD keep instance variable instead of local
private final ScheduledExecutorService executor;
/**
......@@ -68,10 +65,7 @@ public abstract class AbstractWorkloadGenerator<T>
final Transport<T> transport) {
this.instances = instances;
this.zooKeeper = zooKeeper;
this.period = period;
this.threads = threads;
this.keySpace = keySpace;
this.duration = duration;
this.beforeAction = beforeAction;
this.generatorFunction = generatorFunction;
this.workloadSelector = (workloadDefinition, workerId) -> {
......@@ -93,7 +87,7 @@ public abstract class AbstractWorkloadGenerator<T>
final int periodMs = (int) period.toMillis();
LOGGER.info("Period: " + periodMs);
LOGGER.info("Period: " + periodMs); // NOPMD no computational intensive logger call
final BiConsumer<WorkloadDefinition, Integer> workerAction = (declaration, workerId) -> {
......@@ -120,7 +114,7 @@ public abstract class AbstractWorkloadGenerator<T>
this.stop();
} catch (final InterruptedException e) {
LOGGER.error("", e);
throw new IllegalStateException("Error when terminating the workload generation.");
throw new IllegalStateException("Error when terminating the workload generation.", e);
}
};
......
......@@ -14,25 +14,17 @@ import theodolite.commons.workloadgeneration.misc.ZooKeeper;
*
* @param <T> the record for which the builder is dedicated for.
*/
public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
private int instances;
private ZooKeeper zooKeeper;
private KeySpace keySpace;
private int threads;
private Duration period;
private Duration duration;
private BeforeAction beforeAction;
private MessageGenerator<T> generatorFunction;
private KafkaRecordSender<T> kafkaRecordSender;
public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> { // NOPMD
private int instances; // NOPMD
private ZooKeeper zooKeeper; // NOPMD
private KeySpace keySpace; // NOPMD
private int threads; // NOPMD
private Duration period; // NOPMD
private Duration duration; // NOPMD
private BeforeAction beforeAction; // NOPMD
private MessageGenerator<T> generatorFunction; // NOPMD
private KafkaRecordSender<T> kafkaRecordSender; // NOPMD
private KafkaWorkloadGeneratorBuilder() {
......@@ -53,7 +45,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param instances the number of instances.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setInstances(final int instances) {
public KafkaWorkloadGeneratorBuilder<T> instances(final int instances) {
this.instances = instances;
return this;
}
......@@ -64,7 +56,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param zooKeeper a reference to the ZooKeeper instance.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setZooKeeper(final ZooKeeper zooKeeper) {
public KafkaWorkloadGeneratorBuilder<T> zooKeeper(final ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
return this;
}
......@@ -75,7 +67,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param beforeAction the {@link BeforeAction}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setBeforeAction(final BeforeAction beforeAction) {
public KafkaWorkloadGeneratorBuilder<T> beforeAction(final BeforeAction beforeAction) {
this.beforeAction = beforeAction;
return this;
}
......@@ -86,7 +78,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param keySpace the {@link KeySpace}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setKeySpace(final KeySpace keySpace) {
public KafkaWorkloadGeneratorBuilder<T> keySpace(final KeySpace keySpace) {
this.keySpace = keySpace;
return this;
}
......@@ -97,7 +89,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param threads the number of threads.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setThreads(final int threads) {
public KafkaWorkloadGeneratorBuilder<T> threads(final int threads) {
this.threads = threads;
return this;
}
......@@ -108,7 +100,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param period the {@link Period}
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Duration period) {
public KafkaWorkloadGeneratorBuilder<T> period(final Duration period) {
this.period = period;
return this;
}
......@@ -119,7 +111,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param duration the {@link Duration}.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setDuration(final Duration duration) {
public KafkaWorkloadGeneratorBuilder<T> duration(final Duration duration) {
this.duration = duration;
return this;
}
......@@ -130,7 +122,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param generatorFunction the generator function.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setGeneratorFunction(
public KafkaWorkloadGeneratorBuilder<T> generatorFunction(
final MessageGenerator<T> generatorFunction) {
this.generatorFunction = generatorFunction;
return this;
......@@ -142,7 +134,7 @@ public final class KafkaWorkloadGeneratorBuilder<T extends SpecificRecord> {
* @param kafkaRecordSender the record sender to use.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setKafkaRecordSender(
public KafkaWorkloadGeneratorBuilder<T> kafkaRecordSender(
final KafkaRecordSender<T> kafkaRecordSender) {
this.kafkaRecordSender = kafkaRecordSender;
return this;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment