diff --git a/.settings/qa.eclipse.plugin.checkstyle.prefs b/.settings/qa.eclipse.plugin.checkstyle.prefs index 64a0564d59f627ecf6dc178fd69f2788a74da418..2e24669b8614a13e1a133bab499a588c3a1d1758 100644 --- a/.settings/qa.eclipse.plugin.checkstyle.prefs +++ b/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -1,4 +1,4 @@ configFilePath=config/checkstyle.xml customModulesJarPaths= eclipse.preferences.version=1 -enabled=true +enabled=false diff --git a/.settings/qa.eclipse.plugin.pmd.prefs b/.settings/qa.eclipse.plugin.pmd.prefs index 04ac39484810c88779a38e11662b0bab0961a22e..a8950d58ca40d8762a665ce3174deda7b2d89b85 100644 --- a/.settings/qa.eclipse.plugin.pmd.prefs +++ b/.settings/qa.eclipse.plugin.pmd.prefs @@ -1,4 +1,4 @@ customRulesJars= eclipse.preferences.version=1 -enabled=true +enabled=false ruleSetFilePath=config/pmd.xml diff --git a/README.md b/README.md index c3abef2dce738570bb8e620033165a48ee96f095..6ad1dd576bc165630fb378234102f324f9b66d8a 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,19 @@ -Current envisaged project structure: +# Theodolite +> A theodolite is a precision optical instrument for measuring angles between designated visible points in the horizontal and vertical planes. -- <cite>[Wikipedia](https://en.wikipedia.org/wiki/Theodolite)</cite> -* `scalability-benchmarking` - * `uc1-application-kstreams` - * `uc1-application-samza` - * `uc1-workload-generation` - * ... +Theodolite is a framework for benchmarking the horizontal and vertical scalability of stream processing engines. It consists of three modules: + +## Theodolite Benchmarks + +Theodolite contains 4 application benchmarks, which are based on typical use cases for stream processing within microservices. For each benchmark, a corresponding workload generator is provided. Currently, this repository provides benchmark implementations for Kafka Streams. + + +## Theodolite Execution Framework + +Theodolite aims to benchmark scalability of stream processing engines for real use cases. Microservices that apply stream processing techniques are usually deployed in elastic cloud environments. Hence, Theodolite's cloud-native benchmarking framework deploys as components in a cloud environment, orchestrated by Kubernetes. More information on how to execute scalability benchmarks can be found in [Thedolite execution framework](execution). + + +## Theodolite Analysis Tools + +Theodolite's benchmarking method create a *scalability graph* allowing to draw conclusions about the scalability of a stream processing engine or its deployment. A scalability graph shows how resource demand evolves with an increasing workload. Theodolite provides Jupyter notebooks for creating such scalability graphs based on benchmarking results from the execution framework. More information can be found in [Theodolite analysis tool](analysis). diff --git a/analysis/README.md b/analysis/README.md new file mode 100644 index 0000000000000000000000000000000000000000..263b1db16fcabefe5409ebe744afe5997bc90d89 --- /dev/null +++ b/analysis/README.md @@ -0,0 +1,22 @@ +# Theodolite Analysis + +This directory contains Jupyter notebooks for analyzing and visualizing +benchmark execution results and plotting. The following notebooks are provided: + +* [scalability-graph.ipynb](scalability-graph.ipynb): Creates a scalability graph for a certain benchmark execution. +* [scalability-graph-final.ipynb](scalability-graph-final.ipynb): Combines the scalability graphs of multiple benchmarks executions (e.g. for comparing different configuration). +* [lag-trend-graph.ipynb](lag-trend-graph.ipynb): Visualizes the consumer lag evaluation over time along with the computed trend. + +## Usage + +For executing benchmarks and analyzing their results, a **Python 3.7** +installation is required (e.g., in a virtual environment). Our notebooks require some +Python libraries, which can be installed via: + +```sh +pip install -r requirements.txt +``` + +We have tested these +notebooks with [Visual Studio Code](https://code.visualstudio.com/docs/python/jupyter-support), +however, every other server should be fine as well. diff --git a/execution/lag-trend-graph.ipynb b/analysis/lag-trend-graph.ipynb similarity index 91% rename from execution/lag-trend-graph.ipynb rename to analysis/lag-trend-graph.ipynb index 71cd54ceefbcce4548e118a9dd0ab484df52a207..4e574ceb6a6273a7299bb50d9e81598002c330f5 100644 --- a/execution/lag-trend-graph.ipynb +++ b/analysis/lag-trend-graph.ipynb @@ -20,8 +20,9 @@ "metadata": {}, "outputs": [], "source": [ - "directory = ''\n", - "filename = 'xxx_totallag.csv'\n", + "directory = '<path-to>/results'\n", + "#filename = 'exp1002_uc3_75000_1_totallag.csv'\n", + "filename = 'exp1002_uc3_50000_2_totallag.csv'\n", "warmup_sec = 60\n", "threshold = 2000 #slope" ] @@ -105,20 +106,6 @@ "\n", "plt.savefig(\"plot.pdf\", bbox_inches='tight')\n" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { @@ -138,7 +125,7 @@ "pygments_lexer": "ipython3", "version": 3, "kernelspec": { - "name": "python37064bitvenvvenv469ea2e0a7854dc7b367eee45386afee", + "name": "python37064bitvenvvenv21b61136d7f443749f2918b47e00d223", "display_name": "Python 3.7.0 64-bit ('.venv': venv)" } }, diff --git a/analysis/requirements.txt b/analysis/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..c97a862620dfc9cd9602fe02e420752b077c6c0a --- /dev/null +++ b/analysis/requirements.txt @@ -0,0 +1,4 @@ +jupyter==1.0.0 +matplotlib==3.2.0 +pandas==1.0.1 +scikit-learn==0.22.2.post1 \ No newline at end of file diff --git a/execution/scalability-graph-finish.ipynb b/analysis/scalability-graph-finish.ipynb similarity index 98% rename from execution/scalability-graph-finish.ipynb rename to analysis/scalability-graph-finish.ipynb index ffcf33b6b044a7f5f354b682a5cafc3c3f42e2f0..8cadff0daee03f0ed0c2fa0ac0c7c72b462f340d 100644 --- a/execution/scalability-graph-finish.ipynb +++ b/analysis/scalability-graph-finish.ipynb @@ -18,7 +18,7 @@ "metadata": {}, "outputs": [], "source": [ - "directory = '../results-inst'\n", + "directory = '<path-to>/results-inst'\n", "\n", "experiments = {\n", " 'exp1003': 'exp1003',\n", diff --git a/execution/scalability-graph.ipynb b/analysis/scalability-graph.ipynb similarity index 95% rename from execution/scalability-graph.ipynb rename to analysis/scalability-graph.ipynb index 752c0bebc901e756e18d4b11fc0d8ae02cddcf13..868f950dfea091b8fd6dbc78dc4b7471086c8947 100644 --- a/execution/scalability-graph.ipynb +++ b/analysis/scalability-graph.ipynb @@ -16,7 +16,6 @@ "outputs": [], "source": [ "import os\n", - "import requests\n", "from datetime import datetime, timedelta, timezone\n", "import pandas as pd\n", "from sklearn.linear_model import LinearRegression\n", @@ -38,11 +37,13 @@ "metadata": {}, "outputs": [], "source": [ - "exp_id = 1003\n", + "exp_id = 2012\n", "warmup_sec = 60\n", "warmup_partitions_sec = 120\n", "threshold = 2000 #slope\n", - "directory = '../results'\n" + "#directory = '../results'\n", + "directory = '<path-to>/results'\n", + "directory_out = '<path-to>/results-inst'\n" ] }, { @@ -244,7 +245,7 @@ "metadata": {}, "outputs": [], "source": [ - "min_suitable_instances.to_csv(f'../results-inst/exp{exp_id}_min-suitable-instances.csv', index=False)" + "min_suitable_instances.to_csv(os.path.join(directory_out, f'../results-inst/exp{exp_id}_min-suitable-instances.csv'), index=False)" ] }, { @@ -284,7 +285,7 @@ "pygments_lexer": "ipython3", "version": 3, "kernelspec": { - "name": "python37064bitvenvvenv469ea2e0a7854dc7b367eee45386afee", + "name": "python37064bitvenvvenv6c432ee1239d4f3cb23f871068b0267d", "display_name": "Python 3.7.0 64-bit ('.venv': venv)" } }, diff --git a/application-kafkastreams-commons/.settings/org.eclipse.jdt.ui.prefs b/application-kafkastreams-commons/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..98b5ca8064a352aacfe2aebd13fbd0a87735fc3e --- /dev/null +++ b/application-kafkastreams-commons/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=15 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/application-kafkastreams-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280 --- /dev/null +++ b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/application-kafkastreams-commons/.settings/qa.eclipse.plugin.pmd.prefs b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9 --- /dev/null +++ b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/application-kafkastreams-commons/build.gradle b/application-kafkastreams-commons/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java similarity index 51% rename from uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java rename to application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index 4b7f487c8e848f0b1d6d652b7d86a8c50c202af1..ae2a6dafa3d36dada927d17a1ca00d2df63db78b 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -1,32 +1,55 @@ -package spesb.uc1.streamprocessing; +package theodolite.commons.kafkastreams; import java.util.Objects; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import titan.ccp.common.kafka.streams.PropertiesBuilder; /** * Builder for the Kafka Streams configuration. */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); +public abstract class KafkaStreamsBuilder { + // Kafkastreams application specific + private String applicationName; // NOPMD + private String applicationVersion; // NOPMD private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD private int numThreads = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; + /** + * Sets the application name for the {@code KafkaStreams} application. It is used to create the + * application ID. + * + * @param applicationName Name of the application. + * @return + */ + public KafkaStreamsBuilder applicationName(final String applicationName) { + this.applicationName = applicationName; + return this; + } + + /** + * Sets the application version for the {@code KafkaStreams} application. It is used to create the + * application ID. + * + * @param applicationVersion Version of the application. + * @return + */ + public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { + this.applicationVersion = applicationVersion; return this; } + /** + * Sets the bootstrap servers for the {@code KafkaStreams} application. + * + * @param bootstrapServers String for a bootstrap server. + * @return + */ public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { this.bootstrapServers = bootstrapServers; return this; @@ -35,6 +58,9 @@ public class KafkaStreamsBuilder { /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. + * + * @param numThreads Number of threads. -1 for using the default. + * @return */ public KafkaStreamsBuilder numThreads(final int numThreads) { if (numThreads < -1 || numThreads == 0) { @@ -48,6 +74,10 @@ public class KafkaStreamsBuilder { * Sets the Kafka Streams property for the frequency with which to save the position (offsets in * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for * example, when processing bulks of records. Can be minus one for using the default. + * + * @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using + * the default. + * @return */ public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { if (commitIntervalMs < -1) { @@ -61,6 +91,10 @@ public class KafkaStreamsBuilder { * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for * example, when processing bulks of records. Can be minus one for using the default. + * + * @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all + * threads. -1 for using the default. + * @return */ public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { if (cacheMaxBytesBuffering < -1) { @@ -71,22 +105,38 @@ public class KafkaStreamsBuilder { } /** - * Builds the {@link KafkaStreams} instance. + * Method to implement a {@link Topology} for a {@code KafkaStreams} application. + * + * @return A {@code Topology} for a {@code KafkaStreams} application. */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic); - final Properties properties = PropertiesBuilder + protected abstract Topology buildTopology(); + + /** + * Build the {@link Properties} for a {@code KafkaStreams} application. + * + * @return A {@code Properties} object. + */ + protected Properties buildProperties() { + return PropertiesBuilder .bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter + .applicationId(this.applicationName + '-' + this.applicationVersion) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") .build(); - return new KafkaStreams(topologyBuilder.build(), properties); + } + + /** + * Builds the {@link KafkaStreams} instance. + */ + public KafkaStreams build() { + // Check for required attributes for building properties. + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.applicationName, "Application name has not been set."); + Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); + + // Create the Kafka streams instance. + return new KafkaStreams(this.buildTopology(), this.buildProperties()); } } diff --git a/application-kafkastreams-commons/src/test/java/.gitkeep b/application-kafkastreams-commons/src/test/java/.gitkeep new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/build.gradle b/build.gradle index f0887c989ad703da8eb141385f19d1d098627dd6..0e71087e4014f9882eef3d575f4856fe35b6f808 100644 --- a/build.gradle +++ b/build.gradle @@ -12,9 +12,8 @@ buildscript { // Variables used to distinct different subprojects def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} -def useCaseApplications = subprojects.findAll {it -> it.name.matches('uc[0-9]+-application')} -def useCaseGenerators = subprojects.findAll {it -> it.name.matches('uc[0-9]+-workload-generator*')} -def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*common(s?)(.)*')} +def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} + // Plugins allprojects { @@ -58,8 +57,8 @@ allprojects { } } -// Dependencies for all use case applications -configure(useCaseApplications) { +// Dependencies for all use cases +configure(useCaseProjects) { dependencies { // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } // This branch depends still on old version of titan-ccp-common @@ -84,11 +83,7 @@ configure(useCaseGenerators) { implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation project(':workload-generator-common') - - // maintain build of generators - implementation 'net.kieker-monitoring:kieker:1.14-SNAPSHOT' - implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } + implementation project(':application-kafkastreams-commons') // Use JUnit test framework testImplementation 'junit:junit:4.12' @@ -100,11 +95,10 @@ configure(commonProjects) { dependencies { // These dependencies is exported to consumers, that is to say found on their compile classpath. api 'org.apache.kafka:kafka-clients:2.4.0' - + // These dependencies are used internally, and not exposed to consumers on their own compile classpath. implementation 'org.slf4j:slf4j-simple:1.6.1' - implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } // This branch depends still on old version of titan-ccp-common - implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/execution/.gitignore b/execution/.gitignore index aab7f5dde3e52f51a5a788f9a83f44b2afeb123b..d4dceff0274cd6ab3296e85e995f7e5d504f114d 100644 --- a/execution/.gitignore +++ b/execution/.gitignore @@ -1,2 +1 @@ -exp* exp_counter.txt \ No newline at end of file diff --git a/execution/README.md b/execution/README.md index f7df1a32ad8b7898ba34b792095957a8e1362fc8..89a851a9c8bafd29a4232f142e7b0da8b88c1132 100644 --- a/execution/README.md +++ b/execution/README.md @@ -1,13 +1,20 @@ -# Requirements +# Theodolite Execution Framework +This directory contains the Theodolite framework for executing scalability +benchmarks in a Kubernetes cluster. As Theodolite aims for executing benchmarks +in realistic execution environments,, some third-party components are [required](#requirements). +After everything is installed and configured, you can move on the [execution of +benchmarks](#execution). -## Kubernetes Cluster +## Requirements + +### Kubernetes Cluster For executing benchmarks, access to Kubernetes cluster is required. We suggest to create a dedicated namespace for executing our benchmarks. The following services need to be available as well. -### Prometheus +#### Prometheus We suggest to use the [Prometheus Operator](https://github.com/coreos/prometheus-operator) and create a dedicated Prometheus instance for these benchmarks. @@ -34,7 +41,7 @@ depending on your cluster's security policies. For the individual benchmarking components to be monitored, [ServiceMonitors](https://github.com/coreos/prometheus-operator#customresourcedefinitions) are used. See the corresponding sections below for how to install them. -### Grafana +#### Grafana As with Prometheus, we suggest to create a dedicated Grafana instance. Grafana with our default configuration can be installed with Helm: @@ -60,7 +67,7 @@ Create the Configmap for the data source: kubectl apply -f infrastructure/grafana/prometheus-datasource-config-map.yaml ``` -### A Kafka cluster +#### A Kafka cluster One possible way to set up a Kafka cluster is via [Confluent's Helm Charts](https://github.com/confluentinc/cp-helm-charts). For using these Helm charts and conjuction with the Prometheus Operator (see @@ -68,7 +75,7 @@ below), we provide a [patch](https://github.com/SoerenHenning/cp-helm-charts) for these helm charts. Note that this patch is only required for observation and not for the actual benchmark execution and evaluation. -#### Our patched Confluent Helm Charts +##### Our patched Confluent Helm Charts To use our patched Confluent Helm Charts clone the [chart's repsoitory](https://github.com/SoerenHenning/cp-helm-charts). We also @@ -86,42 +93,42 @@ To let Prometheus scrape Kafka metrics, deploy a ServiceMonitor: kubectl apply -f infrastructure/kafka/service-monitor.yaml ``` -#### Other options for Kafka +##### Other options for Kafka -Other Kafka deployments, for example, using Strimzi, should work in similiar way. +Other Kafka deployments, for example, using Strimzi, should work in a similar way. -### The Kafka Lag Exporter +#### A Kafka Client Pod -[Lightbend's Kafka Lag Exporter](https://github.com/lightbend/kafka-lag-exporter) -can be installed via Helm. We also provide a [default configuration](infrastructure/kafka-lag-exporter/values.yaml). -To install it: +A permanently running pod used for Kafka configuration is started via: ```sh -helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.0/kafka-lag-exporter-0.6.0.tgz -f infrastructure/kafka-lag-exporter/values.yaml +kubectl apply -f infrastructure/kafka/kafka-client.yaml ``` -To let Prometheus scrape Kafka lag metrics, deploy a ServiceMonitor: +#### The Kafka Lag Exporter + +[Lightbend's Kafka Lag Exporter](https://github.com/lightbend/kafka-lag-exporter) +can be installed via Helm. We also provide a [default configuration](infrastructure/kafka-lag-exporter/values.yaml). +To install it: ```sh -kubectl apply -f infrastructure/kafka-lag-exporter/service-monitor.yaml +helm install kafka-lag-exporter https://github.com/lightbend/kafka-lag-exporter/releases/download/v0.6.0/kafka-lag-exporter-0.6.0.tgz -f infrastructure/kafka-lag-exporter/values.yaml ``` -## Python 3.7 - -For executing benchmarks and analyzing their results, a **Python 3.7** installation -is required. We suggest to use a virtual environment placed in the `.venv` directory. +### Python 3.7 -As set of requirements is needed for the analysis Jupyter notebooks and the -execution tool. You can install them with the following command (make sure to -be in your virtual environment if you use one): +For executing benchmarks, a **Python 3.7** installation is required. We suggest +to use a virtual environment placed in the `.venv` directory (in the Theodolite +root directory). As set of requirements is needed. You can install them with the following +command (make sure to be in your virtual environment if you use one): ```sh pip install -r requirements.txt ``` -## Required Manual Adjustments +### Required Manual Adjustments Depending on your setup, some additional adjustments may be necessary: @@ -133,7 +140,7 @@ Depending on your setup, some additional adjustments may be necessary: -# Execution +## Execution The `./run_loop.sh` is the entrypoint for all benchmark executions. Is has to be called as follows: diff --git a/execution/execution.sh b/execution/execution.sh deleted file mode 100755 index 0a1ead95049564b9d88f35d40ea622788119e4dc..0000000000000000000000000000000000000000 --- a/execution/execution.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -./run_loop.sh 1 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5" 40 #6*5=3Std -sleep 5m -./run_loop.sh 2 "6 7 8 9" "1 2 3 4 6 8 10 12 14 16 18 20" 40 #4*12=5Std -sleep 5m -./run_loop.sh 3 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5 6" 40 #6*6=3.5Std -sleep 5m -./run_loop.sh 4 "25000 50000 75000 100000 125000 150000" "1 2 4 6 8 10 12 14 16 18 20 30 40 50 60 70 80 90" 40 #6*18=11Std -sleep 5m - -./run_loop.sh 1 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5" 400 #6*5=3Std -sleep 5m -./run_loop.sh 2 "6 7 8 9" "1 2 3 4 6 8 10 12 14 16 18 20" 400 #4*12=5Std -sleep 5m -./run_loop.sh 3 "25000 50000 75000 100000 125000 150000" "1 2 3 4 5 6" 400 #6*6=3.5Std -sleep 5m -./run_loop.sh 4 "25000 50000 75000 100000 125000 150000" "1 2 4 6 8 10 12 14 16 18 20 30 40 50 60 70 80 90" 400 #6*18=11Std -sleep 5m -./run_loop.sh 4 "150000" "100 110 120 130 140 150 160 17 18 190 200" 400 #6*18=11Std -sleep 5m -# For commit interval evaluation -./run_loop.sh 4 "5000 10000 15000 20000 25000 30000" "1 2 3 4 5 6 7 8 9 10 11 12 13 14 15" 160 \ No newline at end of file diff --git a/execution/execution_tmp_200507.sh b/execution/execution_tmp_200507.sh deleted file mode 100644 index 932940ae78dc5e5f0d2362da1047329a22713f51..0000000000000000000000000000000000000000 --- a/execution/execution_tmp_200507.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -#./run_loop.sh 1 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5" 40 #3Std -./run_loop.sh 1 "200000 250000 300000" "1 2 3 4 5" 40 1000m 4Gi 100 5 #1.5Std -sleep 1m -#./run_loop.sh 1 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5" 400 #3Std -./run_loop.sh 1 "200000 250000 300000" "1 2 3 4 5" 400 1000m 4Gi 100 5 #1.5Std -sleep 1m - -#./run_loop.sh 3 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 40 #6 Std -./run_loop.sh 3 "200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 40 1000m 4Gi 100 5 #3 Std -sleep 1m -#./run_loop.sh 3 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 400 #6 Std -./run_loop.sh 3 "200000 250000 300000" "1 2 3 4 5 6 7 8 9 10" 400 1000m 4Gi 100 5 #3 Std -sleep 1m - -./run_loop.sh 1 "50000 100000 150000 200000 250000 300000" "1 2 3 4 5" 40 500m 2Gi 100 5 #3Std diff --git a/execution/exp_counter.txt b/execution/exp_counter.txt deleted file mode 100644 index 56a6051ca2b02b04ef92d5150c9ef600403cb1de..0000000000000000000000000000000000000000 --- a/execution/exp_counter.txt +++ /dev/null @@ -1 +0,0 @@ -1 \ No newline at end of file diff --git a/execution/experiments.txt b/execution/experiments.txt deleted file mode 100644 index 5ef8210943cb4404777032c7978f2ee3fb6bca56..0000000000000000000000000000000000000000 --- a/execution/experiments.txt +++ /dev/null @@ -1,2 +0,0 @@ -# Test Partition count of 100 -./run_loop.sh 1 "10000 50000 100000 200000" "1, 2, 4, 8, 12, 16, 20" 100 \ No newline at end of file diff --git a/execution/infrastructure/kafka-lag-exporter/install.sh b/execution/infrastructure/kafka-lag-exporter/install.sh old mode 100644 new mode 100755 diff --git a/execution/infrastructure/kafka-lag-exporter/service-monitor.yaml b/execution/infrastructure/kafka-lag-exporter/service-monitor.yaml deleted file mode 100644 index 141dd96f9bb3973bb0f22a4aa04c29768e0a1376..0000000000000000000000000000000000000000 --- a/execution/infrastructure/kafka-lag-exporter/service-monitor.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: monitoring.coreos.com/v1 -kind: ServiceMonitor -metadata: - labels: - app: kafka-lag-exporter - appScope: titan-ccp - name: kafka-lag-exporter -spec: - selector: - matchLabels: - #app: cp-kafka - jobLabel: kafka-lag-exporter - endpoints: - - port: http - interval: 5s \ No newline at end of file diff --git a/execution/infrastructure/kafka-lag-exporter/values.yaml b/execution/infrastructure/kafka-lag-exporter/values.yaml index 8e31cc9fdf31f0a5d3b4542c3a227e4de212f6b2..b83a911283a7e8264f982f9eb5d550ad5497ec9d 100644 --- a/execution/infrastructure/kafka-lag-exporter/values.yaml +++ b/execution/infrastructure/kafka-lag-exporter/values.yaml @@ -7,8 +7,10 @@ pollIntervalSeconds: 15 prometheus: serviceMonitor: - enabled: false - interval: "30s" + enabled: true + interval: "5s" + additionalLabels: + appScope: titan-ccp # service monitor label selectors: https://github.com/helm/charts/blob/f5a751f174263971fafd21eee4e35416d6612a3d/stable/prometheus-operator/templates/prometheus/prometheus.yaml#L74 # additionalLabels: - # prometheus: k8s + # prometheus: k8s \ No newline at end of file diff --git a/execution/infrastructure/kafka/kafka-client.yaml b/execution/infrastructure/kafka/kafka-client.yaml new file mode 100644 index 0000000000000000000000000000000000000000..4c7d3ed239faed62022c110e92b264b338a8c9a4 --- /dev/null +++ b/execution/infrastructure/kafka/kafka-client.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Pod +metadata: + name: kafka-client-2 +spec: + containers: + - name: kafka-client + image: confluentinc/cp-enterprise-kafka:5.4.0 + command: + - sh + - -c + - "exec tail -f /dev/null" \ No newline at end of file diff --git a/execution/lag_analysis.py b/execution/lag_analysis.py index c529853fd423babf0783331b02810f4e892af357..23e3d5f6c9552814f5301cd81e517f49d044cd33 100644 --- a/execution/lag_analysis.py +++ b/execution/lag_analysis.py @@ -11,7 +11,7 @@ exp_id = sys.argv[1] benchmark = sys.argv[2] dim_value = sys.argv[3] instances = sys.argv[4] -execution_minutes = 5 +execution_minutes = int(sys.argv[5]) time_diff_ms = int(os.getenv('CLOCK_DIFF_MS', 0)) #http://localhost:9090/api/v1/query_range?query=sum%20by(job,topic)(kafka_consumer_consumer_fetch_manager_metrics_records_lag)&start=2015-07-01T20:10:30.781Z&end=2020-07-01T20:11:00.781Z&step=15s diff --git a/execution/requirements.txt b/execution/requirements.txt index 17f29b0b16a3f130399612c7bffd3ce12896c946..7224efe80aa1686bb3de90b2beac5df47a56ed8f 100644 --- a/execution/requirements.txt +++ b/execution/requirements.txt @@ -1,62 +1,4 @@ -attrs==19.3.0 -backcall==0.1.0 -bleach==3.1.1 -certifi==2019.11.28 -chardet==3.0.4 -cycler==0.10.0 -decorator==4.4.2 -defusedxml==0.6.0 -entrypoints==0.3 -idna==2.9 -importlib-metadata==1.5.0 -ipykernel==5.1.4 -ipython==7.13.0 -ipython-genutils==0.2.0 -ipywidgets==7.5.1 -jedi==0.16.0 -Jinja2==2.11.1 -joblib==0.14.1 -jsonschema==3.2.0 -jupyter==1.0.0 -jupyter-client==6.0.0 -jupyter-console==6.1.0 -jupyter-core==4.6.3 -kiwisolver==1.1.0 -MarkupSafe==1.1.1 matplotlib==3.2.0 -mistune==0.8.4 -nbconvert==5.6.1 -nbformat==5.0.4 -notebook==6.0.3 -numpy==1.18.1 pandas==1.0.1 -pandocfilters==1.4.2 -parso==0.6.2 -pexpect==4.8.0 -pickleshare==0.7.5 -prometheus-client==0.7.1 -prompt-toolkit==3.0.4 -ptyprocess==0.6.0 -Pygments==2.6.1 -pyparsing==2.4.6 -pyrsistent==0.15.7 -python-dateutil==2.8.1 -pytz==2019.3 -pyzmq==19.0.0 -qtconsole==4.7.1 -QtPy==1.9.0 requests==2.23.0 -scikit-learn==0.22.2.post1 -scipy==1.4.1 -Send2Trash==1.5.0 -six==1.14.0 -sklearn==0.0 -terminado==0.8.3 -testpath==0.4.4 -tornado==6.0.4 -traitlets==4.3.3 -urllib3==1.25.8 -wcwidth==0.1.8 -webencodings==0.5.1 -widgetsnbextension==3.5.1 -zipp==3.1.0 +scikit-learn==0.22.2.post1 \ No newline at end of file diff --git a/execution/run_loop.sh b/execution/run_loop.sh index e63c0ecdfc54d27456afd720cc66303bfb143b28..b139ad6ff3e1950baa3d7f4579f574f7231ecb5f 100755 --- a/execution/run_loop.sh +++ b/execution/run_loop.sh @@ -10,7 +10,7 @@ KAFKA_STREAMS_COMMIT_INTERVAL_MS=${7:-100} EXECUTION_MINUTES=${8:-5} # Get and increment counter -EXP_ID=$(cat exp_counter.txt) +EXP_ID=$(cat exp_counter.txt 2>/dev/null || echo "0") echo $((EXP_ID+1)) > exp_counter.txt # Store meta information diff --git a/execution/run_uc1-new.sh b/execution/run_uc1-new.sh index f3beccdf6a15b2155390f2ed7a31430e15fcc798..15bef5f13905876f64644bfbdb97930536255a73 100755 --- a/execution/run_uc1-new.sh +++ b/execution/run_uc1-new.sh @@ -34,7 +34,11 @@ echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f - # Start application REPLICAS=$INSTANCES -#kubectl apply -f uc3-application/aggregation-deployment.yaml +# When not using `sed` anymore, use `kubectl apply -f uc1-application` +kubectl apply -f uc1-application/aggregation-service.yaml +kubectl apply -f uc1-application/jmx-configmap.yaml +kubectl apply -f uc1-application/service-monitor.yaml +#kubectl apply -f uc1-application/aggregation-deployment.yaml APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc1-application/aggregation-deployment.yaml) echo "$APPLICATION_YAML" | kubectl apply -f - kubectl scale deployment titan-ccp-aggregation --replicas=$REPLICAS @@ -44,7 +48,7 @@ sleep ${EXECUTION_MINUTES}m # Run eval script source ../.venv/bin/activate -python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES +python lag_analysis.py $EXP_ID uc1 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES deactivate # Stop wl and app @@ -52,6 +56,9 @@ deactivate #sed "s/{{INSTANCES}}/1/g" uc1-workload-generator/deployment.yaml | kubectl delete -f - #sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g; s/{{INSTANCES}}/$WL_INSTANCES/g" uc1-workload-generator/deployment.yaml | kubectl delete -f - echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f - +kubectl delete -f uc1-application/aggregation-service.yaml +kubectl delete -f uc1-application/jmx-configmap.yaml +kubectl delete -f uc1-application/service-monitor.yaml #kubectl delete -f uc1-application/aggregation-deployment.yaml echo "$APPLICATION_YAML" | kubectl delete -f - diff --git a/execution/run_uc2-new.sh b/execution/run_uc2-new.sh index d913b135ecad59ae22d480645125a502d8b2769c..1ec781ace658457636d67ad1ad00fb29e3260f46 100755 --- a/execution/run_uc2-new.sh +++ b/execution/run_uc2-new.sh @@ -30,6 +30,10 @@ sed "s/{{NUM_NESTED_GROUPS}}/$NUM_NESTED_GROUPS/g" uc2-workload-generator/deploy # Start application REPLICAS=$INSTANCES +# When not using `sed` anymore, use `kubectl apply -f uc2-application` +kubectl apply -f uc2-application/aggregation-service.yaml +kubectl apply -f uc2-application/jmx-configmap.yaml +kubectl apply -f uc2-application/service-monitor.yaml #kubectl apply -f uc2-application/aggregation-deployment.yaml APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc2-application/aggregation-deployment.yaml) echo "$APPLICATION_YAML" | kubectl apply -f - @@ -40,11 +44,14 @@ sleep ${EXECUTION_MINUTES}m # Run eval script source ../.venv/bin/activate -python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES +python lag_analysis.py $EXP_ID uc2 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES deactivate # Stop wl and app kubectl delete -f uc2-workload-generator/deployment.yaml +kubectl delete -f uc2-application/aggregation-service.yaml +kubectl delete -f uc2-application/jmx-configmap.yaml +kubectl delete -f uc2-application/service-monitor.yaml #kubectl delete -f uc2-application/aggregation-deployment.yaml echo "$APPLICATION_YAML" | kubectl delete -f - diff --git a/execution/run_uc3-new.sh b/execution/run_uc3-new.sh index 725b1ab955f3bae33748bd451603a0307d34746e..2295d615332077361e3ad5fd29bdb1e5d2c47893 100755 --- a/execution/run_uc3-new.sh +++ b/execution/run_uc3-new.sh @@ -34,6 +34,10 @@ echo "$WORKLOAD_GENERATOR_YAML" | kubectl apply -f - # Start application REPLICAS=$INSTANCES +# When not using `sed` anymore, use `kubectl apply -f uc3-application` +kubectl apply -f uc3-application/aggregation-service.yaml +kubectl apply -f uc3-application/jmx-configmap.yaml +kubectl apply -f uc3-application/service-monitor.yaml #kubectl apply -f uc3-application/aggregation-deployment.yaml APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc3-application/aggregation-deployment.yaml) echo "$APPLICATION_YAML" | kubectl apply -f - @@ -44,14 +48,17 @@ sleep ${EXECUTION_MINUTES}m # Run eval script source ../.venv/bin/activate -python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES +python lag_analysis.py $EXP_ID uc3 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES deactivate # Stop wl and app #kubectl delete -f uc3-workload-generator/deployment.yaml #sed "s/{{INSTANCES}}/1/g" uc3-workload-generator/deployment.yaml | kubectl delete -f - echo "$WORKLOAD_GENERATOR_YAML" | kubectl delete -f - -#kubectl delete -f uc1-application/aggregation-deployment.yaml +kubectl delete -f uc3-application/aggregation-service.yaml +kubectl delete -f uc3-application/jmx-configmap.yaml +kubectl delete -f uc3-application/service-monitor.yaml +#kubectl delete -f uc3-application/aggregation-deployment.yaml #sed "s/{{CPU_LIMIT}}/1000m/g; s/{{MEMORY_LIMIT}}/4Gi/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/100/g" uc3-application/aggregation-deployment.yaml | kubectl delete -f - echo "$APPLICATION_YAML" | kubectl delete -f - diff --git a/execution/run_uc4-new.sh b/execution/run_uc4-new.sh index c9fbfdae53d2f9c0336de24c857f02520c8455a4..cec4fd9c45e1fd044e093f6fa0786b55b16d3065 100755 --- a/execution/run_uc4-new.sh +++ b/execution/run_uc4-new.sh @@ -32,6 +32,10 @@ sed "s/{{NUM_SENSORS}}/$NUM_SENSORS/g" uc4-workload-generator/deployment.yaml | # Start application REPLICAS=$INSTANCES #AGGREGATION_DURATION_DAYS=$DIM_VALUE +# When not using `sed` anymore, use `kubectl apply -f uc4-application` +kubectl apply -f uc4-application/aggregation-service.yaml +kubectl apply -f uc4-application/jmx-configmap.yaml +kubectl apply -f uc4-application/service-monitor.yaml #kubectl apply -f uc4-application/aggregation-deployment.yaml #sed "s/{{AGGREGATION_DURATION_DAYS}}/$AGGREGATION_DURATION_DAYS/g" uc4-application/aggregation-deployment.yaml | kubectl apply -f - APPLICATION_YAML=$(sed "s/{{CPU_LIMIT}}/$CPU_LIMIT/g; s/{{MEMORY_LIMIT}}/$MEMORY_LIMIT/g; s/{{KAFKA_STREAMS_COMMIT_INTERVAL_MS}}/$KAFKA_STREAMS_COMMIT_INTERVAL_MS/g" uc4-application/aggregation-deployment.yaml) @@ -43,11 +47,14 @@ sleep ${EXECUTION_MINUTES}m # Run eval script source ../.venv/bin/activate -python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES +python lag_analysis.py $EXP_ID uc4 $DIM_VALUE $INSTANCES $EXECUTION_MINUTES deactivate # Stop wl and app kubectl delete -f uc4-workload-generator/deployment.yaml +kubectl delete -f uc4-application/aggregation-service.yaml +kubectl delete -f uc4-application/jmx-configmap.yaml +kubectl delete -f uc4-application/service-monitor.yaml #kubectl delete -f uc4-application/aggregation-deployment.yaml echo "$APPLICATION_YAML" | kubectl delete -f - diff --git a/execution/uc1-workload-generator/jmx-configmap.yaml b/execution/uc1-workload-generator/jmx-configmap.yaml deleted file mode 100644 index ea16037d74978a9273936c26eb06420983dd3139..0000000000000000000000000000000000000000 --- a/execution/uc1-workload-generator/jmx-configmap.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: load-generator-jmx-configmap -data: - jmx-kafka-prometheus.yml: |+ - jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi - lowercaseOutputName: true - lowercaseOutputLabelNames: true - ssl: false diff --git a/execution/uc2-workload-generator/jmx-configmap.yaml b/execution/uc2-workload-generator/jmx-configmap.yaml deleted file mode 100644 index ea16037d74978a9273936c26eb06420983dd3139..0000000000000000000000000000000000000000 --- a/execution/uc2-workload-generator/jmx-configmap.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: load-generator-jmx-configmap -data: - jmx-kafka-prometheus.yml: |+ - jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi - lowercaseOutputName: true - lowercaseOutputLabelNames: true - ssl: false diff --git a/execution/uc3-workload-generator/jmx-configmap.yaml b/execution/uc3-workload-generator/jmx-configmap.yaml deleted file mode 100644 index ea16037d74978a9273936c26eb06420983dd3139..0000000000000000000000000000000000000000 --- a/execution/uc3-workload-generator/jmx-configmap.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: load-generator-jmx-configmap -data: - jmx-kafka-prometheus.yml: |+ - jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi - lowercaseOutputName: true - lowercaseOutputLabelNames: true - ssl: false diff --git a/execution/uc4-workload-generator/jmx-configmap.yaml b/execution/uc4-workload-generator/jmx-configmap.yaml deleted file mode 100644 index ea16037d74978a9273936c26eb06420983dd3139..0000000000000000000000000000000000000000 --- a/execution/uc4-workload-generator/jmx-configmap.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: load-generator-jmx-configmap -data: - jmx-kafka-prometheus.yml: |+ - jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:5555/jmxrmi - lowercaseOutputName: true - lowercaseOutputLabelNames: true - ssl: false diff --git a/settings.gradle b/settings.gradle index 4a8dfe3a6f7372f3fef319f94afef877e5fe74d8..6b2922feee6fe727fe0b12ed3cd6d1ff6aa48048 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,6 +1,7 @@ rootProject.name = 'scalability-benchmarking' -include 'workload-generator-common' +include 'workload-generator-commons' +include 'application-kafkastreams-commons' include 'uc1-workload-generator' include 'uc1-application' diff --git a/uc1-application/build.gradle b/uc1-application/build.gradle index ec18bbebfae085ea227cd94dd19ed5fe06cfc80d..3b197e85116f41dde5574d9253d60e1146fe44a2 100644 --- a/uc1-application/build.gradle +++ b/uc1-application/build.gradle @@ -1 +1 @@ -mainClassName = "spesb.uc1.application.HistoryService" +mainClassName = "theodolite.uc1.application.HistoryService" diff --git a/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java new file mode 100644 index 0000000000000000000000000000000000000000..ee4113c3088629fe01988721e32d9704f5d30da5 --- /dev/null +++ b/uc1-application/src/main/java/theodolite/uc1/application/ConfigurationKeys.java @@ -0,0 +1,25 @@ +package theodolite.uc1.application; + +/** + * Keys to access configuration parameters. + */ +public final class ConfigurationKeys { + + public static final String APPLICATION_NAME = "application.name"; + + public static final String APPLICATION_VERSION = "application.version"; + + public static final String NUM_THREADS = "num.threads"; + + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + + private ConfigurationKeys() {} + +} diff --git a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java similarity index 70% rename from uc1-application/src/main/java/spesb/uc1/application/HistoryService.java rename to uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index 18a39da7229d961249be900eeeff679e267a1eef..b551fb7f8ff74f5ddc7e3aad901c1412075c6da6 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -1,9 +1,9 @@ -package spesb.uc1.application; +package theodolite.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc1.streamprocessing.KafkaStreamsBuilder; +import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,14 +30,20 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(); + uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)); + + final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder + .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) + .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); } diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java similarity index 93% rename from uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java rename to uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java index 279b70d0b7311f2b45b986e54cdf5b6c81c28263..824a8dadd4d80dd29d09b21543fa6da6aedf5365 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/TopologyBuilder.java @@ -1,4 +1,4 @@ -package spesb.uc1.streamprocessing; +package theodolite.uc1.streamprocessing; import com.google.gson.Gson; import org.apache.kafka.common.serialization.Serdes; @@ -18,8 +18,7 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; - private final Gson gson; - + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); /** @@ -27,14 +26,12 @@ public class TopologyBuilder { */ public TopologyBuilder(final String inputTopic) { this.inputTopic = inputTopic; - this.gson = new Gson(); } /** * Build the {@link Topology} for the History microservice. */ public Topology build() { - this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..4af3f130373d0596232921b9c5cc0b48df573b72 --- /dev/null +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -0,0 +1,23 @@ +package theodolite.uc1.streamprocessing; + +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import theodolite.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { + private String inputTopic; // NOPMD + + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + return new TopologyBuilder(this.inputTopic).build(); + } +} diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index ef279332f911108fa8ca42d840d4a147460e8e35..8f029be66f9decadc87c8e88f58698d1422d596d 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -1,6 +1,10 @@ +application.name="uc1-application" +application.version="0.0.1" + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc1-workload-generator/build.gradle b/uc1-workload-generator/build.gradle index d934bd09de1d64cadac982669d7cab5b564f0dd5..9cc0bdbf01032efa3b251db06a2837cc9b920675 100644 --- a/uc1-workload-generator/build.gradle +++ b/uc1-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "spesb.uc1.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc1.workloadgenerator.LoadGenerator" diff --git a/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java similarity index 97% rename from uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java rename to uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java index 664ce366ddfb84b9a3f4748ab86fbc7a62c93a43..27b0f7c0d8bcdd450de3b38b22939f1475bccc35 100644 --- a/uc1-workload-generator/src/main/java/spesb/uc1/workloadgenerator/LoadGenerator.java +++ b/uc1-workload-generator/src/main/java/theodolite/uc1/workloadgenerator/LoadGenerator.java @@ -1,4 +1,4 @@ -package spesb.uc1.workloadgenerator; +package theodolite.uc1.workloadgenerator; import common.dimensions.Duration; import common.dimensions.KeySpace; @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import theodolite.kafkasender.KafkaRecordSender; import titan.ccp.models.records.ActivePowerRecord; /** diff --git a/uc2-application/build.gradle b/uc2-application/build.gradle index 90f54fc6110ac88ef7d0d80ae8ec60c6087ce808..ea3d8779a0cd5406808df190d623d1508a143b9d 100644 --- a/uc2-application/build.gradle +++ b/uc2-application/build.gradle @@ -1 +1 @@ -mainClassName = "spesb.uc2.application.AggregationService" +mainClassName = "theodolite.uc2.application.AggregationService" diff --git a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java deleted file mode 100644 index 79d8c94c75ede32d92485d4b3c49d716ae19ccf8..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java +++ /dev/null @@ -1,59 +0,0 @@ -package spesb.uc2.application; - -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import org.apache.commons.configuration2.Configuration; -import org.apache.kafka.streams.KafkaStreams; -import spesb.uc2.streamprocessing.KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; - -/** - * A microservice that manages the history and, therefore, stores and aggregates - * incoming measurements. - * - */ -public class AggregationService { - - private final Configuration config = Configurations.create(); - - private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - - /** - * Start the service. - */ - public void run() { - this.createKafkaStreamsApplication(); - } - - public static void main(final String[] args) { - new AggregationService().run(); - } - - /** - * Build and start the underlying Kafka Streams Application of the service. - * - * @param clusterSession the database session which the application should use. - */ - private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) - .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); - this.stopEvent.thenRun(kafkaStreams::close); - kafkaStreams.start(); - } - - /** - * Stop the service. - */ - public void stop() { - this.stopEvent.complete(null); - } - -} diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9b43f5e66fb4336602c026df8941d5545f39bfb4..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,134 +0,0 @@ -package spesb.uc2.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { // NOPMD builder method - - private static final String APPLICATION_NAME = "titan-ccp-aggregation"; - private static final String APPLICATION_VERSION = "0.0.1"; - - private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); - private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private String configurationTopic; // NOPMD - private Duration windowSize = null; // NOPMD - private Duration gracePeriod = null; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuffering = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder configurationTopic(final String configurationTopic) { - this.configurationTopic = configurationTopic; - return this; - } - - public KafkaStreamsBuilder windowSize(final Duration windowSize) { - this.windowSize = Objects.requireNonNull(windowSize); - return this; - } - - public KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { - this.gracePeriod = Objects.requireNonNull(gracePeriod); - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuffering = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.configurationTopic, - this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); - return new KafkaStreams(topologyBuilder.build(), this.buildProperties()); - } - - private Properties buildProperties() { - final Properties properties = new Properties(); - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, - APPLICATION_NAME + '-' + APPLICATION_VERSION); // TODO as parameter - if (this.numThreads > 0) { - properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads); - } - if (this.commitIntervalMs >= 0) { - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs); - } - if (this.cacheMaxBytesBuffering >= 0) { - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuffering); - } - return properties; - } - -} diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java new file mode 100644 index 0000000000000000000000000000000000000000..2f37bf757aaa4d745a7f3a6416b359da73e2babc --- /dev/null +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -0,0 +1,66 @@ +package theodolite.uc2.application; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.configuration2.Configuration; +import org.apache.kafka.streams.KafkaStreams; +import theodolite.uc2.streamprocessing.Uc2KafkaStreamsBuilder; +import titan.ccp.common.configuration.Configurations; + +/** + * A microservice that manages the history and, therefore, stores and aggregates incoming + * measurements. + * + */ +public class AggregationService { + + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + public static void main(final String[] args) { + new AggregationService().run(); + } + + /** + * Build and start the underlying Kafka Streams Application of the service. + * + * @param clusterSession the database session which the application should use. + */ + private void createKafkaStreamsApplication() { + // Use case specific stream configuration + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); + uc2KafkaStreamsBuilder + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) + .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .build(); + + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + /** + * Stop the service. + */ + public void stop() { + this.stopEvent.complete(null); + } + +} diff --git a/uc2-application/src/main/java/spesb/uc2/application/ConfigurationKeys.java b/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java similarity index 95% rename from uc2-application/src/main/java/spesb/uc2/application/ConfigurationKeys.java rename to uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java index ec3bb14be72a1032fa2dfd49cdd6d3c0cb0b18e6..b57f5c38e79f04098bb6fc2a8c861c4655daa8a6 100644 --- a/uc2-application/src/main/java/spesb/uc2/application/ConfigurationKeys.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/ConfigurationKeys.java @@ -1,4 +1,4 @@ -package spesb.uc2.application; +package theodolite.uc2.application; /** * Keys to access configuration parameters. diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/ChildParentsTransformer.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java similarity index 99% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/ChildParentsTransformer.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java index 82217b30a539a9f722c3f27777000fb1d7d6e97c..d4f9097ad0fa176842872e43f2f69a8616a65166 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/ChildParentsTransformer.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformer.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/ChildParentsTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerFactory.java similarity index 97% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/ChildParentsTransformerFactory.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerFactory.java index 6cf2d2c6f3facc96f76148e874244cbe895a8596..3060fdaaf2605766df93b767e50e426c5ebafae9 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/ChildParentsTransformerFactory.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ChildParentsTransformerFactory.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/JointFlatTransformer.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java similarity index 98% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/JointFlatTransformer.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java index 27857fa4505d679f841d2bae639506cb8eeb0845..0555df96c153065ecf9be2bf2ead10de60d55cbf 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/JointFlatTransformer.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformer.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import com.google.common.base.MoreObjects; import java.util.ArrayList; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/JointFlatTransformerFactory.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java similarity index 97% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/JointFlatTransformerFactory.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java index 44c99b1f50475c3bd1051322001877e98bca9b68..b78eec51e1cd9e717f79b075e5e27230af56dbe7 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/JointFlatTransformerFactory.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointFlatTransformerFactory.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.Map; import java.util.Set; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/JointRecordParents.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java similarity index 93% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/JointRecordParents.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java index 64de26d996d4b87b3942491a36607a2b09bf43f0..02b7318587a77228e7fb2f7dc1b3350bac532c89 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/JointRecordParents.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/JointRecordParents.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.Set; import titan.ccp.models.records.ActivePowerRecord; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/OptionalParentsSerde.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java similarity index 97% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/OptionalParentsSerde.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java index 5e31a55406a321d393098633856d9b2776768676..5cb8f1ed8fcc1cecff1eefa4922531555a78c25f 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/OptionalParentsSerde.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/OptionalParentsSerde.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.HashSet; import java.util.Optional; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/ParentsSerde.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java similarity index 96% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/ParentsSerde.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java index 4385d3bfb9360755fbfa13217abcb95f786ebd39..266eaad015979a9e4ae748f7647ddcaf5947c78b 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/ParentsSerde.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/ParentsSerde.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.HashSet; import java.util.Set; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/RecordAggregator.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java similarity index 97% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/RecordAggregator.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java index 6951d49c94c8b14d4463fcfdd6274a0b1cf965f7..10fb98c9c575bde508a7e24c9e825b25475eff76 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/RecordAggregator.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/RecordAggregator.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import org.apache.kafka.streams.kstream.Windowed; import titan.ccp.models.records.ActivePowerRecord; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/SensorParentKey.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java similarity index 94% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/SensorParentKey.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java index 390ecf0e381435197cf7e741a0e306f3dcca3f2c..d65c93034a0fc9a801cf5be0c2f7f50e38d9178e 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/SensorParentKey.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKey.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; /** * A key consisting of the identifier of a sensor and an identifier of parent sensor. diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/SensorParentKeySerde.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java similarity index 95% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/SensorParentKeySerde.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java index 7021c0832db2af6836f53ee6ba70851514443759..d6773c6159f1d04ddf1c3f36fd25447575befce8 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/SensorParentKeySerde.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/SensorParentKeySerde.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java similarity index 99% rename from uc2-application/src/main/java/spesb/uc2/streamprocessing/TopologyBuilder.java rename to uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java index c83de4efd43688a8b9669f5d0f3dea3bbf70f48b..b6c46fa3a1822cbf1a11e3a8399aa7a061283952 100644 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/TopologyBuilder.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import com.google.common.math.StatsAccumulator; import java.time.Duration; diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..ce7d5e90b476a9d8b8508ea2356f4a2da1d856f3 --- /dev/null +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -0,0 +1,63 @@ +package theodolite.uc2.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import theodolite.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method + + private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); + private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private String configurationTopic; // NOPMD + private Duration windowSize; // NOPMD + private Duration gracePeriod; // NOPMD + + public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) { + this.configurationTopic = configurationTopic; + return this; + } + + public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) { + this.windowSize = Objects.requireNonNull(windowSize); + return this; + } + + public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { + this.gracePeriod = Objects.requireNonNull(gracePeriod); + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.configurationTopic, + this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); + + return topologyBuilder.build(); + } + +} diff --git a/uc2-application/src/test/java/spesb/uc2/streamprocessing/OptionalParentsSerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java similarity index 88% rename from uc2-application/src/test/java/spesb/uc2/streamprocessing/OptionalParentsSerdeTest.java rename to uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java index dc9f7e20e60564df7b982d8c3635cf2678c829c4..49ed674bc4442f01de1cf51e4510f2079524933d 100644 --- a/uc2-application/src/test/java/spesb/uc2/streamprocessing/OptionalParentsSerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/OptionalParentsSerdeTest.java @@ -1,9 +1,9 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.Optional; import java.util.Set; import org.junit.Test; -import spesb.uc2.streamprocessing.OptionalParentsSerde; +import theodolite.uc2.streamprocessing.OptionalParentsSerde; public class OptionalParentsSerdeTest { diff --git a/uc2-application/src/test/java/spesb/uc2/streamprocessing/ParentsSerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java similarity index 81% rename from uc2-application/src/test/java/spesb/uc2/streamprocessing/ParentsSerdeTest.java rename to uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java index 7f166669bc34ea6d5482504f2d5ada4c26f64fc8..15872798698ceffcdbaddb689d4179afd7d67a01 100644 --- a/uc2-application/src/test/java/spesb/uc2/streamprocessing/ParentsSerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/ParentsSerdeTest.java @@ -1,8 +1,8 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import java.util.Set; import org.junit.Test; -import spesb.uc2.streamprocessing.ParentsSerde; +import theodolite.uc2.streamprocessing.ParentsSerde; public class ParentsSerdeTest { diff --git a/uc2-application/src/test/java/spesb/uc2/streamprocessing/SensorParentKeySerdeTest.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java similarity index 75% rename from uc2-application/src/test/java/spesb/uc2/streamprocessing/SensorParentKeySerdeTest.java rename to uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java index 5e0495f85423c582a17aefc1fda1a7c937ce14f7..7d9fe3a6eb83b82d85913f212fe9a930f194b220 100644 --- a/uc2-application/src/test/java/spesb/uc2/streamprocessing/SensorParentKeySerdeTest.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SensorParentKeySerdeTest.java @@ -1,8 +1,8 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import org.junit.Test; -import spesb.uc2.streamprocessing.SensorParentKey; -import spesb.uc2.streamprocessing.SensorParentKeySerde; +import theodolite.uc2.streamprocessing.SensorParentKey; +import theodolite.uc2.streamprocessing.SensorParentKeySerde; public class SensorParentKeySerdeTest { diff --git a/uc2-application/src/test/java/spesb/uc2/streamprocessing/SerdeTester.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java similarity index 94% rename from uc2-application/src/test/java/spesb/uc2/streamprocessing/SerdeTester.java rename to uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java index 443d8b845b89a595f4280c4d0b0ae845c542b338..8e9f5a3608e5bae032c6e79b7cd059a0776987c2 100644 --- a/uc2-application/src/test/java/spesb/uc2/streamprocessing/SerdeTester.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTester.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import static org.junit.Assert.assertEquals; import java.util.function.Function; diff --git a/uc2-application/src/test/java/spesb/uc2/streamprocessing/SerdeTesterFactory.java b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java similarity index 94% rename from uc2-application/src/test/java/spesb/uc2/streamprocessing/SerdeTesterFactory.java rename to uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java index 9e5549fc1ced4ff4012ae699a8e6cdf65726f9a3..5cdbfc60574bfc924423516f80ec61850853bcff 100644 --- a/uc2-application/src/test/java/spesb/uc2/streamprocessing/SerdeTesterFactory.java +++ b/uc2-application/src/test/java/theodolite/uc2/streamprocessing/SerdeTesterFactory.java @@ -1,4 +1,4 @@ -package spesb.uc2.streamprocessing; +package theodolite.uc2.streamprocessing; import org.apache.kafka.common.serialization.Serde; diff --git a/uc2-workload-generator/build.gradle b/uc2-workload-generator/build.gradle index 9b35bdaa4d618943a61042d9bbf93d4c9c4f35a2..c367d9fcf62e151fb9592eeec611392b3c90899d 100644 --- a/uc2-workload-generator/build.gradle +++ b/uc2-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "spesb.uc2.workloadgenerator.LoadGenerator" \ No newline at end of file +mainClassName = "theodolite.uc2.workloadgenerator.LoadGenerator" \ No newline at end of file diff --git a/uc2-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java b/uc2-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java deleted file mode 100644 index 034201411a84d3769dbe8c02a210098c62dca881..0000000000000000000000000000000000000000 --- a/uc2-workload-generator/src/main/java/spesb/kafkasender/KafkaRecordSender.java +++ /dev/null @@ -1,84 +0,0 @@ -package spesb.kafkasender; - -import java.util.Properties; -import java.util.function.Function; -import kieker.common.record.IMonitoringRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; - - -/** - * Sends monitoring records to Kafka. - * - * @param <T> {@link IMonitoringRecord} to send - */ -public class KafkaRecordSender<T extends IMonitoringRecord> { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordSender.class); - - private final String topic; - - private final Function<T, String> keyAccessor; - - private final Function<T, Long> timestampAccessor; - - private final Producer<String, T> producer; - - public KafkaRecordSender(final String bootstrapServers, final String topic) { - this(bootstrapServers, topic, x -> "", x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor) { - this(bootstrapServers, topic, keyAccessor, x -> null, new Properties()); - } - - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor) { - this(bootstrapServers, topic, keyAccessor, timestampAccessor, new Properties()); - } - - /** - * Create a new {@link KafkaRecordSender}. - */ - public KafkaRecordSender(final String bootstrapServers, final String topic, - final Function<T, String> keyAccessor, final Function<T, Long> timestampAccessor, - final Properties defaultProperties) { - this.topic = topic; - this.keyAccessor = keyAccessor; - this.timestampAccessor = timestampAccessor; - - final Properties properties = new Properties(); - properties.putAll(defaultProperties); - properties.put("bootstrap.servers", bootstrapServers); - // properties.put("acks", this.acknowledges); - // properties.put("batch.size", this.batchSize); - // properties.put("linger.ms", this.lingerMs); - // properties.put("buffer.memory", this.bufferMemory); - - this.producer = new KafkaProducer<>(properties, new StringSerializer(), - IMonitoringRecordSerde.serializer()); - } - - /** - * Write the passed monitoring record to Kafka. - */ - public void write(final T monitoringRecord) { - final ProducerRecord<String, T> record = - new ProducerRecord<>(this.topic, null, this.timestampAccessor.apply(monitoringRecord), - this.keyAccessor.apply(monitoringRecord), monitoringRecord); - - LOGGER.debug("Send record to Kafka topic {}: {}", this.topic, record); - this.producer.send(record); - } - - public void terminate() { - this.producer.close(); - } - -} diff --git a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/ConfigPublisher.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java similarity index 97% rename from uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/ConfigPublisher.java rename to uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java index 8cc3095fff902336273bf1145270a22044fad97e..c8b3a1846254603c8690bf395c24c6d6f9fb2166 100644 --- a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/ConfigPublisher.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/ConfigPublisher.java @@ -1,4 +1,4 @@ -package spesb.uc2.workloadgenerator; +package theodolite.uc2.workloadgenerator; import java.util.Properties; import java.util.concurrent.ExecutionException; diff --git a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java similarity index 98% rename from uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java rename to uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java index 46ff2890cbaaf9c0578c8ecfb867548d06375867..0e085a53078c0aa85df0cdd7689ca50e5f5faa65 100644 --- a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGenerator.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGenerator.java @@ -1,4 +1,4 @@ -package spesb.uc2.workloadgenerator; +package theodolite.uc2.workloadgenerator; import common.dimensions.Duration; import common.dimensions.KeySpace; @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import theodolite.kafkasender.KafkaRecordSender; import titan.ccp.configuration.events.Event; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; diff --git a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGeneratorExtrem.java b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java similarity index 98% rename from uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGeneratorExtrem.java rename to uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java index c78647edbb4f829237c25237d9edc9d11beeffc5..1e58541758602cd2b1ea84f3ac3360aa3911425d 100644 --- a/uc2-workload-generator/src/main/java/spesb/uc2/workloadgenerator/LoadGeneratorExtrem.java +++ b/uc2-workload-generator/src/main/java/theodolite/uc2/workloadgenerator/LoadGeneratorExtrem.java @@ -1,4 +1,4 @@ -package spesb.uc2.workloadgenerator; +package theodolite.uc2.workloadgenerator; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -10,7 +10,7 @@ import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.producer.ProducerConfig; -import spesb.kafkasender.KafkaRecordSender; +import theodolite.kafkasender.KafkaRecordSender; import titan.ccp.configuration.events.Event; import titan.ccp.model.sensorregistry.MutableAggregatedSensor; import titan.ccp.model.sensorregistry.MutableSensorRegistry; diff --git a/uc3-application/build.gradle b/uc3-application/build.gradle index 89d122ba69512548a011505c71f636c0bd3b0b47..82df66fae434e5b0a0f9b31ef9a44f04ca857173 100644 --- a/uc3-application/build.gradle +++ b/uc3-application/build.gradle @@ -10,4 +10,4 @@ dependencies { compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') } -mainClassName = "spesb.uc3.application.HistoryService" +mainClassName = "theodolite.uc3.application.HistoryService" diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 28382bedd3b02ceb2c48925212087c28ed371aad..0000000000000000000000000000000000000000 --- a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,105 +0,0 @@ -package spesb.uc3.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc3-application/src/main/java/spesb/uc3/application/ConfigurationKeys.java b/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java similarity index 94% rename from uc3-application/src/main/java/spesb/uc3/application/ConfigurationKeys.java rename to uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java index df51385a6f61cf25028f1d45552fa9687f40dc15..d95d245e0b354f11abfc40277e088f1a3f205c95 100644 --- a/uc3-application/src/main/java/spesb/uc3/application/ConfigurationKeys.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java @@ -1,4 +1,4 @@ -package spesb.uc3.application; +package theodolite.uc3.application; /** * Keys to access configuration parameters. diff --git a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java similarity index 81% rename from uc3-application/src/main/java/spesb/uc3/application/HistoryService.java rename to uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index 2b6c40e51a09e179778209d0626da6f6718bc07a..916d64f011a742d497a3512dd09da9db080576e5 100644 --- a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -1,11 +1,11 @@ -package spesb.uc3.application; +package theodolite.uc3.application; import java.time.Duration; import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc3.streamprocessing.KafkaStreamsBuilder; +import theodolite.uc3.streamprocessing.Uc3KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -33,11 +33,16 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(); + uc3KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)) + .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/TopologyBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java similarity index 96% rename from uc3-application/src/main/java/spesb/uc3/streamprocessing/TopologyBuilder.java rename to uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java index d79451088e78e07f003dc076933f20489f594523..0ad1845f656bcbd11b61c0e0affa9b6bcfabd2f7 100644 --- a/uc3-application/src/main/java/spesb/uc3/streamprocessing/TopologyBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/TopologyBuilder.java @@ -1,4 +1,4 @@ -package spesb.uc3.streamprocessing; +package theodolite.uc3.streamprocessing; import com.google.common.math.Stats; import java.time.Duration; @@ -12,7 +12,7 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spesb.uc3.streamprocessing.util.StatsFactory; +import theodolite.uc3.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.models.records.ActivePowerRecordFactory; diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..63841361b06bb054fee203a894fba0c11c249d16 --- /dev/null +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -0,0 +1,43 @@ +package theodolite.uc3.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import theodolite.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration windowDuration; // NOPMD + + public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) { + this.windowDuration = windowDuration; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, + this.windowDuration); + return topologyBuilder.build(); + } + +} diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/util/StatsFactory.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/util/StatsFactory.java similarity index 91% rename from uc3-application/src/main/java/spesb/uc3/streamprocessing/util/StatsFactory.java rename to uc3-application/src/main/java/theodolite/uc3/streamprocessing/util/StatsFactory.java index 964199c0083dc9d096b59227c181dba732ca72b4..8099c85d652e57d30fe38e9d598783e2dc45ecb9 100644 --- a/uc3-application/src/main/java/spesb/uc3/streamprocessing/util/StatsFactory.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/util/StatsFactory.java @@ -1,4 +1,4 @@ -package spesb.uc3.streamprocessing.util; +package theodolite.uc3.streamprocessing.util; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; diff --git a/uc3-workload-generator/build.gradle b/uc3-workload-generator/build.gradle index e27cf26d28ba0d3f85a4c2a11e4eae2b85f29e4c..c3ca94290c8600d8482210362666efc1249b8f02 100644 --- a/uc3-workload-generator/build.gradle +++ b/uc3-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "spesb.uc3.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc3.workloadgenerator.LoadGenerator" diff --git a/uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java similarity index 97% rename from uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java rename to uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java index 74bd38b28d0665ec891d852beb476d91477bd3d2..126c5d753ca39a26d9fd44c7432b8f78673b49a7 100644 --- a/uc3-workload-generator/src/main/java/spesb/uc3/workloadgenerator/LoadGenerator.java +++ b/uc3-workload-generator/src/main/java/theodolite/uc3/workloadgenerator/LoadGenerator.java @@ -1,4 +1,4 @@ -package spesb.uc3.workloadgenerator; +package theodolite.uc3.workloadgenerator; import common.dimensions.Duration; import common.dimensions.KeySpace; @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import theodolite.kafkasender.KafkaRecordSender; import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator { diff --git a/uc4-application/build.gradle b/uc4-application/build.gradle index c89b18b1bfd5a131e58e512e79934e498f182adb..c5891b2bfb2073e829ff7013e47a17b1ac2313e5 100644 --- a/uc4-application/build.gradle +++ b/uc4-application/build.gradle @@ -10,4 +10,4 @@ dependencies { compile('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') } -mainClassName = "spesb.uc4.application.HistoryService" +mainClassName = "theodolite.uc4.application.HistoryService" diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java deleted file mode 100644 index a3ae3461d055694669e4d874930d5ade9dd83658..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java +++ /dev/null @@ -1,31 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek} and a sensor id. - */ -public class DayOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final String sensorId; - - public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java deleted file mode 100644 index 222785ca8a2d8db72c81929a216fc53b43d06ec0..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link DayOfWeekKey}. - */ -public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> { - - @Override - public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - @Override - public String getSensorId(final DayOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java deleted file mode 100644 index 9c246f912ffc67ff6fb8d211a99d478cb58c2898..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java +++ /dev/null @@ -1,33 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putString(data.getSensorId()); - } - - @Override - public DayOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final String sensorId = buffer.getString(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - public static Serde<DayOfWeekKey> create() { - return SimpleSerdes.create(new DayOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java deleted file mode 100644 index bdfecdbc4857b4d7a630b4afa07de39618435544..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.DayOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link DayOfWeekActivePowerRecord}. - */ -public class DayOfWeekRecordFactory - implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> { - - @Override - public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed, - final Stats stats) { - return new DayOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java deleted file mode 100644 index 81d33f3042796ecb3c890e73a82e879ab2d0ac6e..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java +++ /dev/null @@ -1,40 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id. - */ -public class HourOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final int hourOfDay; - private final String sensorId; - - /** - * Create a new {@link HourOfDayKey} using its components. - */ - public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.hourOfDay = hourOfDay; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public int getHourOfDay() { - return this.hourOfDay; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay; - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java deleted file mode 100644 index 980549309ce94b2e4a4c6da0835b8adfe47bb61e..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link HourOfWeekKey}. - */ -public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> { - - @Override - public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - final int hourOfDay = dateTime.getHour(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - @Override - public String getSensorId(final HourOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java deleted file mode 100644 index 63a6a445bf46f521a220816896529a081a15bca0..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java +++ /dev/null @@ -1,35 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link HourOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putInt(data.getHourOfDay()); - buffer.putString(data.getSensorId()); - } - - @Override - public HourOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final int hourOfDay = buffer.getInt(); - final String sensorId = buffer.getString(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - public static Serde<HourOfWeekKey> create() { - return SimpleSerdes.create(new HourOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java deleted file mode 100644 index 358e3d1a5acf8bfd9f4fca7c95b84bd5b13bea53..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.HourOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}. - */ -public class HourOfWeekRecordFactory - implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> { - - @Override - public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed, - final Stats stats) { - return new HourOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.key().getHourOfDay(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9cbff4f61ec5975e3dcdfc5c4e4a9f900e6707ec..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,117 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration aggregtionDuration; // NOPMD - private Duration aggregationAdvance; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { - this.aggregtionDuration = aggregtionDuration; - return this; - } - - public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { - this.aggregationAdvance = aggregationAdvance; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); - Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.aggregtionDuration, - this.aggregationAdvance); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/application/ConfigurationKeys.java b/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java similarity index 95% rename from uc4-application/src/main/java/spesb/uc4/application/ConfigurationKeys.java rename to uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java index 236601a46447f5c38b6548d2e0762bbb670747e1..aa74e1552cb4c3c020f511dfb6b53a3f1fd886d7 100644 --- a/uc4-application/src/main/java/spesb/uc4/application/ConfigurationKeys.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/ConfigurationKeys.java @@ -1,4 +1,4 @@ -package spesb.uc4.application; +package theodolite.uc4.application; /** * Keys to access configuration parameters. diff --git a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java similarity index 80% rename from uc4-application/src/main/java/spesb/uc4/application/HistoryService.java rename to uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index f86f0cb7e3bc6840db52ce7bdbbac054cdd05e13..4d686d8f7f244b9e6dd28b4c39abcb83d9a108b8 100644 --- a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -1,10 +1,10 @@ -package spesb.uc4.application; +package theodolite.uc4.application; import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc4.streamprocessing.KafkaStreamsBuilder; +import theodolite.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,18 +30,24 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(); + uc4KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregtionDuration( Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))) + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); } diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKey.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java similarity index 92% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKey.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java index b07a54d6f22ebbacb77e53a7733f3a16d539fde2..214be2dd073e21944ec0765eb30ed72a81b15b1b 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKey.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKey.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; /** * Composed key of an hour of the day and a sensor id. diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKeyFactory.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java similarity index 92% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKeyFactory.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java index a13de14229dfbb5a201dc282d05a8c4f97394250..edb9ad2b20ac645dfade840130e1be67d2505304 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKeyFactory.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeyFactory.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import java.time.LocalDateTime; diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKeySerde.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java similarity index 96% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKeySerde.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java index a938813c6e1239b8af81c49ef7d83800bfef9b9d..ff404ab121ca2e60da65f11d89b8ec5849bd600d 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayKeySerde.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayKeySerde.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayRecordFactory.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java similarity index 95% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayRecordFactory.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java index 25fb9193d1f343a246451ef2a5309198fc39ffde..7249309cea036bff9203ce9a7aa32489f69edebe 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfDayRecordFactory.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/HourOfDayRecordFactory.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import com.google.common.math.Stats; import org.apache.kafka.streams.kstream.Windowed; diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/RecordDatabaseAdapter.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java similarity index 98% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/RecordDatabaseAdapter.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java index 9c286cb49206d1eb1f0efbd2e98e3b44bc9a1e22..8f693d5d3d309eb73a017b8d33dfcd63e70724fb 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/RecordDatabaseAdapter.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/RecordDatabaseAdapter.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import java.util.Collection; import java.util.List; diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/StatsKeyFactory.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java similarity index 88% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/StatsKeyFactory.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java index 7e4ac46e461c9083c7929f5dd313fea0526c3d50..cf67efbd34362c337a956d80f14731cf9b9d6b77 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/StatsKeyFactory.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsKeyFactory.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import java.time.LocalDateTime; diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/StatsRecordFactory.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java similarity index 94% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/StatsRecordFactory.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java index 045b512d0561c25889a0a0f8ef05663824412c60..79eb4b9f76e4429cf84d0af0e56875ea0386e218 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/StatsRecordFactory.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/StatsRecordFactory.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import com.google.common.math.Stats; import org.apache.avro.specific.SpecificRecord; diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java similarity index 93% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java index 66bb460031f09f1cae77d5e93e3f130aa66f6e90..b4632aaf15ee5f2572c795458f4bfded5c8cfbcd 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/TopologyBuilder.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing; +package theodolite.uc4.streamprocessing; import com.google.common.math.Stats; import java.time.Duration; @@ -15,7 +15,7 @@ import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; -import spesb.uc4.streamprocessing.util.StatsFactory; +import theodolite.uc4.streamprocessing.util.StatsFactory; import titan.ccp.common.kafka.GenericSerde; import titan.ccp.common.kieker.kafka.IMonitoringRecordSerde; import titan.ccp.models.records.ActivePowerRecordFactory; @@ -52,11 +52,8 @@ public class TopologyBuilder { * Build the {@link Topology} for the History microservice. */ public Topology build() { - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - // final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new - // HourOfDayRecordFactory(); this.builder .stream(this.inputTopic, diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..8220f4cd36b0639cd69ac102177a53b1ed90e5b6 --- /dev/null +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -0,0 +1,54 @@ +package theodolite.uc4.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import theodolite.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD + + public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.aggregtionDuration, + this.aggregationAdvance); + + return topologyBuilder.build(); + } + +} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/util/StatsFactory.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java similarity index 91% rename from uc4-application/src/main/java/spesb/uc4/streamprocessing/util/StatsFactory.java rename to uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java index 39fe573445984f237d600753c8c828eb2869913b..e97fbcd216c57a8aa965ee7a295c5633fa34810e 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/util/StatsFactory.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/util/StatsFactory.java @@ -1,4 +1,4 @@ -package spesb.uc4.streamprocessing.util; +package theodolite.uc4.streamprocessing.util; import com.google.common.math.Stats; import com.google.common.math.StatsAccumulator; diff --git a/uc4-workload-generator/build.gradle b/uc4-workload-generator/build.gradle index 8bbdedf4f7c41da73dd2d591b8fd56830d7060b7..76bbce013b67bab325bac06c1986693da3028f0c 100644 --- a/uc4-workload-generator/build.gradle +++ b/uc4-workload-generator/build.gradle @@ -1 +1 @@ -mainClassName = "spesb.uc4.workloadgenerator.LoadGenerator" +mainClassName = "theodolite.uc4.workloadgenerator.LoadGenerator" diff --git a/uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java similarity index 91% rename from uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java rename to uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java index db2b1e8bd88d1c3d33e01f952aafc179376ecdad..35a54df16aa65923cf41859d58e2680ecce5f101 100644 --- a/uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java +++ b/uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java @@ -1,4 +1,4 @@ -package spesb.uc4.workloadgenerator; +package theodolite.uc4.workloadgenerator; import common.dimensions.Duration; import common.dimensions.KeySpace; @@ -14,6 +14,10 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +<<<<<<< HEAD:uc4-workload-generator/src/main/java/spesb/uc4/workloadgenerator/LoadGenerator.java +======= +import theodolite.kafkasender.KafkaRecordSender; +>>>>>>> master:uc4-workload-generator/src/main/java/theodolite/uc4/workloadgenerator/LoadGenerator.java import titan.ccp.models.records.ActivePowerRecord; public class LoadGenerator {