diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java index 6302e4c69904aaf57e3f936ee9ad0ead11414a8d..ca1838b84a4f1b3ddf11ad4dea8e34792371974b 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/ConfigurationKeys.java @@ -9,12 +9,6 @@ public final class ConfigurationKeys { public static final String APPLICATION_VERSION = "application.version"; - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; diff --git a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java index 8c758c24444ea9c590c364063a397f9b7bfec8f9..ef1ece3549b1aabf60a4ff5b15028b7e50288cd9 100644 --- a/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/theodolite/commons/kafkastreams/KafkaStreamsBuilder.java @@ -1,7 +1,9 @@ package theodolite.commons.kafkastreams; -import java.util.Objects; import java.util.Properties; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; @@ -12,109 +14,95 @@ import titan.ccp.common.kafka.streams.PropertiesBuilder; */ public abstract class KafkaStreamsBuilder { - // Kafkastreams application specific - protected String schemaRegistryUrl; // NOPMD for use in subclass + // Kafka Streams application specific + protected final String schemaRegistryUrl; // NOPMD for use in subclass + protected final String inputTopic; // NOPMD for use in subclass - private String applicationName; // NOPMD - private String applicationVersion; // NOPMD - private String bootstrapServers; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD + private final Configuration config; - /** - * Sets the application name for the {@code KafkaStreams} application. It is used to create the - * application ID. - * - * @param applicationName Name of the application. - * @return - */ - public KafkaStreamsBuilder applicationName(final String applicationName) { - this.applicationName = applicationName; - return this; - } - - /** - * Sets the application version for the {@code KafkaStreams} application. It is used to create the - * application ID. - * - * @param applicationVersion Version of the application. - * @return - */ - public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { - this.applicationVersion = applicationVersion; - return this; - } + private final String applicationName; // NOPMD + private final String applicationVersion; // NOPMD + private final String bootstrapServers; // NOPMD /** - * Sets the bootstrap servers for the {@code KafkaStreams} application. + * Construct a new Build object for a Kafka Streams application. * - * @param bootstrapServers String for a bootstrap server. - * @return + * @param config Contains the key value pairs for configuration. */ - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; + public KafkaStreamsBuilder(final Configuration config) { + this.config = config; + this.applicationName = this.config.getString(ConfigurationKeys.APPLICATION_NAME); + this.applicationVersion = this.config.getString(ConfigurationKeys.APPLICATION_VERSION); + this.bootstrapServers = this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS); + this.schemaRegistryUrl = this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL); + this.inputTopic = this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC); } /** - * Sets the URL for the schema registry. + * Checks if the given key is contained in the configurations and sets it in the properties. * - * @param url The URL of the schema registry. - * @return + * @param <T> Type of the value for given key + * @param propBuilder Object where to set this property. + * @param key The key to check and set the property. + * @param valueGetter Method to get the value from with given key. + * @param condition for setting the property. */ - public KafkaStreamsBuilder schemaRegistry(final String url) { - this.schemaRegistryUrl = url; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - * - * @param numThreads Number of threads. -1 for using the default. - * @return - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); + private <T> void setOptionalProperty(final PropertiesBuilder propBuilder, + final String key, + final Function<String, T> valueGetter, + final Predicate<T> condition) { + if (this.config.containsKey(key)) { + final T value = valueGetter.apply(key); + propBuilder.set(key, value, condition); } - this.numThreads = numThreads; - return this; } /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. + * Build the {@link Properties} for a {@code KafkaStreams} application. * - * @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using - * the default. - * @return + * @return A {@code Properties} object. */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); + protected Properties buildProperties() { + // required configuration + final PropertiesBuilder propBuilder = PropertiesBuilder + .bootstrapServers(this.bootstrapServers) + .applicationId(this.applicationName + '-' + this.applicationVersion); + + // optional configurations + this.setOptionalProperty(propBuilder, StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, + this.config::getLong, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, + this.config::getInt, p -> p > 0); + this.setOptionalProperty(propBuilder, StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, + this.config::getInt, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, + this.config::getInt, p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, + this.config::getLong, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, + this.config::getInt, p -> p >= 1); + this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, + this.config::getInt, p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.NUM_STREAM_THREADS_CONFIG, + this.config::getInt, p -> p > 0); + this.setOptionalProperty(propBuilder, StreamsConfig.POLL_MS_CONFIG, + this.config::getLong, + p -> p >= 0); + this.setOptionalProperty(propBuilder, StreamsConfig.PROCESSING_GUARANTEE_CONFIG, + this.config::getString, p -> StreamsConfig.AT_LEAST_ONCE.equals(p) + || StreamsConfig.EXACTLY_ONCE.equals(p) || StreamsConfig.EXACTLY_ONCE_BETA.equals(p)); + this.setOptionalProperty(propBuilder, StreamsConfig.REPLICATION_FACTOR_CONFIG, + this.config::getInt, p -> p >= 0); + + if (this.config.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION) + && this.config.getBoolean(StreamsConfig.TOPOLOGY_OPTIMIZATION)) { + propBuilder.set(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); } - this.commitIntervalMs = commitIntervalMs; - return this; - } - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - * - * @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all - * threads. -1 for using the default. - * @return - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; + return propBuilder.build(); } /** @@ -124,31 +112,10 @@ public abstract class KafkaStreamsBuilder { */ protected abstract Topology buildTopology(); - /** - * Build the {@link Properties} for a {@code KafkaStreams} application. - * - * @return A {@code Properties} object. - */ - protected Properties buildProperties() { - return PropertiesBuilder - .bootstrapServers(this.bootstrapServers) - .applicationId(this.applicationName + '-' + this.applicationVersion) - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - .build(); - } - /** * Builds the {@link KafkaStreams} instance. */ public KafkaStreams build() { - // Check for required attributes for building properties. - Objects.requireNonNull(this.applicationName, "Application name has not been set."); - Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); - Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); - Objects.requireNonNull(this.schemaRegistryUrl, "Schema registry has not been set."); - // Create the Kafka streams instance. return new KafkaStreams(this.buildTopology(), this.buildProperties()); } diff --git a/application-kafkastreams-commons/src/test/java/.gitkeep b/application-kafkastreams-commons/src/test/java/.gitkeep deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/build.gradle b/build.gradle index 1e388cb9665b43e004a1854248acc04e1cda387c..3cb86b68e9d37c53572c6611fad1057b5505e9cc 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ buildscript { } } dependencies { - classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.3" + classpath "gradle.plugin.com.github.spotbugs.snom:spotbugs-gradle-plugin:4.6.0" } } @@ -65,6 +65,7 @@ configure(useCaseApplications) { implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } implementation 'org.apache.kafka:kafka-streams:2.6.0' // enable TransformerSuppliers + implementation 'com.google.code.gson:gson:2.8.2' implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.7.25' @@ -100,6 +101,7 @@ configure(commonProjects) { implementation 'org.slf4j:slf4j-simple:1.7.25' implementation('org.industrial-devops:titan-ccp-common:0.1.0-SNAPSHOT') { changing = true } implementation('org.industrial-devops:titan-ccp-common-kafka:0.1.0-SNAPSHOT') { changing = true } + implementation 'org.apache.kafka:kafka-streams:2.6.0' // Use JUnit test framework testImplementation 'junit:junit:4.12' @@ -108,7 +110,7 @@ configure(commonProjects) { // Per default XML reports for SpotBugs are generated // Include this to generate HTML reports -tasks.withType(com.github.spotbugs.SpotBugsTask) { +tasks.withType(com.github.spotbugs.snom.SpotBugsTask) { reports { // Either HTML or XML reports can be activated html.enabled true @@ -165,7 +167,7 @@ subprojects { reportLevel = "low" effort = "max" ignoreFailures = false - toolVersion = '3.1.7' + toolVersion = '4.1.4' } } diff --git a/execution/lib/cli_parser.py b/execution/lib/cli_parser.py index f785bce4f933622a99b4daaadeb483276d4956cd..63896efe9fb8791389fa702674a20fb2316b800f 100644 --- a/execution/lib/cli_parser.py +++ b/execution/lib/cli_parser.py @@ -1,6 +1,7 @@ import argparse import os + def env_list_default(env, tf): """ Makes a list from an environment string. @@ -10,6 +11,40 @@ def env_list_default(env, tf): v = [tf(s) for s in v.split(',')] return v + +def key_values_to_dict(kvs): + """ + Given a list with key values in form `Key=Value` it creates a dict from it. + """ + my_dict = {} + for kv in kvs: + k, v = kv.split("=") + my_dict[k] = v + return my_dict + + +def env_dict_default(env): + """ + Makes a dict from an environment string. + """ + v = os.environ.get(env) + if v is not None: + return key_values_to_dict(v.split(',')) + else: + return dict() + + +class StoreDictKeyPair(argparse.Action): + def __init__(self, option_strings, dest, nargs=None, **kwargs): + self._nargs = nargs + super(StoreDictKeyPair, self).__init__( + option_strings, dest, nargs=nargs, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + my_dict = key_values_to_dict(values) + setattr(namespace, self.dest, my_dict) + + def default_parser(description): """ Returns the default parser that can be used for thodolite and run uc py @@ -33,11 +68,6 @@ def default_parser(description): metavar='<memory limit>', default=os.environ.get('MEMORY_LIMIT', '4Gi'), help='Kubernetes memory limit') - parser.add_argument('--commit-ms', - metavar='<commit ms>', - type=int, - default=os.environ.get('COMMIT_MS', 100), - help='Kafka Streams commit interval in milliseconds') parser.add_argument('--duration', '-d', metavar='<duration>', type=int, @@ -56,14 +86,23 @@ def default_parser(description): help='Only resets the environment. Ignores all other parameters') parser.add_argument('--prometheus', metavar='<URL>', - default=os.environ.get('PROMETHEUS_BASE_URL', 'http://localhost:9090'), + default=os.environ.get( + 'PROMETHEUS_BASE_URL', 'http://localhost:9090'), help='Defines where to find the prometheus instance') parser.add_argument('--path', metavar='<path>', default=os.environ.get('RESULT_PATH', 'results'), help='A directory path for the results') + parser.add_argument("--configurations", + metavar="KEY=VAL", + dest="configurations", + action=StoreDictKeyPair, + nargs="+", + default=env_dict_default('CONFIGURATIONS'), + help='Defines the environment variables for the UC') return parser + def benchmark_parser(description): """ Parser for the overall benchmark execution @@ -93,6 +132,7 @@ def benchmark_parser(description): help='The benchmarking search strategy. Can be set to default, linear-search or binary-search') return parser + def execution_parser(description): """ Parser for executing one use case diff --git a/execution/run_uc.py b/execution/run_uc.py index 6ebf797241b45a342214ea4dbd003e371f5bd828..da23cb14e54fdb56cfdc5f1eacae7789654a4eaf 100644 --- a/execution/run_uc.py +++ b/execution/run_uc.py @@ -41,8 +41,8 @@ def initialize_kubernetes_api(): config.load_kube_config() # try using local config except config.config_exception.ConfigException as e: # load config from pod, if local config is not available - logging.debug('Failed loading local Kubernetes configuration,' - + ' try from cluster') + logging.debug( + 'Failed loading local Kubernetes configuration try from cluster') logging.debug(e) config.load_incluster_config() @@ -58,8 +58,7 @@ def create_topics(topics): # Calling exec and waiting for response print('Create topics') for (topic, partitions) in topics: - print('Create topic ' + topic + ' with #' + str(partitions) - + ' partitions') + print(f'Create topic {topic} with #{partitions} partitions') exec_command = [ '/bin/sh', '-c', @@ -86,7 +85,7 @@ def load_yaml(file_path): with f: return yaml.safe_load(f) except Exception as e: - logging.error('Error opening file %s' % file_path) + logging.error('Error opening file %s', file_path) logging.error(e) @@ -105,6 +104,15 @@ def load_yaml_files(): return wg, app_svc, app_svc_monitor, app_jmx, app_deploy +def replace_env_value(container, key, value): + """ + Special method to replace in a container with kubernetes env values + the value of a given parameter. + """ + next(filter(lambda x: x['name'] == key, container))[ + 'value'] = value + + def start_workload_generator(wg_yaml, dim_value, uc_id): """Starts the workload generator. :param wg_yaml: The yaml object for the workload generator. @@ -139,26 +147,28 @@ def start_workload_generator(wg_yaml, dim_value, uc_id): '-workload-generator:latest' # Set environment variables - next(filter(lambda x: x['name'] == 'NUM_SENSORS', wg_containter['env']))[ - 'value'] = str(num_sensors) - next(filter(lambda x: x['name'] == 'INSTANCES', wg_containter['env']))[ - 'value'] = str(wl_instances) + replace_env_value(wg_containter['env'], 'NUM_SENSORS', str(num_sensors)) + replace_env_value(wg_containter['env'], 'INSTANCES', str(wl_instances)) + if uc_id == '2': # Special configuration for uc2 - next(filter(lambda x: x['name'] == 'NUM_NESTED_GROUPS', wg_containter['env']))[ - 'value'] = str(num_nested_groups) + replace_env_value( + wg_containter['env'], 'NUM_NESTED_GROUPS', str(num_nested_groups)) + try: wg_ss = appsApi.create_namespaced_deployment( namespace=namespace, body=wg_yaml ) - print("Deployment '%s' created." % wg_ss.metadata.name) + print(f'Deployment {wg_ss.metadata.name} created.') return wg_ss except client.rest.ApiException as e: - print("Deployment creation error: %s" % e.reason) + print(f'Deployment creation error: {e.reason}') return wg_yaml -def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instances, uc_id, commit_interval_ms, memory_limit, cpu_limit): +def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, + instances, uc_id, memory_limit, cpu_limit, + configurations): """Applies the service, service monitor, jmx config map and start the use case application. @@ -168,9 +178,9 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc :param deploy_yaml: The yaml object for the application. :param int instances: Number of instances for use case application. :param string uc_id: The id of the use case to execute. - :param int commit_interval_ms: The commit interval in ms. :param string memory_limit: The memory limit for the application. :param string cpu_limit: The CPU limit for the application. + :param dict configurations: A dictionary with ENV variables for configurations. :return: The Service, ServiceMonitor, JMX ConfigMap and Deployment. In case the resource already exist/error the yaml object is returned. @@ -183,10 +193,10 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc try: svc = coreApi.create_namespaced_service( namespace=namespace, body=svc_yaml) - print("Service '%s' created." % svc.metadata.name) + print(f'Service {svc.metadata.name} created.') except client.rest.ApiException as e: svc = svc_yaml - logging.error("Service creation error: %s" % e.reason) + logging.error("Service creation error: %s", e.reason) # Create custom object service monitor try: @@ -197,39 +207,54 @@ def start_application(svc_yaml, svc_monitor_yaml, jmx_yaml, deploy_yaml, instanc plural="servicemonitors", # CustomResourceDef of ServiceMonitor body=svc_monitor_yaml, ) - print("ServiceMonitor '%s' created." % svc_monitor['metadata']['name']) + print(f"ServiceMonitor '{svc_monitor['metadata']['name']}' created.") except client.rest.ApiException as e: svc_monitor = svc_monitor_yaml - logging.error("ServiceMonitor creation error: %s" % e.reason) + logging.error("ServiceMonitor creation error: %s", e.reason) # Apply jmx config map for aggregation service try: jmx_cm = coreApi.create_namespaced_config_map( namespace=namespace, body=jmx_yaml) - print("ConfigMap '%s' created." % jmx_cm.metadata.name) + print(f"ConfigMap '{jmx_cm.metadata.name}' created.") except client.rest.ApiException as e: jmx_cm = jmx_yaml - logging.error("ConfigMap creation error: %s" % e.reason) + logging.error("ConfigMap creation error: %s", e.reason) # Create deployment deploy_yaml['spec']['replicas'] = instances app_container = next(filter( - lambda x: x['name'] == 'uc-application', deploy_yaml['spec']['template']['spec']['containers'])) + lambda x: x['name'] == 'uc-application', + deploy_yaml['spec']['template']['spec']['containers'])) app_container['image'] = 'theodolite/theodolite-uc' + uc_id \ + '-kstreams-app:latest' - next(filter(lambda x: x['name'] == 'COMMIT_INTERVAL_MS', app_container['env']))[ - 'value'] = str(commit_interval_ms) + + # Set configurations environment parameters for SPE + for k, v in configurations.items(): + # check if environment variable is already definde in yaml + env = next(filter(lambda x: x['name'] == k, + app_container['env']), None) + if env is not None: + env['value'] = v # replace value + else: + # create new environment pair + conf = {'name': k, 'value': v} + app_container['env'].append(conf) + + # Set resources in Kubernetes app_container['resources']['limits']['memory'] = memory_limit app_container['resources']['limits']['cpu'] = cpu_limit + + # Deploy application try: app_deploy = appsApi.create_namespaced_deployment( namespace=namespace, body=deploy_yaml ) - print("Deployment '%s' created." % app_deploy.metadata.name) + print(f"Deployment '{app_deploy.metadata.name}' created.") except client.rest.ApiException as e: app_deploy = deploy_yaml - logging.error("Deployment creation error: %s" % e.reason) + logging.error("Deployment creation error: %s", e.reason) return svc, svc_monitor, jmx_cm, app_deploy @@ -243,7 +268,7 @@ def wait_execution(execution_minutes): for i in range(execution_minutes): time.sleep(60) - print(f"Executed: {i+1} minutes") + print(f'Executed: {i+1} minutes') print('Execution finished') return @@ -258,7 +283,8 @@ def run_evaluation(exp_id, uc_id, dim_value, instances, execution_minutes, prome :param int execution_minutes: How long the use case where executed. """ print('Run evaluation function') - lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, execution_minutes, prometheus_base_url, result_path) + lag_analysis.main(exp_id, f'uc{uc_id}', dim_value, instances, + execution_minutes, prometheus_base_url, result_path) return @@ -310,7 +336,7 @@ def stop_applications(wg, app_svc, app_svc_monitor, app_jmx, app_deploy): name=app_svc_monitor['metadata']['name']) print('Resource deleted') except Exception as e: - print("Error deleting service monitor") + print('Error deleting service monitor') print('Delete jmx config map') delete_resource(app_jmx, coreApi.delete_namespaced_config_map) @@ -363,7 +389,7 @@ def delete_topics(topics): stderr=True, stdin=False, stdout=True, tty=False) if resp == '0': - print("Topics deleted") + print('Topics deleted') break return @@ -455,7 +481,7 @@ def reset_cluster(wg, app_svc, app_svc_monitor, app_jmx, app_deploy, topics): stop_lag_exporter() -def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, commit_interval_ms, execution_minutes, prometheus_base_url, reset, ns, result_path, reset_only=False): +def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limit, execution_minutes, prometheus_base_url, reset, ns, result_path, configurations, reset_only=False): """ Main method to execute one time the benchmark for a given use case. Start workload generator/application -> execute -> analyse -> stop all @@ -466,9 +492,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi :param int partitions: Number of partitions the kafka topics should have. :param string cpu_limit: Max CPU utilazation for application. :param string memory_limit: Max memory utilazation for application. - :param int commit_interval_ms: Kafka Streams commit interval in milliseconds :param int execution_minutes: How long to execute the benchmark. :param boolean reset: Flag for reset of cluster before execution. + :param dict configurations: Key value pairs for setting env variables of UC. :param boolean reset_only: Flag to only reset the application. """ global namespace @@ -514,9 +540,9 @@ def main(exp_id, uc_id, dim_value, instances, partitions, cpu_limit, memory_limi app_deploy, instances, uc_id, - commit_interval_ms, memory_limit, - cpu_limit) + cpu_limit, + configurations) print('---------------------') wait_execution(execution_minutes) @@ -535,7 +561,7 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) args = load_variables() print('---------------------') - main(args.exp_id, args.uc, args.load, args.instances, - args.partitions, args.cpu_limit, args.memory_limit, - args.commit_ms, args.duration, args.prometheus, args.reset, - args.namespace, args.path, args.reset_only) + main(args.exp_id, args.uc, args.load, args.instances, args.partitions, + args.cpu_limit, args.memory_limit, args.duration, args.prometheus, + args.reset, args.namespace, args.path, args.configurations, + args.reset_only) diff --git a/execution/strategies/config.py b/execution/strategies/config.py index 3741bcd5a8f025b0efc8bfb6ab53fdf08381ce9f..c3cd1ff82c4926f5efcc741b027996dbc800916b 100644 --- a/execution/strategies/config.py +++ b/execution/strategies/config.py @@ -10,12 +10,12 @@ class ExperimentConfig: partitions: int cpu_limit: str memory_limit: str - kafka_streams_commit_interval_ms: int execution_minutes: int prometheus_base_url: str reset: bool namespace: str result_path: str + configurations: dict domain_restriction_strategy: object search_strategy: object subexperiment_executor: object diff --git a/execution/strategies/strategies/config.py b/execution/strategies/strategies/config.py index 3c6a15918ec8cf923b79e6f4f98564f983deac63..5c31f8c97a4085931cdfa1fa017d4e5909e21915 100644 --- a/execution/strategies/strategies/config.py +++ b/execution/strategies/strategies/config.py @@ -11,9 +11,9 @@ class SubexperimentConfig: partitions: int cpu_limit: str memory_limit: str - kafka_streams_commit_interval_ms: int execution_minutes: int prometheus_base_url: str reset: bool namespace: str result_path: str + configurations: dict diff --git a/execution/strategies/strategies/search/binary_search_strategy.py b/execution/strategies/strategies/search/binary_search_strategy.py index be7da54025c2f9fda1750d8197d3afd4055da790..8856ead0502279f8f8642da87cf56f794cb1b11c 100644 --- a/execution/strategies/strategies/search/binary_search_strategy.py +++ b/execution/strategies/strategies/search/binary_search_strategy.py @@ -5,7 +5,7 @@ from strategies.strategies.config import SubexperimentConfig def binary_search(config, dim_value, lower, upper, subexperiment_counter): if lower == upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # successful, the upper neighbor is assumed to also has been successful @@ -14,14 +14,14 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): return (lower+1, subexperiment_counter) elif lower+1==upper: print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[lower]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[lower], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result==1: # minimal instances found return (lower, subexperiment_counter) else: # not successful, check if lower+1 instances are sufficient print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[upper]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[upper], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # minimal instances found @@ -32,7 +32,7 @@ def binary_search(config, dim_value, lower, upper, subexperiment_counter): # test mid mid=(upper+lower)//2 print(f"Run subexperiment {subexperiment_counter} with config {dim_value} {config.replicass[mid]}") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, config.replicass[mid], config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) if result == 1: # success -> search in (lower, mid-1) diff --git a/execution/strategies/strategies/search/check_all_strategy.py b/execution/strategies/strategies/search/check_all_strategy.py index 7d8ea605707131d19a023671a77b8f22647d6f51..8e9d6c3ca0924d724c4f55032ebc24a92bc3ad93 100644 --- a/execution/strategies/strategies/search/check_all_strategy.py +++ b/execution/strategies/strategies/search/check_all_strategy.py @@ -12,7 +12,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} of {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) diff --git a/execution/strategies/strategies/search/linear_search_strategy.py b/execution/strategies/strategies/search/linear_search_strategy.py index c4f57c0d9bd82467a5917bbf95fe330c7bd81a58..f2436658eec0bd4160259a09c272def40fbc130c 100644 --- a/execution/strategies/strategies/search/linear_search_strategy.py +++ b/execution/strategies/strategies/search/linear_search_strategy.py @@ -11,7 +11,7 @@ def execute(config, dim_value_index, lower_replicas_bound_index, subexperiment_c replicas=config.replicass[lower_replicas_bound_index] print(f"Run subexperiment {subexperiment_counter} from at most {subexperiments_total} with dimension value {dim_value} and {replicas} replicas.") - subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.kafka_streams_commit_interval_ms, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path) + subexperiment_config = SubexperimentConfig(config.use_case, config.exp_id, subexperiment_counter, dim_value, replicas, config.partitions, config.cpu_limit, config.memory_limit, config.execution_minutes, config.prometheus_base_url, config.reset, config.namespace, config.result_path, config.configurations) config.subexperiment_executor.execute(subexperiment_config) result = config.subexperiment_evaluator.execute(subexperiment_config) diff --git a/execution/strategies/subexperiment_execution/subexperiment_executor.py b/execution/strategies/subexperiment_execution/subexperiment_executor.py index 3f7af08b7a52d70609f000a34a47c088574ddfd6..6931dacfc72081cbe112c4d6d1003703ba42c526 100644 --- a/execution/strategies/subexperiment_execution/subexperiment_executor.py +++ b/execution/strategies/subexperiment_execution/subexperiment_executor.py @@ -12,9 +12,9 @@ def execute(subexperiment_config): partitions=subexperiment_config.partitions, cpu_limit=subexperiment_config.cpu_limit, memory_limit=subexperiment_config.memory_limit, - commit_interval_ms=subexperiment_config.kafka_streams_commit_interval_ms, execution_minutes=int(subexperiment_config.execution_minutes), prometheus_base_url=subexperiment_config.prometheus_base_url, reset=subexperiment_config.reset, ns=subexperiment_config.namespace, - result_path=subexperiment_config.result_path) + result_path=subexperiment_config.result_path, + configurations=subexperiment_config.configurations) diff --git a/execution/theodolite.py b/execution/theodolite.py index 22be2f69ab81d81b7aac7717041604cd368e771f..ef218d99ce0c0ac695c5a5fa3df3ebe4182b671f 100755 --- a/execution/theodolite.py +++ b/execution/theodolite.py @@ -30,8 +30,8 @@ def load_variables(): def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, - commit_ms, duration, domain_restriction, search_strategy, - prometheus_base_url, reset, namespace, result_path): + duration, domain_restriction, search_strategy, prometheus_base_url, + reset, namespace, result_path, configurations): print(f"Domain restriction of search space activated: {domain_restriction}") print(f"Chosen search strategy: {search_strategy}") @@ -49,16 +49,16 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, # Store metadata separator = "," lines = [ - f"UC={uc}\n", - f"DIM_VALUES={separator.join(map(str, loads))}\n", - f"REPLICAS={separator.join(map(str, instances_list))}\n", - f"PARTITIONS={partitions}\n", - f"CPU_LIMIT={cpu_limit}\n", - f"MEMORY_LIMIT={memory_limit}\n", - f"KAFKA_STREAMS_COMMIT_INTERVAL_MS={commit_ms}\n", - f"EXECUTION_MINUTES={duration}\n", - f"DOMAIN_RESTRICTION={domain_restriction}\n", - f"SEARCH_STRATEGY={search_strategy}" + f'UC={uc}\n', + f'DIM_VALUES={separator.join(map(str, loads))}\n', + f'REPLICAS={separator.join(map(str, instances_list))}\n', + f'PARTITIONS={partitions}\n', + f'CPU_LIMIT={cpu_limit}\n', + f'MEMORY_LIMIT={memory_limit}\n', + f'EXECUTION_MINUTES={duration}\n', + f'DOMAIN_RESTRICTION={domain_restriction}\n', + f'SEARCH_STRATEGY={search_strategy}\n', + f'CONFIGURATIONS={configurations}' ] with open(f"{result_path}/exp{exp_id}_uc{uc}_meta.txt", "w") as stream: stream.writelines(lines) @@ -95,11 +95,11 @@ def main(uc, loads, instances_list, partitions, cpu_limit, memory_limit, partitions=partitions, cpu_limit=cpu_limit, memory_limit=memory_limit, - kafka_streams_commit_interval_ms=commit_ms, execution_minutes=duration, prometheus_base_url=prometheus_base_url, reset=reset, namespace=namespace, + configurations=configurations, result_path=result_path, domain_restriction_strategy=domain_restriction_strategy, search_strategy=search_strategy, @@ -114,6 +114,6 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) args = load_variables() main(args.uc, args.loads, args.instances_list, args.partitions, args.cpu_limit, - args.memory_limit, args.commit_ms, args.duration, + args.memory_limit, args.duration, args.domain_restriction, args.search_strategy, args.prometheus, - args.reset, args.namespace, args.path) + args.reset, args.namespace, args.path, args.configurations) diff --git a/execution/theodolite.yaml b/execution/theodolite.yaml index 1c1ba6a1f3d9119dddd4668c27e1b1a10291895e..292043a03cb42b085e43ddef33457c211ec29330 100644 --- a/execution/theodolite.yaml +++ b/execution/theodolite.yaml @@ -24,8 +24,6 @@ spec: value: "3" - name: PARTITIONS value: "30" - # - name: COMMIT_MS - # value: "" # - name: SEARCH_STRATEGY # value: "" # - name: CPU_LIMIT @@ -36,6 +34,8 @@ spec: value: "http://prometheus-operated:9090" # - name: NAMESPACE # value: "default" + # - name: CONFIGURATIONS + # value: "COMMIT_INTERVAL_MS=100, NUM_STREAM_THREADS=1" - name: RESULT_PATH value: "results" - name: PYTHONUNBUFFERED diff --git a/execution/uc-application/base/aggregation-deployment.yaml b/execution/uc-application/base/aggregation-deployment.yaml index 81da3eea7688f5d3b3145092d91cb8502e6ad87b..07732ca1dd1e6b2b06f098dfb10a53d38e8d5cae 100644 --- a/execution/uc-application/base/aggregation-deployment.yaml +++ b/execution/uc-application/base/aggregation-deployment.yaml @@ -20,14 +20,14 @@ spec: - containerPort: 5555 name: jmx env: - - name: COMMIT_INTERVAL_MS - value: "100" - name: KAFKA_BOOTSTRAP_SERVERS value: "my-confluent-cp-kafka:9092" - name: SCHEMA_REGISTRY_URL value: "http://my-confluent-cp-schema-registry:8081" - name: JAVA_OPTS value: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=5555" + - name: COMMIT_INTERVAL_MS # Set as default for the applications + value: "100" resources: limits: memory: 4Gi diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 29953ea141f55e3b8fc691d31b5ca8816d89fa87..457aad0d98108420a977756b7145c93c8910b076 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e0b3fb8d70b1bbf790f6f8ed1c928ddf09f54628..4d9ca1649142b0c20144adce78e2472e2da01c30 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index cccdd3d517fc5249beaefa600691cf150f2fa3e6..af6708ff229fda75da4f7cc4da4747217bac4d53 100755 --- a/gradlew +++ b/gradlew @@ -28,7 +28,7 @@ APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" +DEFAULT_JVM_OPTS='"-Xmx64m"' # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" diff --git a/gradlew.bat b/gradlew.bat index e95643d6a2ca62258464e83c72f5156dc941c609..0f8d5937c4ad18feb44a19e55ad1e37cc159260f 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -14,7 +14,7 @@ set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= +set DEFAULT_JVM_OPTS="-Xmx64m" @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome diff --git a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java index a35cc37b36fb906e5c5495006126374d4de4656c..f0d8062a2442181507c0bef990b73e0e9cf4a372 100644 --- a/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/theodolite/uc1/application/HistoryService.java @@ -3,7 +3,6 @@ package theodolite.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import theodolite.commons.kafkastreams.ConfigurationKeys; import theodolite.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.ServiceConfigurations; @@ -31,18 +30,9 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { - final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(); - uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)); - - final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .build(); + final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(this.config); + + final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); diff --git a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java index 7699ecb48369a2041777b901931c46072a10d99f..14335282863bff5a170716b228ea363e3d739685 100644 --- a/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java +++ b/uc1-application/src/main/java/theodolite/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -1,6 +1,7 @@ package theodolite.uc1.streamprocessing; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -9,11 +10,9 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; * Builder for the Kafka Streams configuration. */ public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { - private String inputTopic; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc1KafkaStreamsBuilder(final Configuration config) { + super(config); } @Override diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index 3fb301516daa4c7e14875d3d9ca9df9c770eb69e..b46e6246e248cc524c5b6249348c76ded6ec468b 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -3,10 +3,6 @@ application.version=0.0.1 kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input -kafka.output.topic=output schema.registry.url=http://localhost:8091 -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1 diff --git a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java index c094adfcd7952e81115dae84ed9c0d371e380c98..2f828278f5a3033c3e479bf82f3c8c5d9d4c380c 100644 --- a/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/theodolite/uc2/application/AggregationService.java @@ -36,26 +36,15 @@ public class AggregationService { * @param clusterSession the database session which the application should use. */ private void createKafkaStreamsApplication() { - // Use case specific stream configuration - final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(this.config); uc2KafkaStreamsBuilder - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .feedbackTopic(this.config.getString(ConfigurationKeys.KAFKA_FEEDBACK_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .configurationTopic(this.config.getString(ConfigurationKeys.KAFKA_CONFIGURATION_TOPIC)) .emitPeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.EMIT_PERIOD_MS))) .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.GRACE_PERIOD_MS))); - // Configuration of the stream application - final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java index 16addb8510eec2254d4787edbfbfbe186996fdea..1a606ee3df5e6ac2f43b650afe4a9aed036df9cd 100644 --- a/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java +++ b/uc2-application/src/main/java/theodolite/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc2.streamprocessing; import java.time.Duration; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -14,16 +15,14 @@ public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD build private static final Duration EMIT_PERIOD_DEFAULT = Duration.ofSeconds(1); private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - private String inputTopic; // NOPMD private String feedbackTopic; // NOPMD private String outputTopic; // NOPMD private String configurationTopic; // NOPMD private Duration emitPeriod; // NOPMD private Duration gracePeriod; // NOPMD - public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc2KafkaStreamsBuilder(final Configuration config) { + super(config); } public Uc2KafkaStreamsBuilder feedbackTopic(final String feedbackTopic) { diff --git a/uc2-application/src/main/resources/META-INF/application.properties b/uc2-application/src/main/resources/META-INF/application.properties index 10c47960adb012ba5c572e3833a37d821189eb8e..8f1af5f590eff7f2b12706d61a7c89d9152f7949 100644 --- a/uc2-application/src/main/resources/META-INF/application.properties +++ b/uc2-application/src/main/resources/META-INF/application.properties @@ -10,8 +10,4 @@ kafka.output.topic=output schema.registry.url=http://localhost:8091 emit.period.ms=5000 -grace.period.ms=0 - -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1 +grace.period.ms=0 \ No newline at end of file diff --git a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java b/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java deleted file mode 100644 index ab6f08c017bb78a72c4896d766b38f7b8485c7fb..0000000000000000000000000000000000000000 --- a/uc3-application/src/main/java/theodolite/uc3/application/ConfigurationKeys.java +++ /dev/null @@ -1,29 +0,0 @@ -package theodolite.uc3.application; - -/** - * Keys to access configuration parameters. - */ -public final class ConfigurationKeys { - - public static final String APPLICATION_NAME = "application.name"; - - public static final String APPLICATION_VERSION = "application.version"; - - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; - - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; - - public static final String NUM_THREADS = "num.threads"; - - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - - public static final String KAFKA_WINDOW_DURATION_MINUTES = "kafka.window.duration.minutes"; - - private ConfigurationKeys() { - } - -} diff --git a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java index b245b1645c9e5ee68df3f108802c9b91d70cf017..349512f988bb182d8851e458a1bce244c756bbfe 100644 --- a/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/theodolite/uc3/application/HistoryService.java @@ -34,23 +34,13 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - // Use case specific stream configuration - final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(); + final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(this.config); uc3KafkaStreamsBuilder - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); - // Configuration of the stream application - final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); + final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder.build(); + this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); } diff --git a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java index e74adf7c87673cc0e6ea4004dbcb1c0a6fc907ac..9ab4ea0a96c663af09008bd5358066ca3f8520ac 100644 --- a/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java +++ b/uc3-application/src/main/java/theodolite/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc3.streamprocessing; import java.time.Duration; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -11,13 +12,11 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; */ public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { - private String inputTopic; // NOPMD private String outputTopic; // NOPMD private Duration windowDuration; // NOPMD - public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc3KafkaStreamsBuilder(final Configuration config) { + super(config); } public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) { diff --git a/uc3-application/src/main/resources/META-INF/application.properties b/uc3-application/src/main/resources/META-INF/application.properties index 2ceaf37224b0bff54b09beaabe29210216e11671..011406f7ef1e23647eeae150d349f472214cbcd4 100644 --- a/uc3-application/src/main/resources/META-INF/application.properties +++ b/uc3-application/src/main/resources/META-INF/application.properties @@ -7,7 +7,3 @@ kafka.output.topic=output kafka.window.duration.minutes=1 schema.registry.url=http://localhost:8091 - -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1 diff --git a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java index 23af805733de2bb3f6384fa924a2322490ee58d9..12f35e8dcc532b19e470722094ba5aff07420ad2 100644 --- a/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/theodolite/uc4/application/HistoryService.java @@ -32,9 +32,8 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { // Use case specific stream configuration - final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(); + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(this.config); uc4KafkaStreamsBuilder - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregtionDuration( Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) @@ -42,15 +41,7 @@ public class HistoryService { Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); // Configuration of the stream application - final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder - .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) - .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .schemaRegistry(this.config.getString(ConfigurationKeys.SCHEMA_REGISTRY_URL)) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder.build(); this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); diff --git a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java index 7c9e2c4f790cf1fbb7dd34db573576d1e64077db..bbbb043119857612b1a8b0c60e3a5466cd68447e 100644 --- a/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java +++ b/uc4-application/src/main/java/theodolite/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -2,6 +2,7 @@ package theodolite.uc4.streamprocessing; import java.time.Duration; import java.util.Objects; +import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.Topology; import theodolite.commons.kafkastreams.KafkaStreamsBuilder; import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; @@ -11,14 +12,12 @@ import titan.ccp.common.kafka.avro.SchemaRegistryAvroSerdeFactory; */ public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { - private String inputTopic; // NOPMD private String outputTopic; // NOPMD private Duration aggregtionDuration; // NOPMD private Duration aggregationAdvance; // NOPMD - public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; + public Uc4KafkaStreamsBuilder(final Configuration config) { + super(config); } public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { diff --git a/uc4-application/src/main/resources/META-INF/application.properties b/uc4-application/src/main/resources/META-INF/application.properties index e577c880a8ff8169699acb8598e323b8671e8d5e..b46681533e63bf86a51439778a46940da348559d 100644 --- a/uc4-application/src/main/resources/META-INF/application.properties +++ b/uc4-application/src/main/resources/META-INF/application.properties @@ -8,7 +8,3 @@ aggregation.duration.days=30 aggregation.advance.days=1 schema.registry.url=http://localhost:8091 - -num.threads=1 -commit.interval.ms=100 -cache.max.bytes.buffering=-1