diff --git a/build.gradle b/build.gradle index c73a8582d8b08b21bb5667ebcc047933c890855b..957316bff2ad6b7e5ca6432ef43d44aa3e6e7d01 100644 --- a/build.gradle +++ b/build.gradle @@ -37,7 +37,7 @@ dependencies { api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } api 'net.kieker-monitoring:kieker:1.14-SNAPSHOT' api 'net.sourceforge.teetime:teetime:3.0' - + // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.apache.kafka:kafka-clients:2.1.0' implementation 'com.google.guava:guava:24.1-jre' diff --git a/settings.gradle b/settings.gradle index e3c73f5ef74c328e4aef187a8becf829a8ae5316..9796a4950011a9832e1da486fbb7363c403cd6eb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -11,3 +11,6 @@ include 'uc3-application' include 'uc4-workload-generator' include 'uc4-application' + +include 'workload-generator-common' +include 'test-workload-generator' diff --git a/test-workload-generator/.directory b/test-workload-generator/.directory new file mode 100644 index 0000000000000000000000000000000000000000..60c20eff55ff9e0062a815272153893728d74da3 --- /dev/null +++ b/test-workload-generator/.directory @@ -0,0 +1,6 @@ +[Dolphin] +Timestamp=2020,5,12,13,3,57 +Version=4 + +[Settings] +HiddenFilesShown=true diff --git a/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs b/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..4e04e2891754324a6e1bf55348b6a38f592bb301 --- /dev/null +++ b/test-workload-generator/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=15 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=true +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/test-workload-generator/.settings/qa.eclipse.plugin.checkstyle.prefs b/test-workload-generator/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280 --- /dev/null +++ b/test-workload-generator/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/test-workload-generator/.settings/qa.eclipse.plugin.pmd.prefs b/test-workload-generator/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9 --- /dev/null +++ b/test-workload-generator/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/test-workload-generator/build.gradle b/test-workload-generator/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..1c1986ab6d6ed0efc1f7577394358b19ed0c33e1 --- /dev/null +++ b/test-workload-generator/build.gradle @@ -0,0 +1,31 @@ +apply plugin: 'application' +apply plugin: 'eclipse' + +buildscript { + repositories { + maven { + url "https://plugins.gradle.org/m2/" + } + } +} + +sourceCompatibility = "1.11" +targetCompatibility = "1.11" + +dependencies { + compile project(':') + compile 'org.slf4j:slf4j-simple:1.6.1' + + + // Use JUnit test framework + testCompile 'junit:junit:4.12' +} + +mainClassName = "test.Main" + +eclipse { + classpath { + downloadSources=true + downloadJavadoc=true + } +} diff --git a/test-workload-generator/docker-compose.yml b/test-workload-generator/docker-compose.yml new file mode 100644 index 0000000000000000000000000000000000000000..a1d7533d2f46cb334e63738cb501d9223c309668 --- /dev/null +++ b/test-workload-generator/docker-compose.yml @@ -0,0 +1,7 @@ +version: '3.1' + +services: + zookeeper: + image: zookeeper + ports: + - 2181:2181 diff --git a/test-workload-generator/src/main/java/test/Main.java b/test-workload-generator/src/main/java/test/Main.java new file mode 100644 index 0000000000000000000000000000000000000000..9172a7380469bb69451609c5434643cb8085d9c1 --- /dev/null +++ b/test-workload-generator/src/main/java/test/Main.java @@ -0,0 +1,31 @@ +package test; + +import common.KafkaWorkloadGenerator; +import common.KafkaWorkloadGeneratorBuilder; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.messages.OutputMessage; +import java.util.concurrent.TimeUnit; +import titan.ccp.models.records.ActivePowerRecord; + +public class Main { + + public static void main(final String[] args) { + + final KafkaWorkloadGenerator generator = + KafkaWorkloadGeneratorBuilder.builder() + .setBeforeHook(() -> { + System.out.println("Before Hook"); + }) + .setKeySpace(new KeySpace(5)) + .setPeriod(new Period(1000, TimeUnit.MILLISECONDS)) + .setDuration(new Duration(60, TimeUnit.SECONDS)) + .setGeneratorFunction( + key -> new OutputMessage(key, new ActivePowerRecord(key, 0L, 100d))) + .build(); + + generator.start(); + } + +} diff --git a/workload-generator-common/build.gradle b/workload-generator-common/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..7e1c2aebe2786c7fe525850613c1ee9105b66826 --- /dev/null +++ b/workload-generator-common/build.gradle @@ -0,0 +1,29 @@ +apply plugin: 'java-library' +apply plugin: 'eclipse' + +buildscript { + repositories { + maven { + url "https://plugins.gradle.org/m2/" + } + } +} + +sourceCompatibility = "1.11" +targetCompatibility = "1.11" + +dependencies { + compile project(':') + compile 'org.apache.curator:curator-recipes:4.3.0' + compile 'org.slf4j:slf4j-simple:1.6.1' + + // Use JUnit test framework + testCompile 'junit:junit:4.12' +} + +eclipse { + classpath { + downloadSources=true + downloadJavadoc=true + } +} \ No newline at end of file diff --git a/workload-generator-common/gradle/wrapper/gradle-wrapper.jar b/workload-generator-common/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000000000000000000000000000000000000..29953ea141f55e3b8fc691d31b5ca8816d89fa87 Binary files /dev/null and b/workload-generator-common/gradle/wrapper/gradle-wrapper.jar differ diff --git a/workload-generator-common/gradle/wrapper/gradle-wrapper.properties b/workload-generator-common/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000000000000000000000000000000000000..e0b3fb8d70b1bbf790f6f8ed1c928ddf09f54628 --- /dev/null +++ b/workload-generator-common/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/workload-generator-common/gradlew b/workload-generator-common/gradlew new file mode 100755 index 0000000000000000000000000000000000000000..cccdd3d517fc5249beaefa600691cf150f2fa3e6 --- /dev/null +++ b/workload-generator-common/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/workload-generator-common/gradlew.bat b/workload-generator-common/gradlew.bat new file mode 100644 index 0000000000000000000000000000000000000000..e95643d6a2ca62258464e83c72f5156dc941c609 --- /dev/null +++ b/workload-generator-common/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/workload-generator-common/settings.gradle b/workload-generator-common/settings.gradle new file mode 100644 index 0000000000000000000000000000000000000000..13a527c8d9af1885e1ba49f647bd4243907c82a7 --- /dev/null +++ b/workload-generator-common/settings.gradle @@ -0,0 +1,10 @@ +/* + * This file was generated by the Gradle 'init' task. + * + * The settings file is used to specify which projects to include in your build. + * + * Detailed information about configuring a multi-project build in Gradle can be found + * in the user guide at https://docs.gradle.org/4.10.2/userguide/multi_project_builds.html + */ + +rootProject.name = 'workload-generator-common' diff --git a/workload-generator-common/src/main/java/common/IWorkloadGenerator.java b/workload-generator-common/src/main/java/common/IWorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..b4b94f2d77ac4a1c31b7d32f0b7db241b174aa3b --- /dev/null +++ b/workload-generator-common/src/main/java/common/IWorkloadGenerator.java @@ -0,0 +1,10 @@ +package common; + +public interface IWorkloadGenerator { + + + public void start(); + public void stop(); + + +} diff --git a/workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java b/workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..f34630813c48df91c6134a344e3be6063a911a15 --- /dev/null +++ b/workload-generator-common/src/main/java/common/KafkaWorkloadGenerator.java @@ -0,0 +1,38 @@ +package common; + +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import communication.kafka.KafkaRecordSender; +import titan.ccp.models.records.ActivePowerRecord; + +public class KafkaWorkloadGenerator extends WorkloadGenerator { + + private final KafkaRecordSender<ActivePowerRecord> recordSender; + + public KafkaWorkloadGenerator( + final KeySpace keySpace, + final Period period, + final Duration duration, + final BeforeAction beforeHook, + final MessageGenerator generatorFunction, + final KafkaRecordSender<ActivePowerRecord> recordSender + ) { + super(keySpace, period, duration, beforeHook, generatorFunction, outputMessage -> { + //recordSender.write(outputMessage.getValue()); removed for dev + System.out.println(outputMessage.getKey()); + }); + this.recordSender = recordSender; + } + + + @Override + public void stop() { + System.out.println("subclass terminated"); + // this.recordSender.terminate(); + + super.stop(); + } +} diff --git a/workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java b/workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..fc737df4ec6080fe1393c4c3fae755451c2e2347 --- /dev/null +++ b/workload-generator-common/src/main/java/common/KafkaWorkloadGeneratorBuilder.java @@ -0,0 +1,74 @@ +package common; + +import java.util.Objects; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import communication.kafka.KafkaRecordSender; +import titan.ccp.models.records.ActivePowerRecord; + +public class KafkaWorkloadGeneratorBuilder { + + private KeySpace keySpace; + + private Period period; + + private Duration duration; + + private BeforeAction beforeAction; + + private MessageGenerator generatorFunction; + + private KafkaRecordSender<ActivePowerRecord> kafkaRecordSender; + + private KafkaWorkloadGeneratorBuilder() { + + } + + public static KafkaWorkloadGeneratorBuilder builder() { + return new KafkaWorkloadGeneratorBuilder(); + } + + public KafkaWorkloadGeneratorBuilder setBeforeHook(final BeforeAction beforeAction) { + this.beforeAction = beforeAction; + return this; + } + + public KafkaWorkloadGeneratorBuilder setKeySpace(final KeySpace keySpace) { + this.keySpace = keySpace; + return this; + } + + public KafkaWorkloadGeneratorBuilder setPeriod(final Period period) { + this.period = period; + return this; + } + + public KafkaWorkloadGeneratorBuilder setDuration(final Duration duration) { + this.duration = duration; + return this; + } + + public KafkaWorkloadGeneratorBuilder setGeneratorFunction(final MessageGenerator generatorFunction) { + this.generatorFunction = generatorFunction; + return this; + } + + public KafkaWorkloadGeneratorBuilder setKafkaRecordSender(final KafkaRecordSender<ActivePowerRecord> kafkaRecordSender) { + this.kafkaRecordSender = kafkaRecordSender; + return this; + } + + public KafkaWorkloadGenerator build() { + Objects.requireNonNull(this.keySpace, "Please specify the key space."); + Objects.requireNonNull(this.period, "Please specify the period."); + Objects.requireNonNull(this.duration, "Please specify the duration."); + final BeforeAction beforeAction = Objects.requireNonNullElse(this.beforeAction, () -> {}); + Objects.requireNonNull(this.generatorFunction, "Please specify the generator function."); + //Objects.requireNonNull(this.kafkaRecordSender, "Please specify the kafka record sender."); + + return new KafkaWorkloadGenerator(this.keySpace, this.period, this.duration, beforeAction, this.generatorFunction, this.kafkaRecordSender); + } +} diff --git a/workload-generator-common/src/main/java/common/Worker.java b/workload-generator-common/src/main/java/common/Worker.java new file mode 100644 index 0000000000000000000000000000000000000000..cd03219ead1624d0a658dcb2987413b09997b19a --- /dev/null +++ b/workload-generator-common/src/main/java/common/Worker.java @@ -0,0 +1,15 @@ +package common; + +public class Worker { + private final int id; + + public Worker(final int id) { + super(); + this.id = id; + } + + public int getId() { + return this.id; + } + +} diff --git a/workload-generator-common/src/main/java/common/WorkloadDeclaration.java b/workload-generator-common/src/main/java/common/WorkloadDeclaration.java new file mode 100644 index 0000000000000000000000000000000000000000..08f3ceea0670d90b2367ff176257fc404bb3e94a --- /dev/null +++ b/workload-generator-common/src/main/java/common/WorkloadDeclaration.java @@ -0,0 +1,36 @@ +package common; + +import common.dimensions.KeySpace; + +public class WorkloadDeclaration { + private final KeySpace keySpace; + private final int numberOfWorkers; + + public WorkloadDeclaration(final KeySpace keySpace, final int numberOfWorkers) { + + this.keySpace = keySpace; + this.numberOfWorkers = numberOfWorkers; + } + + public KeySpace getKeySpace() { + return this.keySpace; + } + + public int getNumberOfWorkers() { + return this.numberOfWorkers; + } + + /* + * Replace by json format serialization/deserialization + */ + @Override + public String toString() { + return this.getKeySpace().getPrefix() + ";" + this.getKeySpace().getMin() + ";" + this.getKeySpace().getMax() + ";" + this.getNumberOfWorkers(); + } + + public static WorkloadDeclaration fromString(String declarationString) { + final String[] deserialized = declarationString.split(";"); + + return new WorkloadDeclaration(new KeySpace(deserialized[0], Integer.valueOf(deserialized[1]), Integer.valueOf(deserialized[2])), Integer.valueOf(deserialized[3])); + } +} diff --git a/workload-generator-common/src/main/java/common/WorkloadEntity.java b/workload-generator-common/src/main/java/common/WorkloadEntity.java new file mode 100644 index 0000000000000000000000000000000000000000..0ab11f3c38de6f9e18d7e9dadaa67b16dc4e250b --- /dev/null +++ b/workload-generator-common/src/main/java/common/WorkloadEntity.java @@ -0,0 +1,18 @@ +package common; + +import common.functions.MessageGenerator; +import common.messages.OutputMessage; + +public class WorkloadEntity { + private final String key; + private final MessageGenerator generator; + + public WorkloadEntity(final String key, final MessageGenerator generator) { + this.key = key; + this.generator = generator; + } + + public OutputMessage generateMessage() { + return this.generator.generateMessage(this.key); + } +} diff --git a/workload-generator-common/src/main/java/common/WorkloadGenerator.java b/workload-generator-common/src/main/java/common/WorkloadGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..74b1bb3ca5fff5346074b83b2d4b722341286ec3 --- /dev/null +++ b/workload-generator-common/src/main/java/common/WorkloadGenerator.java @@ -0,0 +1,109 @@ +package common; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import common.dimensions.Duration; +import common.dimensions.KeySpace; +import common.dimensions.Period; +import common.functions.BeforeAction; +import common.functions.MessageGenerator; +import common.functions.Transport; +import common.messages.OutputMessage; +import communication.zookeeper.leader.WorkloadDistributor; + +public abstract class WorkloadGenerator implements IWorkloadGenerator { + + private final KeySpace keySpace; + + private final Period period; + + private final Duration duration; + + private final BeforeAction beforeAction; + + private final BiFunction<WorkloadDeclaration, Worker, List<WorkloadEntity>> workloadSelector; + + private final MessageGenerator generatorFunction; + + private final Transport transport; // connect with previous stage by generating workloadentities from workloadranges / exchange via ZK + + private WorkloadDistributor workloadDistributor; + + private final ScheduledExecutorService executor; + + @Override + public void start() { + this.workloadDistributor.start(); + } + + @Override + public void stop() { + this.workloadDistributor.stop(); + this.executor.shutdown(); + } + + public WorkloadGenerator( + final KeySpace keySpace, + final Period period, + final Duration duration, + final BeforeAction beforeHook, + final MessageGenerator generatorFunction, + final Transport transport + ) { + this.period = period; + this.keySpace = keySpace; + this.duration = duration; + this.beforeAction = beforeHook; + this.generatorFunction = generatorFunction; + this.workloadSelector = (workloadDeclaration, worker) -> { + final List<WorkloadEntity> workloadEntities = new LinkedList<>(); + + // construct workload entities of the subspace, this worker is accountable for + // counting modulo #of workers with offset of the current worker id (worker ids starting at 0) + for (int i = workloadDeclaration.getKeySpace().getMin() + worker.getId(); i <= workloadDeclaration.getKeySpace().getMax(); i+=workloadDeclaration.getNumberOfWorkers()) { + final String id = workloadDeclaration.getKeySpace().getPrefix() + i; + workloadEntities.add(new WorkloadEntity(id, this.generatorFunction)); + } + + return workloadEntities; + }; + this.transport = transport; + + final int threads = 10; // env + this.executor = Executors.newScheduledThreadPool(threads); + final Random random = new Random(); + + final int periodMs = period.getDuration(); + + final BiConsumer<WorkloadDeclaration, Worker> workerAction = (declaration, worker) -> { + + List<WorkloadEntity> entities = this.workloadSelector.apply(declaration, worker); + + System.out.println("Beginning of Experiment..."); + entities.forEach(entity -> { + final OutputMessage message = entity.generateMessage(); + final long initialDelay = random.nextInt(periodMs); + executor.scheduleAtFixedRate(() -> this.transport.consume(message), initialDelay, periodMs, period.getTimeUnit()); + + }); + + try { + System.out.println("Experiment is going to be executed for the specified duration..."); + executor.awaitTermination(duration.getDuration(), duration.getTimeUnit()); + System.out.println("Terminating now..."); + this.stop(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + }; + + this.workloadDistributor = new WorkloadDistributor(this.keySpace, workerAction); + } +} diff --git a/workload-generator-common/src/main/java/common/dimensions/Duration.java b/workload-generator-common/src/main/java/common/dimensions/Duration.java new file mode 100644 index 0000000000000000000000000000000000000000..ac4474d4f010d25016bdce1ee449646651869c48 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/Duration.java @@ -0,0 +1,23 @@ +package common.dimensions; + +import java.util.concurrent.TimeUnit; + +public class Duration { + private final int duration; + private final TimeUnit timeUnit; + + public Duration(final int duration, final TimeUnit timeUnit) { + super(); + this.duration = duration; + this.timeUnit = timeUnit; + } + + public int getDuration() { + return duration; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + +} diff --git a/workload-generator-common/src/main/java/common/dimensions/KeySpace.java b/workload-generator-common/src/main/java/common/dimensions/KeySpace.java new file mode 100644 index 0000000000000000000000000000000000000000..963dacd906bfa0868e92c7ad4f0815d55db1801d --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/KeySpace.java @@ -0,0 +1,38 @@ +package common.dimensions; + +public class KeySpace { + + private final String prefix; + private final int min; + private final int max; + + + public KeySpace(final String prefix, final int min, final int max) { + super(); + this.prefix = prefix; + this.min = min; + this.max = max; + } + + public KeySpace(final String prefix, final int numberOfKeys) { + this(prefix, 0, numberOfKeys-1); + } + + public KeySpace(final int numberOfKeys) { + this("sensor_", 0, numberOfKeys-1); + } + + public String getPrefix() { + return prefix; + } + + + public int getMin() { + return min; + } + + + public int getMax() { + return max; + } +} diff --git a/workload-generator-common/src/main/java/common/dimensions/Period.java b/workload-generator-common/src/main/java/common/dimensions/Period.java new file mode 100644 index 0000000000000000000000000000000000000000..e50f375eaea78228b5ae2b73e999cf3cebd2a9a3 --- /dev/null +++ b/workload-generator-common/src/main/java/common/dimensions/Period.java @@ -0,0 +1,24 @@ +package common.dimensions; + +import java.util.concurrent.TimeUnit; + +public class Period { + + private final int period; + private final TimeUnit timeUnit; + + public Period(final int period, final TimeUnit timeUnit) { + super(); + this.period = period; + this.timeUnit = timeUnit; + } + + public int getDuration() { + return period; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + +} diff --git a/workload-generator-common/src/main/java/common/functions/BeforeAction.java b/workload-generator-common/src/main/java/common/functions/BeforeAction.java new file mode 100644 index 0000000000000000000000000000000000000000..c6a40b91be2e7387de4f61b05d3616576498ceff --- /dev/null +++ b/workload-generator-common/src/main/java/common/functions/BeforeAction.java @@ -0,0 +1,8 @@ +package common.functions; + +@FunctionalInterface +public interface BeforeAction { + + public void f(); + +} diff --git a/workload-generator-common/src/main/java/common/functions/MessageGenerator.java b/workload-generator-common/src/main/java/common/functions/MessageGenerator.java new file mode 100644 index 0000000000000000000000000000000000000000..d8432185ec10fd9f950b0f5b239fc4c02daa2780 --- /dev/null +++ b/workload-generator-common/src/main/java/common/functions/MessageGenerator.java @@ -0,0 +1,10 @@ +package common.functions; + +import common.messages.OutputMessage; + +@FunctionalInterface +public interface MessageGenerator { + + public OutputMessage generateMessage(final String key); + +} diff --git a/workload-generator-common/src/main/java/common/functions/Transport.java b/workload-generator-common/src/main/java/common/functions/Transport.java new file mode 100644 index 0000000000000000000000000000000000000000..fb203a607ae464917b540e8f44a2159c77ef9cdf --- /dev/null +++ b/workload-generator-common/src/main/java/common/functions/Transport.java @@ -0,0 +1,10 @@ +package common.functions; + +import common.messages.OutputMessage; + +@FunctionalInterface +public interface Transport { + + public void consume(final OutputMessage message); + +} diff --git a/workload-generator-common/src/main/java/common/messages/OutputMessage.java b/workload-generator-common/src/main/java/common/messages/OutputMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..a0d30bdcd68c94a0bdf5927fa279c766c1538296 --- /dev/null +++ b/workload-generator-common/src/main/java/common/messages/OutputMessage.java @@ -0,0 +1,23 @@ +package common.messages; + +import titan.ccp.models.records.ActivePowerRecord; + +public class OutputMessage { + private String key; + private ActivePowerRecord value; + + public OutputMessage(String key, ActivePowerRecord value) { + super(); + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public ActivePowerRecord getValue() { + return value; + } + +} diff --git a/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java b/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java new file mode 100644 index 0000000000000000000000000000000000000000..e31ed829f44e02598852e6894b6c5f9bfc45f4e7 --- /dev/null +++ b/workload-generator-common/src/main/java/communication/kafka/KafkaRecordSender.java @@ -0,0 +1,84 @@ +package communication.kafka; + +import java.util.Properties; +import java.util.function.Function; +import kieker.common.record.IMonitoringRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; + + +/** + * Sends monitoring records to Kafka. + * + * @param <T> {@link IMonitoringRecord} to send + */ +public class KafkaRecordSender<T extends IMonitoringRecord> { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); + + private final String topic; + + private final Function<T, String> keyAccessor; + + private final Function<T, Long> timestampAccessor; + + private final Producer<String, T> producer; + + public KafkaRecordSender(final String bootstrapServers, final String topic) { + this(bootstrapServers, topic, x -> "", x -> null, new Properties()); + } + + public KafkaRecordSender(final String bootstrapServers, final String topic, + final Function<T, String> keyAccessor) { + this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); + } + + public KafkaRecordSender(final String bootstrapServers, final String topic, + final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { + this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); + } + + /** + * Create a new {@link KafkaRecordSender}. + */ + public KafkaRecordSender(final String bootstrapServers, final String topic, + final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, + final Properties defaultProperties) { + this.topic = topic; + this.keyAccessor = keyAccessor; + this.timestampAccessor = timestampAccessor; + + final Properties properties = new Properties(); + properties.putAll(defaultProperties); + properties.put("bootstrap.servers", bootstrapServers); + // properties.put("acks", this.acknowledges); + // properties.put("batch.size", this.batchSize); + // properties.put("linger.ms", this.lingerMs); + // properties.put("buffer.memory", this.bufferMemory); + + this.producer = new KafkaProducer<>(properties, new StringSerializer(), + IMonitoringRecordSerde.serializer()); + } + + /** + * Write the passed monitoring record to Kafka. + */ + public void write(final T monitoringRecord) { + final ProducerRecord<String, T> record = + new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), + this.keyAccessor.apply(monitoringRecord), monitoringRecord); + + LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); + this.producer.send(record); + } + + public void terminate() { + this.producer.close(); + } + +} diff --git a/workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java b/workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java new file mode 100644 index 0000000000000000000000000000000000000000..40060d865675526c8991f7a225c6cfdf4dd7d30b --- /dev/null +++ b/workload-generator-common/src/main/java/communication/zookeeper/leader/WorkloadDistributor.java @@ -0,0 +1,122 @@ +package communication.zookeeper.leader; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.nodes.PersistentNode; +import org.apache.curator.retry.RetryNTimes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event.EventType; +import common.Worker; +import common.WorkloadDeclaration; +import common.dimensions.KeySpace; + +public class WorkloadDistributor { + + + private static final String COUNTER_PATH = "/counter"; + private static final String WORKLOAD_PATH = "/workload"; + private static final String WORKLOAD_DEFINITION_PATH = "/workload/definition"; + + private final DistributedAtomicInteger counter; + + private final KeySpace keySpace; + private final BiConsumer<WorkloadDeclaration, Worker> workerAction; + + private final CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(3, 1000)); + + + + public WorkloadDistributor(final KeySpace keySpace, final BiConsumer<WorkloadDeclaration, Worker> workerAction) { + + this.keySpace = keySpace; + this.workerAction = workerAction; + + client.start(); + + try { + client.blockUntilConnected(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + counter = new DistributedAtomicInteger(client, COUNTER_PATH, new RetryNTimes(3, 1000)); + } + + public void start() { + try { + AtomicValue<Integer> result = counter.increment(); + final int id = result.preValue(); + if (result.succeeded()) { + + CuratorWatcher watcher = this.buildWatcher(id); + + client.checkExists().creatingParentsIfNeeded().forPath(WORKLOAD_DEFINITION_PATH); + + if (id == 0) { + System.out.println("is master"); + + // register worker action, as master acts also as worker + client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + + + Thread.sleep(10000); // wait for all workers to participate in the leader election + + int numberOfWorkers = this.counter.get().postValue(); + + System.out.printf("Number of Workers: %d\n", numberOfWorkers); + + final WorkloadDeclaration declaration = new WorkloadDeclaration(this.keySpace, numberOfWorkers); + + client.create().forPath(WORKLOAD_DEFINITION_PATH, declaration.toString().getBytes(StandardCharsets.UTF_8)); + + } else { + System.out.println("is worker"); + + client.getChildren().usingWatcher(watcher).forPath(WORKLOAD_PATH); + } + + Thread.sleep(20000); // wait until the workload declaration is retrieved + + } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private CuratorWatcher buildWatcher(int id) { + return new CuratorWatcher() { + + @Override + public void process(WatchedEvent event) throws Exception { + if(event.getType() == EventType.NodeChildrenChanged) { + byte[] bytes = client.getData().forPath(WORKLOAD_DEFINITION_PATH); + final WorkloadDeclaration declaration = WorkloadDeclaration.fromString(new String(bytes, StandardCharsets.UTF_8)); + + if (id > declaration.getNumberOfWorkers() - 1) { + throw new IllegalStateException("Worker with id " + id + " was too slow and is therefore not participating in the workload generation."); + } else { + workerAction.accept(declaration, new Worker(id)); + } + } + } + }; + } + + public void stop() { + this.client.close(); + } + +}