Skip to content
Snippets Groups Projects
Commit c7f4b710 authored by Simon Ehrenstein's avatar Simon Ehrenstein
Browse files

Replace sysouts with LOGGER calls

parent 47e43546
No related branches found
No related tags found
1 merge request!6Add Distributed Workload Generator
This commit is part of merge request !6. Comments created here will be created in the context of that merge request.
Showing
with 29 additions and 259 deletions
9 13
[Dolphin]
Timestamp=2020,5,12,13,3,57
Version=4
[Settings]
HiddenFilesShown=true
cleanup.add_default_serial_version_id=true
cleanup.add_generated_serial_version_id=false
cleanup.add_missing_annotations=true
cleanup.add_missing_deprecated_annotations=true
cleanup.add_missing_methods=false
cleanup.add_missing_nls_tags=false
cleanup.add_missing_override_annotations=true
cleanup.add_missing_override_annotations_interface_methods=true
cleanup.add_serial_version_id=false
cleanup.always_use_blocks=true
cleanup.always_use_parentheses_in_expressions=false
cleanup.always_use_this_for_non_static_field_access=true
cleanup.always_use_this_for_non_static_method_access=true
cleanup.convert_functional_interfaces=false
cleanup.convert_to_enhanced_for_loop=true
cleanup.correct_indentation=true
cleanup.format_source_code=true
cleanup.format_source_code_changes_only=false
cleanup.insert_inferred_type_arguments=false
cleanup.make_local_variable_final=true
cleanup.make_parameters_final=true
cleanup.make_private_fields_final=true
cleanup.make_type_abstract_if_missing_method=false
cleanup.make_variable_declarations_final=true
cleanup.never_use_blocks=false
cleanup.never_use_parentheses_in_expressions=true
cleanup.organize_imports=true
cleanup.qualify_static_field_accesses_with_declaring_class=false
cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
cleanup.qualify_static_member_accesses_with_declaring_class=true
cleanup.qualify_static_method_accesses_with_declaring_class=false
cleanup.remove_private_constructors=true
cleanup.remove_redundant_modifiers=false
cleanup.remove_redundant_semicolons=true
cleanup.remove_redundant_type_arguments=true
cleanup.remove_trailing_whitespaces=true
cleanup.remove_trailing_whitespaces_all=true
cleanup.remove_trailing_whitespaces_ignore_empty=false
cleanup.remove_unnecessary_casts=true
cleanup.remove_unnecessary_nls_tags=true
cleanup.remove_unused_imports=true
cleanup.remove_unused_local_variables=false
cleanup.remove_unused_private_fields=true
cleanup.remove_unused_private_members=false
cleanup.remove_unused_private_methods=true
cleanup.remove_unused_private_types=true
cleanup.sort_members=false
cleanup.sort_members_all=false
cleanup.use_anonymous_class_creation=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_lambda=true
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_CAU-SE-Style
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_CAU-SE-Style
formatter_settings_version=16
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_functional_interfaces=false
sp_cleanup.convert_to_enhanced_for_loop=true
sp_cleanup.correct_indentation=true
sp_cleanup.format_source_code=true
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.insert_inferred_type_arguments=false
sp_cleanup.make_local_variable_final=true
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=true
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_redundant_modifiers=false
sp_cleanup.remove_redundant_semicolons=true
sp_cleanup.remove_redundant_type_arguments=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=true
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_anonymous_class_creation=false
sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_lambda=true
sp_cleanup.use_parentheses_in_expressions=true
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
configFilePath=../config/checkstyle.xml
customModulesJarPaths=
eclipse.preferences.version=1
enabled=true
customRulesJars=
eclipse.preferences.version=1
enabled=false
ruleSetFilePath=../config/pmd.xml
apply plugin: 'application'
apply plugin: 'eclipse'
buildscript {
repositories {
maven {
url "https://plugins.gradle.org/m2/"
}
}
}
sourceCompatibility = "1.11"
targetCompatibility = "1.11"
dependencies {
compile project(':')
compile 'org.slf4j:slf4j-simple:1.6.1'
// Use JUnit test framework
testCompile 'junit:junit:4.12'
}
mainClassName = "test.Main"
eclipse {
classpath {
downloadSources=true
downloadJavadoc=true
}
}
version: '3.1'
services:
version: '3.1'
services:
zookeeper:
image: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost # Replace with docker network
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000
KAFKA_CREATE_TOPICS: "input:3:1"
package test;
import common.dimensions.Duration;
import common.dimensions.KeySpace;
import common.dimensions.Period;
import common.generators.KafkaWorkloadGenerator;
import common.generators.KafkaWorkloadGeneratorBuilder;
import common.messages.OutputMessage;
import common.misc.ZooKeeper;
import communication.kafka.KafkaRecordSender;
import java.util.concurrent.TimeUnit;
import kieker.common.record.IMonitoringRecord;
import titan.ccp.models.records.ActivePowerRecord;
public class Main {
public static void main(final String[] args) {
final KafkaRecordSender<IMonitoringRecord> recordSender =
new KafkaRecordSender<>("localhost:9092", "input");
final KafkaWorkloadGenerator<IMonitoringRecord> generator =
KafkaWorkloadGeneratorBuilder.builder()
.setZooKeeper(new ZooKeeper("127.0.0.1", 2181))
.setKafkaRecordSender(recordSender)
.setBeforeAction(() -> {
System.out.println("Before Hook");
})
.setKeySpace(new KeySpace(5))
.setPeriod(new Period(1000, TimeUnit.MILLISECONDS))
.setDuration(new Duration(60, TimeUnit.SECONDS))
.setGeneratorFunction(
key -> new OutputMessage<>(key,
new ActivePowerRecord(key, 0L, 100d)))
.build();
generator.start();
}
}
...@@ -78,16 +78,16 @@ public class LoadGenerator { ...@@ -78,16 +78,16 @@ public class LoadGenerator {
new ConfigPublisher(kafkaBootstrapServers, "configuration"); new ConfigPublisher(kafkaBootstrapServers, "configuration");
configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson()); configPublisher.publish(Event.SENSOR_REGISTRY_CHANGED, sensorRegistry.toJson());
configPublisher.close(); configPublisher.close();
System.out.println("Configuration sent."); LOGGER.info("Configuration sent.");
System.out.println("Now wait 30 seconds"); LOGGER.info("Now wait 30 seconds");
try { try {
Thread.sleep(30_000); Thread.sleep(30_000);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("And woke up again :)"); LOGGER.info("And woke up again :)");
} }
}) })
.setGeneratorFunction( .setGeneratorFunction(
......
...@@ -20,9 +20,8 @@ public class LoadGenerator { ...@@ -20,9 +20,8 @@ public class LoadGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class); private static final Logger LOGGER = LoggerFactory.getLogger(LoadGenerator.class);
private static final int WL_MAX_RECORDS = 150_000;
public static void main(final String[] args) throws InterruptedException, IOException { public static void main(final String[] args) throws InterruptedException, IOException {
// uc2
LOGGER.info("Start workload generator for use case UC3."); LOGGER.info("Start workload generator for use case UC3.");
// get environment variables // get environment variables
......
package common.dimensions;
/*
* Base class for workload dimensions.
*/
public abstract class Dimension {
}
...@@ -6,7 +6,7 @@ import common.generators.WorkloadGenerator; ...@@ -6,7 +6,7 @@ import common.generators.WorkloadGenerator;
/** /**
* Wrapper class for the definition of the duration for the {@link WorkloadGenerator}. * Wrapper class for the definition of the duration for the {@link WorkloadGenerator}.
*/ */
public class Duration extends Dimension { public class Duration {
private final int duration; private final int duration;
private final TimeUnit timeUnit; private final TimeUnit timeUnit;
......
...@@ -6,7 +6,7 @@ import common.generators.WorkloadGenerator; ...@@ -6,7 +6,7 @@ import common.generators.WorkloadGenerator;
* Wrapper class for the definition of the Keys that should be used by the * Wrapper class for the definition of the Keys that should be used by the
* {@link WorkloadGenerator}. * {@link WorkloadGenerator}.
*/ */
public class KeySpace extends Dimension { public class KeySpace {
private final String prefix; private final String prefix;
private final int min; private final int min;
......
...@@ -6,7 +6,7 @@ import common.generators.WorkloadGenerator; ...@@ -6,7 +6,7 @@ import common.generators.WorkloadGenerator;
/** /**
* Wrapper class for the definition of period to use for the {@link WorkloadGenerator}. * Wrapper class for the definition of period to use for the {@link WorkloadGenerator}.
*/ */
public class Period extends Dimension { public class Period {
private final int period; private final int period;
private final TimeUnit timeUnit; private final TimeUnit timeUnit;
......
...@@ -41,6 +41,12 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { ...@@ -41,6 +41,12 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
return new KafkaWorkloadGeneratorBuilder<>(); return new KafkaWorkloadGeneratorBuilder<>();
} }
/**
* Set the ZooKeeper reference.
*
* @param zooKeeper a reference to the ZooKeeper instance.
* @return the builder.
*/
public KafkaWorkloadGeneratorBuilder<T> setZooKeeper(final ZooKeeper zooKeeper) { public KafkaWorkloadGeneratorBuilder<T> setZooKeeper(final ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper; this.zooKeeper = zooKeeper;
return this; return this;
...@@ -129,6 +135,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { ...@@ -129,6 +135,7 @@ public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> {
* Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be * Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be
* specicified before this method is called: * specicified before this method is called:
* <ul> * <ul>
* <li>zookeeper</li>
* <li>key space</li> * <li>key space</li>
* <li>period</li> * <li>period</li>
* <li>duration</li> * <li>duration</li>
......
...@@ -78,13 +78,13 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements ...@@ -78,13 +78,13 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
this.duration = duration; this.duration = duration;
this.beforeAction = beforeAction; this.beforeAction = beforeAction;
this.generatorFunction = generatorFunction; this.generatorFunction = generatorFunction;
this.workloadSelector = (workloadDeclaration, worker) -> { this.workloadSelector = (workloadDefinition, worker) -> {
final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>();
for (int i = for (int i =
workloadDeclaration.getKeySpace().getMin() + worker.getId(); i <= workloadDeclaration workloadDefinition.getKeySpace().getMin() + worker.getId(); i <= workloadDefinition
.getKeySpace().getMax(); i += workloadDeclaration.getNumberOfWorkers()) { .getKeySpace().getMax(); i += workloadDefinition.getNumberOfWorkers()) {
final String id = workloadDeclaration.getKeySpace().getPrefix() + i; final String id = workloadDefinition.getKeySpace().getPrefix() + i;
workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction));
} }
...@@ -103,6 +103,7 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements ...@@ -103,6 +103,7 @@ public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements
LOGGER.info("Beginning of Experiment..."); LOGGER.info("Beginning of Experiment...");
LOGGER.info("Experiment is going to be executed for the specified duration..."); LOGGER.info("Experiment is going to be executed for the specified duration...");
entities.forEach(entity -> { entities.forEach(entity -> {
final T message = entity.generateMessage(); final T message = entity.generateMessage();
final long initialDelay = random.nextInt(periodMs); final long initialDelay = random.nextInt(periodMs);
......
...@@ -3,6 +3,9 @@ package common.misc; ...@@ -3,6 +3,9 @@ package common.misc;
import common.functions.MessageGenerator; import common.functions.MessageGenerator;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
/*
* Representation of a entity of the workload generation that generates load for one fixed key.
*/
public class WorkloadEntity<T extends IMonitoringRecord> { public class WorkloadEntity<T extends IMonitoringRecord> {
private final String key; private final String key;
private final MessageGenerator<T> generator; private final MessageGenerator<T> generator;
......
...@@ -106,11 +106,11 @@ public class WorkloadDistributor { ...@@ -106,11 +106,11 @@ public class WorkloadDistributor {
LOGGER.info("Number of Workers: {}", numberOfWorkers); LOGGER.info("Number of Workers: {}", numberOfWorkers);
final WorkloadDefinition declaration = final WorkloadDefinition definition =
new WorkloadDefinition(this.keySpace, numberOfWorkers); new WorkloadDefinition(this.keySpace, numberOfWorkers);
this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH, this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH,
declaration.toString().getBytes(StandardCharsets.UTF_8)); definition.toString().getBytes(StandardCharsets.UTF_8));
} else { } else {
LOGGER.info("This instance is worker with id {}", worker.getId()); LOGGER.info("This instance is worker with id {}", worker.getId());
...@@ -118,7 +118,7 @@ public class WorkloadDistributor { ...@@ -118,7 +118,7 @@ public class WorkloadDistributor {
this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH);
} }
Thread.sleep(20000); // wait until the workload declaration is retrieved Thread.sleep(20000); // wait until the workload definition is retrieved
} catch (final Exception e) { } catch (final Exception e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
...@@ -139,16 +139,16 @@ public class WorkloadDistributor { ...@@ -139,16 +139,16 @@ public class WorkloadDistributor {
if (event.getType() == EventType.NodeChildrenChanged) { if (event.getType() == EventType.NodeChildrenChanged) {
final byte[] bytes = final byte[] bytes =
WorkloadDistributor.this.client.getData().forPath(WORKLOAD_DEFINITION_PATH); WorkloadDistributor.this.client.getData().forPath(WORKLOAD_DEFINITION_PATH);
final WorkloadDefinition declaration = final WorkloadDefinition definition =
WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8)); WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8));
if (worker.getId() > declaration.getNumberOfWorkers() - 1) { if (worker.getId() > definition.getNumberOfWorkers() - 1) {
LOGGER.warn("Worker with id {} was to slow and is therefore in idle state", LOGGER.warn("Worker with id {} was to slow and is therefore in idle state",
worker.getId()); worker.getId());
WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0), WorkloadDistributor.this.workerAction.accept(new WorkloadDefinition(new KeySpace(0), 0),
worker); // this worker generates no workload worker); // this worker generates no workload
} else { } else {
WorkloadDistributor.this.workerAction.accept(declaration, worker); WorkloadDistributor.this.workerAction.accept(definition, worker);
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment