From 254bfe4b37b0d6eb752c9ff5517c8ac8deb2be86 Mon Sep 17 00:00:00 2001 From: Simon Ehrenstein <simon.ehrenstein@gmail.com> Date: Tue, 19 May 2020 22:02:09 +0200 Subject: [PATCH] Cleanup code --- .settings/org.eclipse.jdt.ui.prefs | 4 +- .../.settings/org.eclipse.jdt.ui.prefs | 4 +- .../src/main/java/test/Main.java | 9 +- .../.settings/org.eclipse.jdt.ui.prefs | 59 ------- .../.settings/org.eclipse.jdt.ui.prefs | 128 +++++++++++++++ .../qa.eclipse.plugin.checkstyle.prefs | 4 + .../.settings/qa.eclipse.plugin.pmd.prefs | 4 + .../main/java/common/IWorkloadGenerator.java | 10 -- .../java/common/KafkaWorkloadGenerator.java | 38 ----- .../common/KafkaWorkloadGeneratorBuilder.java | 74 --------- .../main/java/common/WorkloadDeclaration.java | 36 ----- .../main/java/common/WorkloadGenerator.java | 109 ------------- .../java/common/dimensions/Dimension.java | 8 + .../main/java/common/dimensions/Duration.java | 19 ++- .../main/java/common/dimensions/KeySpace.java | 42 +++-- .../main/java/common/dimensions/Period.java | 20 ++- .../common/dimensions/copy/Dimension.java | 8 + .../java/common/dimensions/copy/Duration.java | 34 ++++ .../java/common/dimensions/copy/KeySpace.java | 56 +++++++ .../java/common/dimensions/copy/Period.java | 34 ++++ .../common/dimensions/copy2/Dimension.java | 8 + .../common/dimensions/copy2/Duration.java | 34 ++++ .../common/dimensions/copy2/KeySpace.java | 56 +++++++ .../java/common/dimensions/copy2/Period.java | 34 ++++ .../java/common/functions/BeforeAction.java | 2 +- .../common/functions/MessageGenerator.java | 7 +- .../main/java/common/functions/Transport.java | 5 +- .../common/functions/copy/BeforeAction.java | 8 + .../functions/copy/MessageGenerator.java | 11 ++ .../java/common/functions/copy/Transport.java | 11 ++ .../common/generators/IWorkloadGenerator.java | 18 +++ .../generators/KafkaWorkloadGenerator.java | 51 ++++++ .../KafkaWorkloadGeneratorBuilder.java | 132 ++++++++++++++++ .../common/generators/WorkloadGenerator.java | 116 ++++++++++++++ .../generators/copy/IWorkloadGenerator.java | 18 +++ .../copy/KafkaWorkloadGenerator.java | 51 ++++++ .../copy/KafkaWorkloadGeneratorBuilder.java | 132 ++++++++++++++++ .../generators/copy/WorkloadGenerator.java | 116 ++++++++++++++ .../java/common/messages/OutputMessage.java | 29 ++-- .../common/messages/copy/OutputMessage.java | 32 ++++ .../common/messages/copy2/OutputMessage.java | 32 ++++ .../main/java/common/{ => misc}/Worker.java | 17 +- .../java/common/misc/WorkloadDefinition.java | 63 ++++++++ .../common/{ => misc}/WorkloadEntity.java | 15 +- .../main/java/common/misc/copy/Worker.java | 24 +++ .../common/misc/copy/WorkloadDefinition.java | 63 ++++++++ .../java/common/misc/copy/WorkloadEntity.java | 19 +++ .../kafka/KafkaRecordSender.java | 9 +- .../kafka/copy/KafkaRecordSender.java | 91 +++++++++++ .../zookeeper/WorkloadDistributor.java | 149 ++++++++++++++++++ .../zookeeper/leader/WorkloadDistributor.java | 122 -------------- 51 files changed, 1672 insertions(+), 503 deletions(-) create mode 100644 workload-generator-common/.settings/org.eclipse.jdt.ui.prefs create mode 100644 workload-generator-common/.settings/qa.eclipse.plugin.checkstyle.prefs create mode 100644 workload-generator-common/.settings/qa.eclipse.plugin.pmd.prefs delete mode 100644 workload-generator-common/src/main/java/common/IWorkloadGenerator.java delete mode 100644 workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java delete mode 100644 workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java delete mode 100644 workload-generator-common/src/main/java/common/WorkloadDeclaration.java delete mode 100644 workload-generator-common/src/main/java/common/WorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/Dimension.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy/Dimension.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy/Duration.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy/KeySpace.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy/Period.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy2/Dimension.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy2/Duration.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy2/KeySpace.java create mode 100644 workload-generator-common/src/main/java/common/dimensions/copy2/Period.java create mode 100644 workload-generator-common/src/main/java/common/functions/copy/BeforeAction.java create mode 100644 workload-generator-common/src/main/java/common/functions/copy/MessageGenerator.java create mode 100644 workload-generator-common/src/main/java/common/functions/copy/Transport.java create mode 100644 workload-generator-common/src/main/java/common/generators/IWorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/generators/KafkaWorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/generators/KafkaWorkloadGeneratorBuilder.java create mode 100644 workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/generators/copy/IWorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGeneratorBuilder.java create mode 100644 workload-generator-common/src/main/java/common/generators/copy/WorkloadGenerator.java create mode 100644 workload-generator-common/src/main/java/common/messages/copy/OutputMessage.java create mode 100644 workload-generator-common/src/main/java/common/messages/copy2/OutputMessage.java rename workload-generator-common/src/main/java/common/{ => misc}/Worker.java (50%) create mode 100644 workload-generator-common/src/main/java/common/misc/WorkloadDefinition.java rename workload-generator-common/src/main/java/common/{ => misc}/WorkloadEntity.java (54%) create mode 100644 workload-generator-common/src/main/java/common/misc/copy/Worker.java create mode 100644 workload-generator-common/src/main/java/common/misc/copy/WorkloadDefinition.java create mode 100644 workload-generator-common/src/main/java/common/misc/copy/WorkloadEntity.java create mode 100644 workload-generator-common/src/main/java/communication/kafka/copy/KafkaRecordSender.java create mode 100644 workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java delete mode 100644 workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java diff --git a/.settings/org.eclipse.jdt.ui.prefs b/.settings/org.eclipse.jdt.ui.prefs index 98b5ca806..0c714be81 100644 --- a/.settings/org.eclipse.jdt.ui.prefs +++ b/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=16 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891..f0bbf1d01 100644 --- a/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs @@ -32,7 +32,7 @@ cleanup.qualify_static_member_accesses_with_declaring_class=true cleanup.qualify_static_method_accesses_with_declaring_class=false cleanup.remove_private_constructors=true cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_semicolons=true cleanup.remove_redundant_type_arguments=true cleanup.remove_trailing_whitespaces=true cleanup.remove_trailing_whitespaces_all=true @@ -61,7 +61,7 @@ cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true formatter_profile=_CAU-SE-Style -formatter_settings_version=15 +formatter_settings_version=16 org.eclipse.jdt.ui.ignorelowercasenames=true org.eclipse.jdt.ui.importorder=; org.eclipse.jdt.ui.ondemandthreshold=99 diff --git a/test-workload-generator/src/main/java/test/Main.java b/test-workload-generator/src/main/java/test/Main.java index 9172a7380..29ff7ded9 100644 --- a/test-workload-generator/src/main/java/test/Main.java +++ b/test-workload-generator/src/main/java/test/Main.java @@ -15,16 +15,21 @@ public class Main { final KafkaWorkloadGenerator generator = KafkaWorkloadGeneratorBuilder.builder() - .setBeforeHook(() -> { + .setBeforeAction(() -> { System.out.println("Before Hook"); }) .setKeySpace(new KeySpace(5)) .setPeriod(new Period(1000, TimeUnit.MILLISECONDS)) .setDuration(new Duration(60, TimeUnit.SECONDS)) .setGeneratorFunction( - key -> new OutputMessage(key, new ActivePowerRecord(key, 0L, 100d))) + key -> new OutputMessage<>(key, + new ActivePowerRecord(key, 0L, 100d))) .build(); + + // dwhedhwedherbfherf ferufer e u uebvhebzvbjkr fjkebhr erfberf rt gtr grt gtr + // gebuwbfuzerfuzerzgfer fe rf er fe rferhfveurfgerzfgzuerf erf erf ethvrif + generator.start(); } diff --git a/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs index 4e04e2891..029959e9d 100644 --- a/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs +++ b/uc1-workload-generator/.settings/org.eclipse.jdt.ui.prefs @@ -1,62 +1,3 @@ -cleanup.add_default_serial_version_id=true -cleanup.add_generated_serial_version_id=false -cleanup.add_missing_annotations=true -cleanup.add_missing_deprecated_annotations=true -cleanup.add_missing_methods=false -cleanup.add_missing_nls_tags=false -cleanup.add_missing_override_annotations=true -cleanup.add_missing_override_annotations_interface_methods=true -cleanup.add_serial_version_id=false -cleanup.always_use_blocks=true -cleanup.always_use_parentheses_in_expressions=false -cleanup.always_use_this_for_non_static_field_access=true -cleanup.always_use_this_for_non_static_method_access=true -cleanup.convert_functional_interfaces=false -cleanup.convert_to_enhanced_for_loop=true -cleanup.correct_indentation=true -cleanup.format_source_code=true -cleanup.format_source_code_changes_only=false -cleanup.insert_inferred_type_arguments=false -cleanup.make_local_variable_final=true -cleanup.make_parameters_final=true -cleanup.make_private_fields_final=true -cleanup.make_type_abstract_if_missing_method=false -cleanup.make_variable_declarations_final=true -cleanup.never_use_blocks=false -cleanup.never_use_parentheses_in_expressions=true -cleanup.organize_imports=true -cleanup.qualify_static_field_accesses_with_declaring_class=false -cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true -cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true -cleanup.qualify_static_member_accesses_with_declaring_class=true -cleanup.qualify_static_method_accesses_with_declaring_class=false -cleanup.remove_private_constructors=true -cleanup.remove_redundant_modifiers=false -cleanup.remove_redundant_semicolons=false -cleanup.remove_redundant_type_arguments=true -cleanup.remove_trailing_whitespaces=true -cleanup.remove_trailing_whitespaces_all=true -cleanup.remove_trailing_whitespaces_ignore_empty=false -cleanup.remove_unnecessary_casts=true -cleanup.remove_unnecessary_nls_tags=true -cleanup.remove_unused_imports=true -cleanup.remove_unused_local_variables=false -cleanup.remove_unused_private_fields=true -cleanup.remove_unused_private_members=false -cleanup.remove_unused_private_methods=true -cleanup.remove_unused_private_types=true -cleanup.sort_members=false -cleanup.sort_members_all=false -cleanup.use_anonymous_class_creation=false -cleanup.use_blocks=true -cleanup.use_blocks_only_for_return_and_throw=false -cleanup.use_lambda=true -cleanup.use_parentheses_in_expressions=true -cleanup.use_this_for_non_static_field_access=true -cleanup.use_this_for_non_static_field_access_only_if_necessary=false -cleanup.use_this_for_non_static_method_access=true -cleanup.use_this_for_non_static_method_access_only_if_necessary=false -cleanup_profile=_CAU-SE-Style cleanup_settings_version=2 eclipse.preferences.version=1 editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true diff --git a/workload-generator-common/.settings/org.eclipse.jdt.ui.prefs b/workload-generator-common/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 000000000..733a83a71 --- /dev/null +++ b/workload-generator-common/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,128 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=true +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=16 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;org;com; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +org.eclipse.jdt.ui.text.custom_code_templates= +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=true +sp_cleanup.remove_redundant_semicolons=true +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/workload-generator-common/.settings/qa.eclipse.plugin.checkstyle.prefs b/workload-generator-common/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 000000000..87860c815 --- /dev/null +++ b/workload-generator-common/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/workload-generator-common/.settings/qa.eclipse.plugin.pmd.prefs b/workload-generator-common/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 000000000..efbcb8c9e --- /dev/null +++ b/workload-generator-common/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/workload-generator-common/src/main/java/common/IWorkloadGenerator.java b/workload-generator-common/src/main/java/common/IWorkloadGenerator.java deleted file mode 100644 index b4b94f2d7..000000000 --- a/workload-generator-common/src/main/java/common/IWorkloadGenerator.java +++ /dev/null @@ -1,10 +0,0 @@ -package common; - -public interface IWorkloadGenerator { - - - public void start(); - public void stop(); - - -} diff --git a/workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java b/workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java deleted file mode 100644 index f34630813..000000000 --- a/workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java +++ /dev/null @@ -1,38 +0,0 @@ -package common; - -import common.dimensions.Duration; -import common.dimensions.KeySpace; -import common.dimensions.Period; -import common.functions.BeforeAction; -import common.functions.MessageGenerator; -import communication.kafka.KafkaRecordSender; -import titan.ccp.models.records.ActivePowerRecord; - -public class KafkaWorkloadGenerator extends WorkloadGenerator { - - private final KafkaRecordSender<ActivePowerRecord> recordSender; - - public KafkaWorkloadGenerator( - final KeySpace keySpace, - final Period period, - final Duration duration, - final BeforeAction beforeHook, - final MessageGenerator generatorFunction, - final KafkaRecordSender<ActivePowerRecord> recordSender - ) { - super(keySpace, period, duration, beforeHook, generatorFunction, outputMessage -> { - //recordSender.write(outputMessage.getValue()); removed for dev - System.out.println(outputMessage.getKey()); - }); - this.recordSender = recordSender; - } - - - @Override - public void stop() { - System.out.println("subclass terminated"); - // this.recordSender.terminate(); - - super.stop(); - } -} diff --git a/workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java b/workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java deleted file mode 100644 index fc737df4e..000000000 --- a/workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java +++ /dev/null @@ -1,74 +0,0 @@ -package common; - -import java.util.Objects; -import common.dimensions.Duration; -import common.dimensions.KeySpace; -import common.dimensions.Period; -import common.functions.BeforeAction; -import common.functions.MessageGenerator; -import communication.kafka.KafkaRecordSender; -import titan.ccp.models.records.ActivePowerRecord; - -public class KafkaWorkloadGeneratorBuilder { - - private KeySpace keySpace; - - private Period period; - - private Duration duration; - - private BeforeAction beforeAction; - - private MessageGenerator generatorFunction; - - private KafkaRecordSender<ActivePowerRecord> kafkaRecordSender; - - private KafkaWorkloadGeneratorBuilder() { - - } - - public static KafkaWorkloadGeneratorBuilder builder() { - return new KafkaWorkloadGeneratorBuilder(); - } - - public KafkaWorkloadGeneratorBuilder setBeforeHook(final BeforeAction beforeAction) { - this.beforeAction = beforeAction; - return this; - } - - public KafkaWorkloadGeneratorBuilder setKeySpace(final KeySpace keySpace) { - this.keySpace = keySpace; - return this; - } - - public KafkaWorkloadGeneratorBuilder setPeriod(final Period period) { - this.period = period; - return this; - } - - public KafkaWorkloadGeneratorBuilder setDuration(final Duration duration) { - this.duration = duration; - return this; - } - - public KafkaWorkloadGeneratorBuilder setGeneratorFunction(final MessageGenerator generatorFunction) { - this.generatorFunction = generatorFunction; - return this; - } - - public KafkaWorkloadGeneratorBuilder setKafkaRecordSender(final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender) { - this.kafkaRecordSender = kafkaRecordSender; - return this; - } - - public KafkaWorkloadGenerator build() { - Objects.requireNonNull(this.keySpace, "Please specify the key space."); - Objects.requireNonNull(this.period, "Please specify the period."); - Objects.requireNonNull(this.duration, "Please specify the duration."); - final BeforeAction beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> {}); - Objects.requireNonNull(this.generatorFunction, "Please specify the generator function."); - //Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); - - return new KafkaWorkloadGenerator(this.keySpace, this.period, this.duration, beforeAction, this.generatorFunction, this.kafkaRecordSender); - } -} diff --git a/workload-generator-common/src/main/java/common/WorkloadDeclaration.java b/workload-generator-common/src/main/java/common/WorkloadDeclaration.java deleted file mode 100644 index 08f3ceea0..000000000 --- a/workload-generator-common/src/main/java/common/WorkloadDeclaration.java +++ /dev/null @@ -1,36 +0,0 @@ -package common; - -import common.dimensions.KeySpace; - -public class WorkloadDeclaration { - private final KeySpace keySpace; - private final int numberOfWorkers; - - public WorkloadDeclaration(final KeySpace keySpace, final int numberOfWorkers) { - - this.keySpace = keySpace; - this.numberOfWorkers = numberOfWorkers; - } - - public KeySpace getKeySpace() { - return this.keySpace; - } - - public int getNumberOfWorkers() { - return this.numberOfWorkers; - } - - /* - * Replace by json format serialization/deserialization - */ - @Override - public String toString() { - return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";" + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers(); - } - - public static WorkloadDeclaration fromString(String declarationString) { - final String[] deserialized = declarationString.split(";"); - - return new WorkloadDeclaration(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]), Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3])); - } -} diff --git a/workload-generator-common/src/main/java/common/WorkloadGenerator.java b/workload-generator-common/src/main/java/common/WorkloadGenerator.java deleted file mode 100644 index 74b1bb3ca..000000000 --- a/workload-generator-common/src/main/java/common/WorkloadGenerator.java +++ /dev/null @@ -1,109 +0,0 @@ -package common; - -import java.util.LinkedList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import common.dimensions.Duration; -import common.dimensions.KeySpace; -import common.dimensions.Period; -import common.functions.BeforeAction; -import common.functions.MessageGenerator; -import common.functions.Transport; -import common.messages.OutputMessage; -import communication.zookeeper.leader.WorkloadDistributor; - -public abstract class WorkloadGenerator implements IWorkloadGenerator { - - private final KeySpace keySpace; - - private final Period period; - - private final Duration duration; - - private final BeforeAction beforeAction; - - private final BiFunction<WorkloadDeclaration, Worker, List<WorkloadEntity>> workloadSelector; - - private final MessageGenerator generatorFunction; - - private final Transport transport; // connect with previous stage by generating workloadentities from workloadranges / exchange via ZK - - private WorkloadDistributor workloadDistributor; - - private final ScheduledExecutorService executor; - - @Override - public void start() { - this.workloadDistributor.start(); - } - - @Override - public void stop() { - this.workloadDistributor.stop(); - this.executor.shutdown(); - } - - public WorkloadGenerator( - final KeySpace keySpace, - final Period period, - final Duration duration, - final BeforeAction beforeHook, - final MessageGenerator generatorFunction, - final Transport transport - ) { - this.period = period; - this.keySpace = keySpace; - this.duration = duration; - this.beforeAction = beforeHook; - this.generatorFunction = generatorFunction; - this.workloadSelector = (workloadDeclaration, worker) -> { - final List<WorkloadEntity> workloadEntities = new LinkedList<>(); - - // construct workload entities of the subspace, this worker is accountable for - // counting modulo #of workers with offset of the current worker id (worker ids starting at 0) - for (int i = workloadDeclaration.getKeySpace().getMin() + worker.getId(); i <= workloadDeclaration.getKeySpace().getMax(); i+=workloadDeclaration.getNumberOfWorkers()) { - final String id = workloadDeclaration.getKeySpace().getPrefix() + i; - workloadEntities.add(new WorkloadEntity(id, this.generatorFunction)); - } - - return workloadEntities; - }; - this.transport = transport; - - final int threads = 10; // env - this.executor = Executors.newScheduledThreadPool(threads); - final Random random = new Random(); - - final int periodMs = period.getDuration(); - - final BiConsumer<WorkloadDeclaration, Worker> workerAction = (declaration, worker) -> { - - List<WorkloadEntity> entities = this.workloadSelector.apply(declaration, worker); - - System.out.println("Beginning of Experiment..."); - entities.forEach(entity -> { - final OutputMessage message = entity.generateMessage(); - final long initialDelay = random.nextInt(periodMs); - executor.scheduleAtFixedRate(() -> this.transport.consume(message), initialDelay, periodMs, period.getTimeUnit()); - - }); - - try { - System.out.println("Experiment is going to be executed for the specified duration..."); - executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); - System.out.println("Terminating now..."); - this.stop(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - }; - - this.workloadDistributor = new WorkloadDistributor(this.keySpace, workerAction); - } -} diff --git a/workload-generator-common/src/main/java/common/dimensions/Dimension.java b/workload-generator-common/src/main/java/common/dimensions/Dimension.java new file mode 100644 index 000000000..9869a5a44 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/Dimension.java @@ -0,0 +1,8 @@ +package common.dimensions; + +/* + * Base class for workload dimensions. + */ +public abstract class Dimension { + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/Duration.java b/workload-generator-common/src/main/java/common/dimensions/Duration.java index ac4474d4f..519293c27 100644 --- a/workload-generator-common/src/main/java/common/dimensions/Duration.java +++ b/workload-generator-common/src/main/java/common/dimensions/Duration.java @@ -1,11 +1,22 @@ package common.dimensions; import java.util.concurrent.TimeUnit; +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of the duration for the {@link WorkloadGenerator}. + */ +public class Duration extends Dimension { -public class Duration { private final int duration; private final TimeUnit timeUnit; - + + /** + * Define a new duration. + * + * @param duration the duration + * @param timeUnit the time unit that applies to the specified {@code duration} + */ public Duration(final int duration, final TimeUnit timeUnit) { super(); this.duration = duration; @@ -13,11 +24,11 @@ public class Duration { } public int getDuration() { - return duration; + return this.duration; } public TimeUnit getTimeUnit() { - return timeUnit; + return this.timeUnit; } } diff --git a/workload-generator-common/src/main/java/common/dimensions/KeySpace.java b/workload-generator-common/src/main/java/common/dimensions/KeySpace.java index 963dacd90..c91e54223 100644 --- a/workload-generator-common/src/main/java/common/dimensions/KeySpace.java +++ b/workload-generator-common/src/main/java/common/dimensions/KeySpace.java @@ -1,38 +1,56 @@ package common.dimensions; -public class KeySpace { - +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of the Keys that should be used by the + * {@link WorkloadGenerator}. + */ +public class KeySpace extends Dimension { + private final String prefix; private final int min; private final int max; - - + + + /** + * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of + * each key will be determined by a number of the interval ({@code min}, {@code max}-1). + * + * @param prefix the prefix to use for all keys + * @param min the lower bound (inclusive) to start counting from + * @param max the upper bound (exclusive) to count to + */ public KeySpace(final String prefix, final int min, final int max) { - super(); + if (prefix == null || prefix.contains(";")) { + throw new IllegalArgumentException( + "The prefix must not be null and must not contain the ';' character."); + } this.prefix = prefix; this.min = min; this.max = max; + } - + public KeySpace(final String prefix, final int numberOfKeys) { - this(prefix, 0, numberOfKeys-1); + this(prefix, 0, numberOfKeys - 1); } - + public KeySpace(final int numberOfKeys) { - this("sensor_", 0, numberOfKeys-1); + this("sensor_", 0, numberOfKeys - 1); } public String getPrefix() { - return prefix; + return this.prefix; } public int getMin() { - return min; + return this.min; } public int getMax() { - return max; + return this.max; } } diff --git a/workload-generator-common/src/main/java/common/dimensions/Period.java b/workload-generator-common/src/main/java/common/dimensions/Period.java index e50f375ea..a7d4087c0 100644 --- a/workload-generator-common/src/main/java/common/dimensions/Period.java +++ b/workload-generator-common/src/main/java/common/dimensions/Period.java @@ -1,12 +1,22 @@ package common.dimensions; import java.util.concurrent.TimeUnit; +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of period to use for the {@link WorkloadGenerator}. + */ +public class Period extends Dimension { -public class Period { - private final int period; private final TimeUnit timeUnit; - + + /** + * Define a new period. + * + * @param period the period + * @param timeUnit the time unit that applies to the specified {@code period} + */ public Period(final int period, final TimeUnit timeUnit) { super(); this.period = period; @@ -14,11 +24,11 @@ public class Period { } public int getDuration() { - return period; + return this.period; } public TimeUnit getTimeUnit() { - return timeUnit; + return this.timeUnit; } } diff --git a/workload-generator-common/src/main/java/common/dimensions/copy/Dimension.java b/workload-generator-common/src/main/java/common/dimensions/copy/Dimension.java new file mode 100644 index 000000000..9ed031df7 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy/Dimension.java @@ -0,0 +1,8 @@ +package common.dimensions.copy; + +/* + * Base class for workload dimensions. + */ +public abstract class Dimension { + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy/Duration.java b/workload-generator-common/src/main/java/common/dimensions/copy/Duration.java new file mode 100644 index 000000000..8105347f3 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy/Duration.java @@ -0,0 +1,34 @@ +package common.dimensions.copy; + +import java.util.concurrent.TimeUnit; +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of the duration for the {@link WorkloadGenerator}. + */ +public class Duration extends Dimension { + + private final int duration; + private final TimeUnit timeUnit; + + /** + * Define a new duration. + * + * @param duration the duration + * @param timeUnit the time unit that applies to the specified {@code duration} + */ + public Duration(final int duration, final TimeUnit timeUnit) { + super(); + this.duration = duration; + this.timeUnit = timeUnit; + } + + public int getDuration() { + return this.duration; + } + + public TimeUnit getTimeUnit() { + return this.timeUnit; + } + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy/KeySpace.java b/workload-generator-common/src/main/java/common/dimensions/copy/KeySpace.java new file mode 100644 index 000000000..5c2c1776d --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy/KeySpace.java @@ -0,0 +1,56 @@ +package common.dimensions.copy; + +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of the Keys that should be used by the + * {@link WorkloadGenerator}. + */ +public class KeySpace extends Dimension { + + private final String prefix; + private final int min; + private final int max; + + + /** + * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of + * each key will be determined by a number of the interval ({@code min}, {@code max}-1). + * + * @param prefix the prefix to use for all keys + * @param min the lower bound (inclusive) to start counting from + * @param max the upper bound (exclusive) to count to + */ + public KeySpace(final String prefix, final int min, final int max) { + if (prefix == null || prefix.contains(";")) { + throw new IllegalArgumentException( + "The prefix must not be null and must not contain the ';' character."); + } + this.prefix = prefix; + this.min = min; + this.max = max; + + } + + public KeySpace(final String prefix, final int numberOfKeys) { + this(prefix, 0, numberOfKeys - 1); + } + + public KeySpace(final int numberOfKeys) { + this("sensor_", 0, numberOfKeys - 1); + } + + public String getPrefix() { + return this.prefix; + } + + + public int getMin() { + return this.min; + } + + + public int getMax() { + return this.max; + } +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy/Period.java b/workload-generator-common/src/main/java/common/dimensions/copy/Period.java new file mode 100644 index 000000000..6c44543f3 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy/Period.java @@ -0,0 +1,34 @@ +package common.dimensions.copy; + +import java.util.concurrent.TimeUnit; +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of period to use for the {@link WorkloadGenerator}. + */ +public class Period extends Dimension { + + private final int period; + private final TimeUnit timeUnit; + + /** + * Define a new period. + * + * @param period the period + * @param timeUnit the time unit that applies to the specified {@code period} + */ + public Period(final int period, final TimeUnit timeUnit) { + super(); + this.period = period; + this.timeUnit = timeUnit; + } + + public int getDuration() { + return this.period; + } + + public TimeUnit getTimeUnit() { + return this.timeUnit; + } + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy2/Dimension.java b/workload-generator-common/src/main/java/common/dimensions/copy2/Dimension.java new file mode 100644 index 000000000..84b94088f --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy2/Dimension.java @@ -0,0 +1,8 @@ +package common.dimensions.copy2; + +/* + * Base class for workload dimensions. + */ +public abstract class Dimension { + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy2/Duration.java b/workload-generator-common/src/main/java/common/dimensions/copy2/Duration.java new file mode 100644 index 000000000..fb31929cf --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy2/Duration.java @@ -0,0 +1,34 @@ +package common.dimensions.copy2; + +import java.util.concurrent.TimeUnit; +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of the duration for the {@link WorkloadGenerator}. + */ +public class Duration extends Dimension { + + private final int duration; + private final TimeUnit timeUnit; + + /** + * Define a new duration. + * + * @param duration the duration + * @param timeUnit the time unit that applies to the specified {@code duration} + */ + public Duration(final int duration, final TimeUnit timeUnit) { + super(); + this.duration = duration; + this.timeUnit = timeUnit; + } + + public int getDuration() { + return this.duration; + } + + public TimeUnit getTimeUnit() { + return this.timeUnit; + } + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy2/KeySpace.java b/workload-generator-common/src/main/java/common/dimensions/copy2/KeySpace.java new file mode 100644 index 000000000..311dffa62 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy2/KeySpace.java @@ -0,0 +1,56 @@ +package common.dimensions.copy2; + +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of the Keys that should be used by the + * {@link WorkloadGenerator}. + */ +public class KeySpace extends Dimension { + + private final String prefix; + private final int min; + private final int max; + + + /** + * Create a new key space. All keys will have the prefix {@code prefix}. The remaining part of + * each key will be determined by a number of the interval ({@code min}, {@code max}-1). + * + * @param prefix the prefix to use for all keys + * @param min the lower bound (inclusive) to start counting from + * @param max the upper bound (exclusive) to count to + */ + public KeySpace(final String prefix, final int min, final int max) { + if (prefix == null || prefix.contains(";")) { + throw new IllegalArgumentException( + "The prefix must not be null and must not contain the ';' character."); + } + this.prefix = prefix; + this.min = min; + this.max = max; + + } + + public KeySpace(final String prefix, final int numberOfKeys) { + this(prefix, 0, numberOfKeys - 1); + } + + public KeySpace(final int numberOfKeys) { + this("sensor_", 0, numberOfKeys - 1); + } + + public String getPrefix() { + return this.prefix; + } + + + public int getMin() { + return this.min; + } + + + public int getMax() { + return this.max; + } +} diff --git a/workload-generator-common/src/main/java/common/dimensions/copy2/Period.java b/workload-generator-common/src/main/java/common/dimensions/copy2/Period.java new file mode 100644 index 000000000..d89a42615 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/copy2/Period.java @@ -0,0 +1,34 @@ +package common.dimensions.copy2; + +import java.util.concurrent.TimeUnit; +import common.generators.WorkloadGenerator; + +/** + * Wrapper class for the definition of period to use for the {@link WorkloadGenerator}. + */ +public class Period extends Dimension { + + private final int period; + private final TimeUnit timeUnit; + + /** + * Define a new period. + * + * @param period the period + * @param timeUnit the time unit that applies to the specified {@code period} + */ + public Period(final int period, final TimeUnit timeUnit) { + super(); + this.period = period; + this.timeUnit = timeUnit; + } + + public int getDuration() { + return this.period; + } + + public TimeUnit getTimeUnit() { + return this.timeUnit; + } + +} diff --git a/workload-generator-common/src/main/java/common/functions/BeforeAction.java b/workload-generator-common/src/main/java/common/functions/BeforeAction.java index c6a40b91b..a9c953b5f 100644 --- a/workload-generator-common/src/main/java/common/functions/BeforeAction.java +++ b/workload-generator-common/src/main/java/common/functions/BeforeAction.java @@ -3,6 +3,6 @@ package common.functions; @FunctionalInterface public interface BeforeAction { - public void f(); + public void run(); } diff --git a/workload-generator-common/src/main/java/common/functions/MessageGenerator.java b/workload-generator-common/src/main/java/common/functions/MessageGenerator.java index d8432185e..525e8e58a 100644 --- a/workload-generator-common/src/main/java/common/functions/MessageGenerator.java +++ b/workload-generator-common/src/main/java/common/functions/MessageGenerator.java @@ -1,10 +1,11 @@ package common.functions; import common.messages.OutputMessage; +import kieker.common.record.IMonitoringRecord; @FunctionalInterface -public interface MessageGenerator { +public interface MessageGenerator<T extends IMonitoringRecord> { + + OutputMessage<T> generateMessage(final String key); - public OutputMessage generateMessage(final String key); - } diff --git a/workload-generator-common/src/main/java/common/functions/Transport.java b/workload-generator-common/src/main/java/common/functions/Transport.java index fb203a607..e32b055b6 100644 --- a/workload-generator-common/src/main/java/common/functions/Transport.java +++ b/workload-generator-common/src/main/java/common/functions/Transport.java @@ -1,10 +1,11 @@ package common.functions; import common.messages.OutputMessage; +import kieker.common.record.IMonitoringRecord; @FunctionalInterface -public interface Transport { +public interface Transport<T extends IMonitoringRecord> { - public void consume(final OutputMessage message); + public void transport(final OutputMessage<T> message); } diff --git a/workload-generator-common/src/main/java/common/functions/copy/BeforeAction.java b/workload-generator-common/src/main/java/common/functions/copy/BeforeAction.java new file mode 100644 index 000000000..55542255a --- /dev/null +++ b/workload-generator-common/src/main/java/common/functions/copy/BeforeAction.java @@ -0,0 +1,8 @@ +package common.functions.copy; + +@FunctionalInterface +public interface BeforeAction { + + public void run(); + +} diff --git a/workload-generator-common/src/main/java/common/functions/copy/MessageGenerator.java b/workload-generator-common/src/main/java/common/functions/copy/MessageGenerator.java new file mode 100644 index 000000000..fb347210d --- /dev/null +++ b/workload-generator-common/src/main/java/common/functions/copy/MessageGenerator.java @@ -0,0 +1,11 @@ +package common.functions.copy; + +import common.messages.OutputMessage; +import kieker.common.record.IMonitoringRecord; + +@FunctionalInterface +public interface MessageGenerator<T extends IMonitoringRecord> { + + OutputMessage<T> generateMessage(final String key); + +} diff --git a/workload-generator-common/src/main/java/common/functions/copy/Transport.java b/workload-generator-common/src/main/java/common/functions/copy/Transport.java new file mode 100644 index 000000000..6b76a1791 --- /dev/null +++ b/workload-generator-common/src/main/java/common/functions/copy/Transport.java @@ -0,0 +1,11 @@ +package common.functions.copy; + +import common.messages.OutputMessage; +import kieker.common.record.IMonitoringRecord; + +@FunctionalInterface +public interface Transport<T extends IMonitoringRecord> { + + public void transport(final OutputMessage<T> message); + +} diff --git a/workload-generator-common/src/main/java/common/generators/IWorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/IWorkloadGenerator.java new file mode 100644 index 000000000..dfb166c7a --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/IWorkloadGenerator.java @@ -0,0 +1,18 @@ +package common.generators; + +/** + * Base methods for workload generators. + */ +public interface IWorkloadGenerator { + + /** + * Start the workload generation. + */ + void start(); + + /** + * Stop the workload generation. + */ + void stop(); + +} diff --git a/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGenerator.java new file mode 100644 index 000000000..42f26db80 --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGenerator.java @@ -0,0 +1,51 @@ +package common.generators; + +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import communication.kafka.KafkaRecordSender; +import kieker.common.record.IMonitoringRecord; + +/** + * Workload generator for generating load for the kafka messaging system. + */ +public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends WorkloadGenerator<T> { + + private final KafkaRecordSender<T> recordSender; + + /** + * Create a new workload generator. + * + * @param keySpace the key space to generate the workload for. + * @param period the period how often a message is generated for each key specified in the + * {@code keySpace} + * @param duration the duration how long the workload generator will emit messages. + * @param beforeAction the action which will be performed before the workload generator starts + * generating messages. If {@code null}, no before action will be performed. + * @param generatorFunction the generator function. This function is executed, each time a message + * is generated. + * @param recordSender the record sender which is used to send the generated messages to kafka. + */ + public KafkaWorkloadGenerator( + final KeySpace keySpace, + final Period period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final KafkaRecordSender<T> recordSender) { + super(keySpace, period, duration, beforeAction, generatorFunction, o -> { + System.out.println(o.getKey()); + }); + this.recordSender = recordSender; + } + + + @Override + public void stop() { + // this.recordSender.terminate(); + + super.stop(); + } +} diff --git a/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGeneratorBuilder.java b/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGeneratorBuilder.java new file mode 100644 index 000000000..38132686d --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/KafkaWorkloadGeneratorBuilder.java @@ -0,0 +1,132 @@ +package common.generators; + +import java.util.Objects; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import communication.kafka.KafkaRecordSender; +import kieker.common.record.IMonitoringRecord; + +public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { + + private KeySpace keySpace; + + private Period period; + + private Duration duration; + + private BeforeAction beforeAction; + + private MessageGenerator<T> generatorFunction; + + private KafkaRecordSender<T> kafkaRecordSender; + + private KafkaWorkloadGeneratorBuilder() { + + } + + /** + * Get a builder for the {@link KafkaWorkloadGenerator}. + * + * @return the builder. + */ + public static KafkaWorkloadGeneratorBuilder<IMonitoringRecord> builder() { + return new KafkaWorkloadGeneratorBuilder<>(); + } + + /** + * Set the before action for the {@link KafkaWorkloadGenerator}. + * + * @param beforeAction the {@link BeforeAction}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setBeforeAction(final BeforeAction beforeAction) { + this.beforeAction = beforeAction; + return this; + } + + /** + * Set the key space for the {@link KafkaWorkloadGenerator}. + * + * @param keySpace the {@link KeySpace}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setKeySpace(final KeySpace keySpace) { + this.keySpace = keySpace; + return this; + } + + /** + * Set the period for the {@link KafkaWorkloadGenerator}. + * + * @param period the {@link Period} + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Period period) { + this.period = period; + return this; + } + + /** + * Set the durtion for the {@link KafkaWorkloadGenerator}. + * + * @param duration the {@link Duration}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setDuration(final Duration duration) { + this.duration = duration; + return this; + } + + /** + * Set the generator function for the {@link KafkaWorkloadGenerator}. + * + * @param generatorFunction the generator function. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setGeneratorFunction( + final MessageGenerator<T> generatorFunction) { + this.generatorFunction = generatorFunction; + return this; + } + + /** + * Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}. + * + * @param kafkaRecordSender the record sender to use. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setKafkaRecordSender( + final KafkaRecordSender<T> kafkaRecordSender) { + this.kafkaRecordSender = kafkaRecordSender; + return this; + } + + /** + * Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be + * specicified before this method is called: + * <ul> + * <li>key space</li> + * <li>period</li> + * <li>duration</li> + * <li>generator function</li> + * <li>kafka record sender</li> + * </ul> + * + * @return the built instance of the {@link KafkaWorkloadGenerator}. + */ + public KafkaWorkloadGenerator<T> build() { + Objects.requireNonNull(this.keySpace, "Please specify the key space."); + Objects.requireNonNull(this.period, "Please specify the period."); + Objects.requireNonNull(this.duration, "Please specify the duration."); + final BeforeAction beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> { + }); + Objects.requireNonNull(this.generatorFunction, "Please specify the generator function."); + // Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); + + return new KafkaWorkloadGenerator<>(this.keySpace, this.period, this.duration, beforeAction, + this.generatorFunction, this.kafkaRecordSender); + } +} diff --git a/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java new file mode 100644 index 000000000..b012c812f --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/WorkloadGenerator.java @@ -0,0 +1,116 @@ +package common.generators; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import common.functions.Transport; +import common.messages.OutputMessage; +import common.misc.Worker; +import common.misc.WorkloadDefinition; +import common.misc.WorkloadEntity; +import communication.zookeeper.WorkloadDistributor; +import kieker.common.record.IMonitoringRecord; + +public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements IWorkloadGenerator { + + private final KeySpace keySpace; + + private final Period period; + + private final Duration duration; + + private final BeforeAction beforeAction; + + private final BiFunction<WorkloadDefinition, Worker, List<WorkloadEntity<T>>> workloadSelector; + + private final MessageGenerator<T> generatorFunction; + + private final Transport<T> transport; + + private WorkloadDistributor workloadDistributor; + + private final ScheduledExecutorService executor; + + /** + * Start the workload generation. The generation terminates automatically after the specified + * {@code duration}.s + */ + @Override + public void start() { + this.workloadDistributor.start(); + } + + @Override + public void stop() { + this.workloadDistributor.stop(); + } + + public WorkloadGenerator( + final KeySpace keySpace, + final Period period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final Transport<T> transport) { + this.period = period; + this.keySpace = keySpace; + this.duration = duration; + this.beforeAction = beforeAction; + this.generatorFunction = generatorFunction; + this.workloadSelector = (workloadDeclaration, worker) -> { + final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); + + for (int i = + workloadDeclaration.getKeySpace().getMin() + worker.getId(); i <= workloadDeclaration + .getKeySpace().getMax(); i += workloadDeclaration.getNumberOfWorkers()) { + final String id = workloadDeclaration.getKeySpace().getPrefix() + i; + workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); + } + + return workloadEntities; + }; + this.transport = transport; + + final int threads = 10; // env + this.executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + final int periodMs = period.getDuration(); + + final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> { + + final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, worker); + + System.out.println("Beginning of Experiment..."); + System.out.println("Experiment is going to be executed for the specified duration..."); + entities.forEach(entity -> { + final OutputMessage<T> message = entity.generateMessage(); + final long initialDelay = random.nextInt(periodMs); + this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay, + periodMs, period.getTimeUnit()); + + }); + + try { + this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); + System.out.println("Terminating now..."); + this.stop(); + } catch (final InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + + this.workloadDistributor = + new WorkloadDistributor(this.keySpace, this.beforeAction, workerAction); + } +} diff --git a/workload-generator-common/src/main/java/common/generators/copy/IWorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/copy/IWorkloadGenerator.java new file mode 100644 index 000000000..5d5c9170e --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/copy/IWorkloadGenerator.java @@ -0,0 +1,18 @@ +package common.generators.copy; + +/** + * Base methods for workload generators. + */ +public interface IWorkloadGenerator { + + /** + * Start the workload generation. + */ + void start(); + + /** + * Stop the workload generation. + */ + void stop(); + +} diff --git a/workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGenerator.java new file mode 100644 index 000000000..4e954a080 --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGenerator.java @@ -0,0 +1,51 @@ +package common.generators.copy; + +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import communication.kafka.KafkaRecordSender; +import kieker.common.record.IMonitoringRecord; + +/** + * Workload generator for generating load for the kafka messaging system. + */ +public class KafkaWorkloadGenerator<T extends IMonitoringRecord> extends WorkloadGenerator<T> { + + private final KafkaRecordSender<T> recordSender; + + /** + * Create a new workload generator. + * + * @param keySpace the key space to generate the workload for. + * @param period the period how often a message is generated for each key specified in the + * {@code keySpace} + * @param duration the duration how long the workload generator will emit messages. + * @param beforeAction the action which will be performed before the workload generator starts + * generating messages. If {@code null}, no before action will be performed. + * @param generatorFunction the generator function. This function is executed, each time a message + * is generated. + * @param recordSender the record sender which is used to send the generated messages to kafka. + */ + public KafkaWorkloadGenerator( + final KeySpace keySpace, + final Period period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final KafkaRecordSender<T> recordSender) { + super(keySpace, period, duration, beforeAction, generatorFunction, o -> { + System.out.println(o.getKey()); + }); + this.recordSender = recordSender; + } + + + @Override + public void stop() { + // this.recordSender.terminate(); + + super.stop(); + } +} diff --git a/workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGeneratorBuilder.java b/workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGeneratorBuilder.java new file mode 100644 index 000000000..c4fb96f2c --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/copy/KafkaWorkloadGeneratorBuilder.java @@ -0,0 +1,132 @@ +package common.generators.copy; + +import java.util.Objects; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import communication.kafka.KafkaRecordSender; +import kieker.common.record.IMonitoringRecord; + +public class KafkaWorkloadGeneratorBuilder<T extends IMonitoringRecord> { + + private KeySpace keySpace; + + private Period period; + + private Duration duration; + + private BeforeAction beforeAction; + + private MessageGenerator<T> generatorFunction; + + private KafkaRecordSender<T> kafkaRecordSender; + + private KafkaWorkloadGeneratorBuilder() { + + } + + /** + * Get a builder for the {@link KafkaWorkloadGenerator}. + * + * @return the builder. + */ + public static KafkaWorkloadGeneratorBuilder<IMonitoringRecord> builder() { + return new KafkaWorkloadGeneratorBuilder<>(); + } + + /** + * Set the before action for the {@link KafkaWorkloadGenerator}. + * + * @param beforeAction the {@link BeforeAction}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setBeforeAction(final BeforeAction beforeAction) { + this.beforeAction = beforeAction; + return this; + } + + /** + * Set the key space for the {@link KafkaWorkloadGenerator}. + * + * @param keySpace the {@link KeySpace}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setKeySpace(final KeySpace keySpace) { + this.keySpace = keySpace; + return this; + } + + /** + * Set the period for the {@link KafkaWorkloadGenerator}. + * + * @param period the {@link Period} + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setPeriod(final Period period) { + this.period = period; + return this; + } + + /** + * Set the durtion for the {@link KafkaWorkloadGenerator}. + * + * @param duration the {@link Duration}. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setDuration(final Duration duration) { + this.duration = duration; + return this; + } + + /** + * Set the generator function for the {@link KafkaWorkloadGenerator}. + * + * @param generatorFunction the generator function. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setGeneratorFunction( + final MessageGenerator<T> generatorFunction) { + this.generatorFunction = generatorFunction; + return this; + } + + /** + * Set the {@link KafkaRecordSender} for the {@link KafkaWorkloadGenerator}. + * + * @param kafkaRecordSender the record sender to use. + * @return the builder. + */ + public KafkaWorkloadGeneratorBuilder<T> setKafkaRecordSender( + final KafkaRecordSender<T> kafkaRecordSender) { + this.kafkaRecordSender = kafkaRecordSender; + return this; + } + + /** + * Build the actual {@link KafkaWorkloadGenerator}. The following parameters are must be + * specicified before this method is called: + * <ul> + * <li>key space</li> + * <li>period</li> + * <li>duration</li> + * <li>generator function</li> + * <li>kafka record sender</li> + * </ul> + * + * @return the built instance of the {@link KafkaWorkloadGenerator}. + */ + public KafkaWorkloadGenerator<T> build() { + Objects.requireNonNull(this.keySpace, "Please specify the key space."); + Objects.requireNonNull(this.period, "Please specify the period."); + Objects.requireNonNull(this.duration, "Please specify the duration."); + final BeforeAction beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> { + }); + Objects.requireNonNull(this.generatorFunction, "Please specify the generator function."); + // Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); + + return new KafkaWorkloadGenerator<>(this.keySpace, this.period, this.duration, beforeAction, + this.generatorFunction, this.kafkaRecordSender); + } +} diff --git a/workload-generator-common/src/main/java/common/generators/copy/WorkloadGenerator.java b/workload-generator-common/src/main/java/common/generators/copy/WorkloadGenerator.java new file mode 100644 index 000000000..4e56eade9 --- /dev/null +++ b/workload-generator-common/src/main/java/common/generators/copy/WorkloadGenerator.java @@ -0,0 +1,116 @@ +package common.generators.copy; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import common.functions.Transport; +import common.messages.OutputMessage; +import common.misc.Worker; +import common.misc.WorkloadDefinition; +import common.misc.WorkloadEntity; +import communication.zookeeper.WorkloadDistributor; +import kieker.common.record.IMonitoringRecord; + +public abstract class WorkloadGenerator<T extends IMonitoringRecord> implements IWorkloadGenerator { + + private final KeySpace keySpace; + + private final Period period; + + private final Duration duration; + + private final BeforeAction beforeAction; + + private final BiFunction<WorkloadDefinition, Worker, List<WorkloadEntity<T>>> workloadSelector; + + private final MessageGenerator<T> generatorFunction; + + private final Transport<T> transport; + + private WorkloadDistributor workloadDistributor; + + private final ScheduledExecutorService executor; + + /** + * Start the workload generation. The generation terminates automatically after the specified + * {@code duration}.s + */ + @Override + public void start() { + this.workloadDistributor.start(); + } + + @Override + public void stop() { + this.workloadDistributor.stop(); + } + + public WorkloadGenerator( + final KeySpace keySpace, + final Period period, + final Duration duration, + final BeforeAction beforeAction, + final MessageGenerator<T> generatorFunction, + final Transport<T> transport) { + this.period = period; + this.keySpace = keySpace; + this.duration = duration; + this.beforeAction = beforeAction; + this.generatorFunction = generatorFunction; + this.workloadSelector = (workloadDeclaration, worker) -> { + final List<WorkloadEntity<T>> workloadEntities = new LinkedList<>(); + + for (int i = + workloadDeclaration.getKeySpace().getMin() + worker.getId(); i <= workloadDeclaration + .getKeySpace().getMax(); i += workloadDeclaration.getNumberOfWorkers()) { + final String id = workloadDeclaration.getKeySpace().getPrefix() + i; + workloadEntities.add(new WorkloadEntity<>(id, this.generatorFunction)); + } + + return workloadEntities; + }; + this.transport = transport; + + final int threads = 10; // env + this.executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + final int periodMs = period.getDuration(); + + final BiConsumer<WorkloadDefinition, Worker> workerAction = (declaration, worker) -> { + + final List<WorkloadEntity<T>> entities = this.workloadSelector.apply(declaration, worker); + + System.out.println("Beginning of Experiment..."); + System.out.println("Experiment is going to be executed for the specified duration..."); + entities.forEach(entity -> { + final OutputMessage<T> message = entity.generateMessage(); + final long initialDelay = random.nextInt(periodMs); + this.executor.scheduleAtFixedRate(() -> this.transport.transport(message), initialDelay, + periodMs, period.getTimeUnit()); + + }); + + try { + this.executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); + System.out.println("Terminating now..."); + this.stop(); + } catch (final InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + + this.workloadDistributor = + new WorkloadDistributor(this.keySpace, this.beforeAction, workerAction); + } +} diff --git a/workload-generator-common/src/main/java/common/messages/OutputMessage.java b/workload-generator-common/src/main/java/common/messages/OutputMessage.java index a0d30bdcd..cd61411c0 100644 --- a/workload-generator-common/src/main/java/common/messages/OutputMessage.java +++ b/workload-generator-common/src/main/java/common/messages/OutputMessage.java @@ -1,23 +1,32 @@ package common.messages; -import titan.ccp.models.records.ActivePowerRecord; +import kieker.common.record.IMonitoringRecord; -public class OutputMessage { - private String key; - private ActivePowerRecord value; - - public OutputMessage(String key, ActivePowerRecord value) { +/* + * Wrapper class for messages within the messaging system. + */ +public class OutputMessage<T extends IMonitoringRecord> { + private final String key; + private final T value; + + /*** + * Create a new Message. + * + * @param key the key of the message. + * @param value the value of the message. + */ + public OutputMessage(final String key, final T value) { super(); this.key = key; this.value = value; } public String getKey() { - return key; + return this.key; } - public ActivePowerRecord getValue() { - return value; + public T getValue() { + return this.value; } - + } diff --git a/workload-generator-common/src/main/java/common/messages/copy/OutputMessage.java b/workload-generator-common/src/main/java/common/messages/copy/OutputMessage.java new file mode 100644 index 000000000..174ae8a12 --- /dev/null +++ b/workload-generator-common/src/main/java/common/messages/copy/OutputMessage.java @@ -0,0 +1,32 @@ +package common.messages.copy; + +import kieker.common.record.IMonitoringRecord; + +/* + * Wrapper class for messages within the messaging system. + */ +public class OutputMessage<T extends IMonitoringRecord> { + private final String key; + private final T value; + + /*** + * Create a new Message. + * + * @param key the key of the message. + * @param value the value of the message. + */ + public OutputMessage(final String key, final T value) { + super(); + this.key = key; + this.value = value; + } + + public String getKey() { + return this.key; + } + + public T getValue() { + return this.value; + } + +} diff --git a/workload-generator-common/src/main/java/common/messages/copy2/OutputMessage.java b/workload-generator-common/src/main/java/common/messages/copy2/OutputMessage.java new file mode 100644 index 000000000..87da409ec --- /dev/null +++ b/workload-generator-common/src/main/java/common/messages/copy2/OutputMessage.java @@ -0,0 +1,32 @@ +package common.messages.copy2; + +import kieker.common.record.IMonitoringRecord; + +/* + * Wrapper class for messages within the messaging system. + */ +public class OutputMessage<T extends IMonitoringRecord> { + private final String key; + private final T value; + + /*** + * Create a new Message. + * + * @param key the key of the message. + * @param value the value of the message. + */ + public OutputMessage(final String key, final T value) { + super(); + this.key = key; + this.value = value; + } + + public String getKey() { + return this.key; + } + + public T getValue() { + return this.value; + } + +} diff --git a/workload-generator-common/src/main/java/common/Worker.java b/workload-generator-common/src/main/java/common/misc/Worker.java similarity index 50% rename from workload-generator-common/src/main/java/common/Worker.java rename to workload-generator-common/src/main/java/common/misc/Worker.java index cd03219ea..5452b156f 100644 --- a/workload-generator-common/src/main/java/common/Worker.java +++ b/workload-generator-common/src/main/java/common/misc/Worker.java @@ -1,15 +1,24 @@ -package common; +package common.misc; +/* + * Wrapper class for a worker. + */ public class Worker { + private final int id; - + + /** + * Create a new worker with an {@code id} + * + * @param id the id of the worker. + */ public Worker(final int id) { super(); this.id = id; } - + public int getId() { return this.id; } - + } diff --git a/workload-generator-common/src/main/java/common/misc/WorkloadDefinition.java b/workload-generator-common/src/main/java/common/misc/WorkloadDefinition.java new file mode 100644 index 000000000..7a1c3cd73 --- /dev/null +++ b/workload-generator-common/src/main/java/common/misc/WorkloadDefinition.java @@ -0,0 +1,63 @@ +package common.misc; + +import common.dimensions.KeySpace; + +/* + * The central class that contains all information that needs to be exchanged between the nodes for + * distributed workload generation. + */ +public class WorkloadDefinition { + private final KeySpace keySpace; + private final int numberOfWorkers; + + /** + * Create a new workload definition. + * + * @param keySpace the key space to use. + * @param numberOfWorkers the number of workers participating in the workload generation. + */ + public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) { + + this.keySpace = keySpace; + this.numberOfWorkers = numberOfWorkers; + } + + public KeySpace getKeySpace() { + return this.keySpace; + } + + public int getNumberOfWorkers() { + return this.numberOfWorkers; + } + + /** + * Simple method for encoding all information of the workload definition into one string. + * + * @return a string that encodes all information of the workload generation in a compact format. + * The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'. + */ + @Override + public String toString() { + return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";" + + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers(); + } + + /** + * Parse a workload generation from a previously encoded string with the format returned by + * {@link WorkloadDefinition#toString()}. + * + * @param workloadDefinitionString the workload definition string. + * @return the parsed workload definition. + */ + public static WorkloadDefinition fromString(final String workloadDefinitionString) { + final String[] deserialized = workloadDefinitionString.split(";"); + + if (deserialized.length != 4) { + throw new IllegalArgumentException( + "Wrong workload definition string when trying to parse the workload generation."); + } + + return new WorkloadDefinition(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]), + Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3])); + } +} diff --git a/workload-generator-common/src/main/java/common/WorkloadEntity.java b/workload-generator-common/src/main/java/common/misc/WorkloadEntity.java similarity index 54% rename from workload-generator-common/src/main/java/common/WorkloadEntity.java rename to workload-generator-common/src/main/java/common/misc/WorkloadEntity.java index 0ab11f3c3..ef092c698 100644 --- a/workload-generator-common/src/main/java/common/WorkloadEntity.java +++ b/workload-generator-common/src/main/java/common/misc/WorkloadEntity.java @@ -1,18 +1,19 @@ -package common; +package common.misc; import common.functions.MessageGenerator; import common.messages.OutputMessage; +import kieker.common.record.IMonitoringRecord; -public class WorkloadEntity { +public class WorkloadEntity<T extends IMonitoringRecord> { private final String key; - private final MessageGenerator generator; - - public WorkloadEntity(final String key, final MessageGenerator generator) { + private final MessageGenerator<T> generator; + + public WorkloadEntity(final String key, final MessageGenerator<T> generator) { this.key = key; this.generator = generator; } - - public OutputMessage generateMessage() { + + public OutputMessage<T> generateMessage() { return this.generator.generateMessage(this.key); } } diff --git a/workload-generator-common/src/main/java/common/misc/copy/Worker.java b/workload-generator-common/src/main/java/common/misc/copy/Worker.java new file mode 100644 index 000000000..8077c0797 --- /dev/null +++ b/workload-generator-common/src/main/java/common/misc/copy/Worker.java @@ -0,0 +1,24 @@ +package common.misc.copy; + +/* + * Wrapper class for a worker. + */ +public class Worker { + + private final int id; + + /** + * Create a new worker with an {@code id} + * + * @param id the id of the worker. + */ + public Worker(final int id) { + super(); + this.id = id; + } + + public int getId() { + return this.id; + } + +} diff --git a/workload-generator-common/src/main/java/common/misc/copy/WorkloadDefinition.java b/workload-generator-common/src/main/java/common/misc/copy/WorkloadDefinition.java new file mode 100644 index 000000000..672812a2f --- /dev/null +++ b/workload-generator-common/src/main/java/common/misc/copy/WorkloadDefinition.java @@ -0,0 +1,63 @@ +package common.misc.copy; + +import common.dimensions.KeySpace; + +/* + * The central class that contains all information that needs to be exchanged between the nodes for + * distributed workload generation. + */ +public class WorkloadDefinition { + private final KeySpace keySpace; + private final int numberOfWorkers; + + /** + * Create a new workload definition. + * + * @param keySpace the key space to use. + * @param numberOfWorkers the number of workers participating in the workload generation. + */ + public WorkloadDefinition(final KeySpace keySpace, final int numberOfWorkers) { + + this.keySpace = keySpace; + this.numberOfWorkers = numberOfWorkers; + } + + public KeySpace getKeySpace() { + return this.keySpace; + } + + public int getNumberOfWorkers() { + return this.numberOfWorkers; + } + + /** + * Simple method for encoding all information of the workload definition into one string. + * + * @return a string that encodes all information of the workload generation in a compact format. + * The format is 'keySpace;keySpace.min;keySpace.max;numberOfWorkers'. + */ + @Override + public String toString() { + return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";" + + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers(); + } + + /** + * Parse a workload generation from a previously encoded string with the format returned by + * {@link WorkloadDefinition#toString()}. + * + * @param workloadDefinitionString the workload definition string. + * @return the parsed workload definition. + */ + public static WorkloadDefinition fromString(final String workloadDefinitionString) { + final String[] deserialized = workloadDefinitionString.split(";"); + + if (deserialized.length != 4) { + throw new IllegalArgumentException( + "Wrong workload definition string when trying to parse the workload generation."); + } + + return new WorkloadDefinition(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]), + Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3])); + } +} diff --git a/workload-generator-common/src/main/java/common/misc/copy/WorkloadEntity.java b/workload-generator-common/src/main/java/common/misc/copy/WorkloadEntity.java new file mode 100644 index 000000000..6b44fe45d --- /dev/null +++ b/workload-generator-common/src/main/java/common/misc/copy/WorkloadEntity.java @@ -0,0 +1,19 @@ +package common.misc.copy; + +import common.functions.MessageGenerator; +import common.messages.OutputMessage; +import kieker.common.record.IMonitoringRecord; + +public class WorkloadEntity<T extends IMonitoringRecord> { + private final String key; + private final MessageGenerator<T> generator; + + public WorkloadEntity(final String key, final MessageGenerator<T> generator) { + this.key = key; + this.generator = generator; + } + + public OutputMessage<T> generateMessage() { + return this.generator.generateMessage(this.key); + } +} diff --git a/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java b/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java index e31ed829f..5af3fc0d5 100644 --- a/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java +++ b/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java @@ -9,6 +9,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import common.functions.Transport; +import common.messages.OutputMessage; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; @@ -17,7 +19,7 @@ import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; * * @param <T> {@link IMonitoringRecord} to send */ -public class KafkaRecordSender<T extends IMonitoringRecord> { +public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); @@ -81,4 +83,9 @@ public class KafkaRecordSender<T extends IMonitoringRecord> { this.producer.close(); } + @Override + public void transport(OutputMessage<T> message) { + this.write(message.getValue()); + } + } diff --git a/workload-generator-common/src/main/java/communication/kafka/copy/KafkaRecordSender.java b/workload-generator-common/src/main/java/communication/kafka/copy/KafkaRecordSender.java new file mode 100644 index 000000000..168e34e0b --- /dev/null +++ b/workload-generator-common/src/main/java/communication/kafka/copy/KafkaRecordSender.java @@ -0,0 +1,91 @@ +package communication.kafka.copy; + +import java.util.Properties; +import java.util.function.Function; +import kieker.common.record.IMonitoringRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import common.functions.Transport; +import common.messages.OutputMessage; +import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; + + +/** + * Sends monitoring records to Kafka. + * + * @param <T> {@link IMonitoringRecord} to send + */ +public class KafkaRecordSender<T extends IMonitoringRecord> implements Transport<T> { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); + + private final String topic; + + private final Function<T, String> keyAccessor; + + private final Function<T, Long> timestampAccessor; + + private final Producer<String, T> producer; + + public KafkaRecordSender(final String bootstrapServers, final String topic) { + this(bootstrapServers, topic, x -> "", x -> null, new Properties()); + } + + public KafkaRecordSender(final String bootstrapServers, final String topic, + final Function<T, String> keyAccessor) { + this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); + } + + public KafkaRecordSender(final String bootstrapServers, final String topic, + final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { + this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); + } + + /** + * Create a new {@link KafkaRecordSender}. + */ + public KafkaRecordSender(final String bootstrapServers, final String topic, + final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, + final Properties defaultProperties) { + this.topic = topic; + this.keyAccessor = keyAccessor; + this.timestampAccessor = timestampAccessor; + + final Properties properties = new Properties(); + properties.putAll(defaultProperties); + properties.put("bootstrap.servers", bootstrapServers); + // properties.put("acks", this.acknowledges); + // properties.put("batch.size", this.batchSize); + // properties.put("linger.ms", this.lingerMs); + // properties.put("buffer.memory", this.bufferMemory); + + this.producer = new KafkaProducer<>(properties, new StringSerializer(), + IMonitoringRecordSerde.serializer()); + } + + /** + * Write the passed monitoring record to Kafka. + */ + public void write(final T monitoringRecord) { + final ProducerRecord<String, T> record = + new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), + this.keyAccessor.apply(monitoringRecord), monitoringRecord); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + this.producer.send(record); + } + + public void terminate() { + this.producer.close(); + } + + @Override + public void transport(OutputMessage<T> message) { + this.write(message.getValue()); + } + +} diff --git a/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java b/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java new file mode 100644 index 000000000..8cbba005e --- /dev/null +++ b/workload-generator-common/src/main/java/communication/zookeeper/WorkloadDistributor.java @@ -0,0 +1,149 @@ +package communication.zookeeper; + +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import common.dimensions.KeySpace; +import common.functions.BeforeAction; +import common.misc.Worker; +import common.misc.WorkloadDefinition; + +/* + * The central class responsible for distributing the workload through all workload generators. + */ +public class WorkloadDistributor { + + private static final String COUNTER_PATH = "/counter"; + private static final String WORKLOAD_PATH = "/workload"; + private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; + + private final DistributedAtomicInteger counter; + + private final KeySpace keySpace; + private final BeforeAction beforeAction; + private final BiConsumer<WorkloadDefinition, Worker> workerAction; + + private final CuratorFramework client = + CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(3, 1000)); + + /** + * Create a new workload distributor. + * + * @param keySpace the keyspace for the workload generation. + * @param beforeAction the before action for the workload generation. + * @param workerAction the action to perform by the workers. + */ + public WorkloadDistributor(final KeySpace keySpace, final BeforeAction beforeAction, + final BiConsumer<WorkloadDefinition, Worker> workerAction) { + + this.keySpace = keySpace; + this.beforeAction = beforeAction; + this.workerAction = workerAction; + + this.client.start(); + + try { + this.client.blockUntilConnected(); + } catch (final InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + this.counter = + new DistributedAtomicInteger(this.client, COUNTER_PATH, new RetryNTimes(3, 1000)); + } + + /** + * Start the workload distribution. + */ + public void start() { + try { + AtomicValue<Integer> result = this.counter.increment(); + while (!result.succeeded()) { + result = this.counter.increment(); + } + + final Worker worker = new Worker(result.preValue()); + + final CuratorWatcher watcher = this.buildWatcher(worker); + + this.client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); + + if (worker.getId() == 0) { + System.out.println("is master with id " + worker.getId()); + + this.beforeAction.run(); + + // register worker action, as master acts also as worker + this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + + Thread.sleep(10000); // wait for all workers to participate in the leader election + + final int numberOfWorkers = this.counter.get().postValue(); + + System.out.printf("Number of Workers: %d\n", numberOfWorkers); + + final WorkloadDefinition declaration = + new WorkloadDefinition(this.keySpace, numberOfWorkers); + + this.client.create().withMode(CreateMode.EPHEMERAL).forPath(WORKLOAD_DEFINITION_PATH, + declaration.toString().getBytes(StandardCharsets.UTF_8)); + + } else { + System.out.println("is worker with id " + worker.getId()); + + this.client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + } + + Thread.sleep(20000); // wait until the workload declaration is retrieved + } catch (final Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /** + * Build a curator watcher which performs the worker action. + * + * @param worker the worker to create the watcher for. + * @return the curator watcher. + */ + private CuratorWatcher buildWatcher(final Worker worker) { + return new CuratorWatcher() { + + @Override + public void process(final WatchedEvent event) throws Exception { + if (event.getType() == EventType.NodeChildrenChanged) { + final byte[] bytes = + WorkloadDistributor.this.client.getData().forPath(WORKLOAD_DEFINITION_PATH); + final WorkloadDefinition declaration = + WorkloadDefinition.fromString(new String(bytes, StandardCharsets.UTF_8)); + + if (worker.getId() > declaration.getNumberOfWorkers() - 1) { + throw new IllegalStateException("Worker with id " + worker.getId() + + " was too slow and is therefore not participating in the workload generation."); + } else { + WorkloadDistributor.this.workerAction.accept(declaration, worker); + } + } + } + }; + } + + /** + * Stop the workload distributor. + */ + public void stop() { + this.client.close(); + + } + +} diff --git a/workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java b/workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java deleted file mode 100644 index 40060d865..000000000 --- a/workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java +++ /dev/null @@ -1,122 +0,0 @@ -package communication.zookeeper.leader; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.recipes.atomic.AtomicValue; -import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; -import org.apache.curator.framework.recipes.cache.NodeCache; -import org.apache.curator.framework.recipes.cache.NodeCacheListener; -import org.apache.curator.framework.recipes.nodes.PersistentNode; -import org.apache.curator.retry.RetryNTimes; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event.EventType; -import common.Worker; -import common.WorkloadDeclaration; -import common.dimensions.KeySpace; - -public class WorkloadDistributor { - - - private static final String COUNTER_PATH = "/counter"; - private static final String WORKLOAD_PATH = "/workload"; - private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; - - private final DistributedAtomicInteger counter; - - private final KeySpace keySpace; - private final BiConsumer<WorkloadDeclaration, Worker> workerAction; - - private final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(3, 1000)); - - - - public WorkloadDistributor(final KeySpace keySpace, final BiConsumer<WorkloadDeclaration, Worker> workerAction) { - - this.keySpace = keySpace; - this.workerAction = workerAction; - - client.start(); - - try { - client.blockUntilConnected(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - counter = new DistributedAtomicInteger(client, COUNTER_PATH, new RetryNTimes(3, 1000)); - } - - public void start() { - try { - AtomicValue<Integer> result = counter.increment(); - final int id = result.preValue(); - if (result.succeeded()) { - - CuratorWatcher watcher = this.buildWatcher(id); - - client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); - - if (id == 0) { - System.out.println("is master"); - - // register worker action, as master acts also as worker - client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); - - - Thread.sleep(10000); // wait for all workers to participate in the leader election - - int numberOfWorkers = this.counter.get().postValue(); - - System.out.printf("Number of Workers: %d\n", numberOfWorkers); - - final WorkloadDeclaration declaration = new WorkloadDeclaration(this.keySpace, numberOfWorkers); - - client.create().forPath(WORKLOAD_DEFINITION_PATH, declaration.toString().getBytes(StandardCharsets.UTF_8)); - - } else { - System.out.println("is worker"); - - client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); - } - - Thread.sleep(20000); // wait until the workload declaration is retrieved - - } - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - private CuratorWatcher buildWatcher(int id) { - return new CuratorWatcher() { - - @Override - public void process(WatchedEvent event) throws Exception { - if(event.getType() == EventType.NodeChildrenChanged) { - byte[] bytes = client.getData().forPath(WORKLOAD_DEFINITION_PATH); - final WorkloadDeclaration declaration = WorkloadDeclaration.fromString(new String(bytes, StandardCharsets.UTF_8)); - - if (id > declaration.getNumberOfWorkers() - 1) { - throw new IllegalStateException("Worker with id " + id + " was too slow and is therefore not participating in the workload generation."); - } else { - workerAction.accept(declaration, new Worker(id)); - } - } - } - }; - } - - public void stop() { - this.client.close(); - } - -} -- GitLab