diff --git a/application-kafkastreams-commons/.settings/org.eclipse.jdt.ui.prefs b/application-kafkastreams-commons/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 0000000000000000000000000000000000000000..98b5ca8064a352aacfe2aebd13fbd0a87735fc3e --- /dev/null +++ b/application-kafkastreams-commons/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,127 @@ +cleanup.add_default_serial_version_id=true +cleanup.add_generated_serial_version_id=false +cleanup.add_missing_annotations=true +cleanup.add_missing_deprecated_annotations=true +cleanup.add_missing_methods=false +cleanup.add_missing_nls_tags=false +cleanup.add_missing_override_annotations=true +cleanup.add_missing_override_annotations_interface_methods=true +cleanup.add_serial_version_id=false +cleanup.always_use_blocks=true +cleanup.always_use_parentheses_in_expressions=false +cleanup.always_use_this_for_non_static_field_access=true +cleanup.always_use_this_for_non_static_method_access=true +cleanup.convert_functional_interfaces=false +cleanup.convert_to_enhanced_for_loop=true +cleanup.correct_indentation=true +cleanup.format_source_code=true +cleanup.format_source_code_changes_only=false +cleanup.insert_inferred_type_arguments=false +cleanup.make_local_variable_final=true +cleanup.make_parameters_final=true +cleanup.make_private_fields_final=true +cleanup.make_type_abstract_if_missing_method=false +cleanup.make_variable_declarations_final=true +cleanup.never_use_blocks=false +cleanup.never_use_parentheses_in_expressions=true +cleanup.organize_imports=true +cleanup.qualify_static_field_accesses_with_declaring_class=false +cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +cleanup.qualify_static_member_accesses_with_declaring_class=true +cleanup.qualify_static_method_accesses_with_declaring_class=false +cleanup.remove_private_constructors=true +cleanup.remove_redundant_modifiers=false +cleanup.remove_redundant_semicolons=false +cleanup.remove_redundant_type_arguments=true +cleanup.remove_trailing_whitespaces=true +cleanup.remove_trailing_whitespaces_all=true +cleanup.remove_trailing_whitespaces_ignore_empty=false +cleanup.remove_unnecessary_casts=true +cleanup.remove_unnecessary_nls_tags=true +cleanup.remove_unused_imports=true +cleanup.remove_unused_local_variables=false +cleanup.remove_unused_private_fields=true +cleanup.remove_unused_private_members=false +cleanup.remove_unused_private_methods=true +cleanup.remove_unused_private_types=true +cleanup.sort_members=false +cleanup.sort_members_all=false +cleanup.use_anonymous_class_creation=false +cleanup.use_blocks=true +cleanup.use_blocks_only_for_return_and_throw=false +cleanup.use_lambda=true +cleanup.use_parentheses_in_expressions=true +cleanup.use_this_for_non_static_field_access=true +cleanup.use_this_for_non_static_field_access_only_if_necessary=false +cleanup.use_this_for_non_static_method_access=true +cleanup.use_this_for_non_static_method_access_only_if_necessary=false +cleanup_profile=_CAU-SE-Style +cleanup_settings_version=2 +eclipse.preferences.version=1 +editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true +formatter_profile=_CAU-SE-Style +formatter_settings_version=15 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 +sp_cleanup.add_default_serial_version_id=true +sp_cleanup.add_generated_serial_version_id=false +sp_cleanup.add_missing_annotations=true +sp_cleanup.add_missing_deprecated_annotations=true +sp_cleanup.add_missing_methods=false +sp_cleanup.add_missing_nls_tags=false +sp_cleanup.add_missing_override_annotations=true +sp_cleanup.add_missing_override_annotations_interface_methods=true +sp_cleanup.add_serial_version_id=false +sp_cleanup.always_use_blocks=true +sp_cleanup.always_use_parentheses_in_expressions=false +sp_cleanup.always_use_this_for_non_static_field_access=true +sp_cleanup.always_use_this_for_non_static_method_access=true +sp_cleanup.convert_functional_interfaces=false +sp_cleanup.convert_to_enhanced_for_loop=true +sp_cleanup.correct_indentation=true +sp_cleanup.format_source_code=true +sp_cleanup.format_source_code_changes_only=false +sp_cleanup.insert_inferred_type_arguments=false +sp_cleanup.make_local_variable_final=true +sp_cleanup.make_parameters_final=true +sp_cleanup.make_private_fields_final=true +sp_cleanup.make_type_abstract_if_missing_method=false +sp_cleanup.make_variable_declarations_final=true +sp_cleanup.never_use_blocks=false +sp_cleanup.never_use_parentheses_in_expressions=true +sp_cleanup.on_save_use_additional_actions=true +sp_cleanup.organize_imports=true +sp_cleanup.qualify_static_field_accesses_with_declaring_class=false +sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true +sp_cleanup.qualify_static_member_accesses_with_declaring_class=true +sp_cleanup.qualify_static_method_accesses_with_declaring_class=false +sp_cleanup.remove_private_constructors=true +sp_cleanup.remove_redundant_modifiers=false +sp_cleanup.remove_redundant_semicolons=false +sp_cleanup.remove_redundant_type_arguments=true +sp_cleanup.remove_trailing_whitespaces=true +sp_cleanup.remove_trailing_whitespaces_all=true +sp_cleanup.remove_trailing_whitespaces_ignore_empty=false +sp_cleanup.remove_unnecessary_casts=true +sp_cleanup.remove_unnecessary_nls_tags=true +sp_cleanup.remove_unused_imports=true +sp_cleanup.remove_unused_local_variables=false +sp_cleanup.remove_unused_private_fields=true +sp_cleanup.remove_unused_private_members=false +sp_cleanup.remove_unused_private_methods=true +sp_cleanup.remove_unused_private_types=true +sp_cleanup.sort_members=false +sp_cleanup.sort_members_all=false +sp_cleanup.use_anonymous_class_creation=false +sp_cleanup.use_blocks=true +sp_cleanup.use_blocks_only_for_return_and_throw=false +sp_cleanup.use_lambda=true +sp_cleanup.use_parentheses_in_expressions=true +sp_cleanup.use_this_for_non_static_field_access=true +sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false +sp_cleanup.use_this_for_non_static_method_access=true +sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false diff --git a/application-kafkastreams-commons/.settings/qa.eclipse.plugin.checkstyle.prefs b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.checkstyle.prefs new file mode 100644 index 0000000000000000000000000000000000000000..87860c815222845c1d264d7d0ce498d3397f8280 --- /dev/null +++ b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.checkstyle.prefs @@ -0,0 +1,4 @@ +configFilePath=../config/checkstyle.xml +customModulesJarPaths= +eclipse.preferences.version=1 +enabled=true diff --git a/application-kafkastreams-commons/.settings/qa.eclipse.plugin.pmd.prefs b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.pmd.prefs new file mode 100644 index 0000000000000000000000000000000000000000..efbcb8c9e5d449194a48ca1ea42b7d807b573db9 --- /dev/null +++ b/application-kafkastreams-commons/.settings/qa.eclipse.plugin.pmd.prefs @@ -0,0 +1,4 @@ +customRulesJars= +eclipse.preferences.version=1 +enabled=true +ruleSetFilePath=../config/pmd.xml diff --git a/application-kafkastreams-commons/build.gradle b/application-kafkastreams-commons/build.gradle new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java similarity index 51% rename from uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java rename to application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java index 4b7f487c8e848f0b1d6d652b7d86a8c50c202af1..3101225d992476e3e9f29b141542609d887c6259 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/KafkaStreamsBuilder.java +++ b/application-kafkastreams-commons/src/main/java/spesb/commons/kafkastreams/KafkaStreamsBuilder.java @@ -1,32 +1,55 @@ -package spesb.uc1.streamprocessing; +package spesb.commons.kafkastreams; import java.util.Objects; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import titan.ccp.common.kafka.streams.PropertiesBuilder; /** * Builder for the Kafka Streams configuration. */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); +public abstract class KafkaStreamsBuilder { + // Kafkastreams application specific + private String applicationName; // NOPMD + private String applicationVersion; // NOPMD private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD private int numThreads = -1; // NOPMD private int commitIntervalMs = -1; // NOPMD private int cacheMaxBytesBuff = -1; // NOPMD - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; + /** + * Sets the application name for the {@code KafkaStreams} application. It is used to create the + * application ID. + * + * @param applicationName Name of the application. + * @return + */ + public KafkaStreamsBuilder applicationName(final String applicationName) { + this.applicationName = applicationName; + return this; + } + + /** + * Sets the application version for the {@code KafkaStreams} application. It is used to create the + * application ID. + * + * @param applicationVersion Version of the application. + * @return + */ + public KafkaStreamsBuilder applicationVersion(final String applicationVersion) { + this.applicationVersion = applicationVersion; return this; } + /** + * Sets the bootstrap servers for the {@code KafkaStreams} application. + * + * @param bootstrapServers String for a bootstrap server. + * @return + */ public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { this.bootstrapServers = bootstrapServers; return this; @@ -35,6 +58,9 @@ public class KafkaStreamsBuilder { /** * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus * one for using the default. + * + * @param numThreads Number of threads. -1 for using the default. + * @return */ public KafkaStreamsBuilder numThreads(final int numThreads) { if (numThreads < -1 || numThreads == 0) { @@ -48,6 +74,10 @@ public class KafkaStreamsBuilder { * Sets the Kafka Streams property for the frequency with which to save the position (offsets in * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for * example, when processing bulks of records. Can be minus one for using the default. + * + * @param commitIntervalMs Frequency with which to save the position of tasks. In ms, -1 for using + * the default. + * @return */ public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { if (commitIntervalMs < -1) { @@ -61,6 +91,10 @@ public class KafkaStreamsBuilder { * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for * example, when processing bulks of records. Can be minus one for using the default. + * + * @param cacheMaxBytesBuffering Number of memory bytes to be used for record caches across all + * threads. -1 for using the default. + * @return */ public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { if (cacheMaxBytesBuffering < -1) { @@ -71,22 +105,38 @@ public class KafkaStreamsBuilder { } /** - * Builds the {@link KafkaStreams} instance. + * Method to implement a {@link Topology} for a {@code KafkaStreams} application. + * + * @return A {@code Topology} for a {@code KafkaStreams} application. */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic); - final Properties properties = PropertiesBuilder + protected abstract Topology buildTopology(); + + /** + * Build the {@link Properties} for a {@code KafkaStreams} application. + * + * @return A {@code Properties} object. + */ + protected Properties buildProperties() { + return PropertiesBuilder .bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter + .applicationId(this.applicationName + '-' + this.applicationVersion) .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") .build(); - return new KafkaStreams(topologyBuilder.build(), properties); + } + + /** + * Builds the {@link KafkaStreams} instance. + */ + public KafkaStreams build() { + // Check for required attributes for building properties. + Objects.requireNonNull(this.bootstrapServers, "Bootstrap server has not been set."); + Objects.requireNonNull(this.applicationName, "Application name has not been set."); + Objects.requireNonNull(this.applicationVersion, "Application version has not been set."); + + // Create the Kafka streams instance. + return new KafkaStreams(this.buildTopology(), this.buildProperties()); } } diff --git a/build.gradle b/build.gradle index 6827860869614f2d0ff575cfb5e6229e6d4a3806..694a127ca58774bbe8c243e74996e412488adbf0 100644 --- a/build.gradle +++ b/build.gradle @@ -10,20 +10,27 @@ buildscript { } } -// Plugins for all projects +// Variables used to distinct different subprojects +def useCaseProjects = subprojects.findAll {it -> it.name.matches('uc(.)*')} +def commonProjects = subprojects.findAll {it -> it.name.matches('(.)*commons(.)*')} + + +// Plugins allprojects { apply plugin: 'eclipse' } -// Plugins for subprojects subprojects { - apply plugin: 'application' apply plugin: 'checkstyle' apply plugin: 'pmd' apply plugin: 'com.github.spotbugs' apply plugin: 'java-library' } +configure(useCaseProjects){ + apply plugin: 'application' +} + // Java version for all subprojects subprojects { java { @@ -47,8 +54,8 @@ allprojects { } } -// Dependencies -subprojects { +// Dependencies for all use cases +configure(useCaseProjects) { dependencies { // These dependencies is exported to consumers, that is to say found on their compile classpath. api('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } @@ -60,6 +67,22 @@ subprojects { implementation 'com.google.guava:guava:24.1-jre' implementation 'org.jctools:jctools-core:2.1.1' implementation 'org.slf4j:slf4j-simple:1.6.1' + implementation project(':application-kafkastreams-commons') + + // Use JUnit test framework + testImplementation 'junit:junit:4.12' + } +} + +// Dependencies for all commons +configure(commonProjects) { + dependencies { + // These dependencies is exported to consumers, that is to say found on their compile classpath. + api 'org.apache.kafka:kafka-clients:2.4.0' + + // These dependencies are used internally, and not exposed to consumers on their own compile classpath. + implementation 'org.slf4j:slf4j-simple:1.6.1' + implementation('org.industrial-devops:titan-ccp-common:0.0.3-SNAPSHOT') { changing = true } // Use JUnit test framework testImplementation 'junit:junit:4.12' diff --git a/settings.gradle b/settings.gradle index e3c73f5ef74c328e4aef187a8becf829a8ae5316..51112256b1a124d07ad80caf7ac0ccaf697858d3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,5 +1,7 @@ rootProject.name = 'scalability-benchmarking' +include 'application-kafkastreams-commons' + include 'uc1-workload-generator' include 'uc1-application' diff --git a/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java b/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java index 7a275cb33a4cd35d228d8ca33ebb7303b251271b..f74ce318713b75cbe5e6da5d523d5811042220f3 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java +++ b/uc1-application/src/main/java/spesb/uc1/application/ConfigurationKeys.java @@ -5,19 +5,21 @@ package spesb.uc1.application; */ public final class ConfigurationKeys { - public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + public static final String APPLICATION_NAME = "application.name"; - public static final String KAFKA_OUTPUT_TOPIC = "kafka.output.topic"; + public static final String APPLICATION_VERSION = "application.version"; - public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + public static final String NUM_THREADS = "num.threads"; - public static final String NUM_THREADS = "num.threads"; + public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; - public static final String COMMIT_INTERVAL_MS = "commit.interval.ms"; + public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; - public static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; - private ConfigurationKeys() { - } + public static final String KAFKA_INPUT_TOPIC = "kafka.input.topic"; + + + private ConfigurationKeys() {} } diff --git a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java index 18a39da7229d961249be900eeeff679e267a1eef..70bd6d5fd29eae95bfd7cb895f6c9a7b4176f1c2 100644 --- a/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java +++ b/uc1-application/src/main/java/spesb/uc1/application/HistoryService.java @@ -3,7 +3,7 @@ package spesb.uc1.application; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc1.streamprocessing.KafkaStreamsBuilder; +import spesb.uc1.streamprocessing.Uc1KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,14 +30,20 @@ public class HistoryService { */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + final Uc1KafkaStreamsBuilder uc1KafkaStreamsBuilder = new Uc1KafkaStreamsBuilder(); + uc1KafkaStreamsBuilder.inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)); + + final KafkaStreams kafkaStreams = uc1KafkaStreamsBuilder + .applicationName(this.config.getString(ConfigurationKeys.APPLICATION_NAME)) + .applicationVersion(this.config.getString(ConfigurationKeys.APPLICATION_VERSION)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); } diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java index 279b70d0b7311f2b45b986e54cdf5b6c81c28263..1705e3d81a95ff3ac85cef84e32218093296b065 100644 --- a/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/TopologyBuilder.java @@ -18,8 +18,7 @@ public class TopologyBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyBuilder.class); private final String inputTopic; - private final Gson gson; - + private final Gson gson = new Gson(); private final StreamsBuilder builder = new StreamsBuilder(); /** @@ -27,14 +26,12 @@ public class TopologyBuilder { */ public TopologyBuilder(final String inputTopic) { this.inputTopic = inputTopic; - this.gson = new Gson(); } /** * Build the {@link Topology} for the History microservice. */ public Topology build() { - this.builder .stream(this.inputTopic, Consumed.with( Serdes.String(), diff --git a/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java b/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..7283b39e2b0918ddff9585835fa1e478303dd90b --- /dev/null +++ b/uc1-application/src/main/java/spesb/uc1/streamprocessing/Uc1KafkaStreamsBuilder.java @@ -0,0 +1,23 @@ +package spesb.uc1.streamprocessing; + +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc1KafkaStreamsBuilder extends KafkaStreamsBuilder { + private String inputTopic; // NOPMD + + public KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + return new TopologyBuilder(this.inputTopic).build(); + } +} diff --git a/uc1-application/src/main/resources/META-INF/application.properties b/uc1-application/src/main/resources/META-INF/application.properties index ef279332f911108fa8ca42d840d4a147460e8e35..8f029be66f9decadc87c8e88f58698d1422d596d 100644 --- a/uc1-application/src/main/resources/META-INF/application.properties +++ b/uc1-application/src/main/resources/META-INF/application.properties @@ -1,6 +1,10 @@ +application.name="uc1-application" +application.version="0.0.1" + kafka.bootstrap.servers=localhost:9092 kafka.input.topic=input kafka.output.topic=output + num.threads=1 commit.interval.ms=100 cache.max.bytes.buffering=-1 diff --git a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java index 79d8c94c75ede32d92485d4b3c49d716ae19ccf8..bc6fdc067d7d1b36efe489ed0cf0a4e1145231af 100644 --- a/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java +++ b/uc2-application/src/main/java/spesb/uc2/application/AggregationService.java @@ -4,56 +4,63 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc2.streamprocessing.KafkaStreamsBuilder; +import spesb.uc2.streamprocessing.Uc2KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** - * A microservice that manages the history and, therefore, stores and aggregates - * incoming measurements. + * A microservice that manages the history and, therefore, stores and aggregates incoming + * measurements. * */ public class AggregationService { - private final Configuration config = Configurations.create(); - - private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - - /** - * Start the service. - */ - public void run() { - this.createKafkaStreamsApplication(); - } - - public static void main(final String[] args) { - new AggregationService().run(); - } - - /** - * Build and start the underlying Kafka Streams Application of the service. - * - * @param clusterSession the database session which the application should use. - */ - private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) - .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); - this.stopEvent.thenRun(kafkaStreams::close); - kafkaStreams.start(); - } - - /** - * Stop the service. - */ - public void stop() { - this.stopEvent.complete(null); - } + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + public static void main(final String[] args) { + new AggregationService().run(); + } + + /** + * Build and start the underlying Kafka Streams Application of the service. + * + * @param clusterSession the database session which the application should use. + */ + private void createKafkaStreamsApplication() { + // Use case specific stream configuration + final Uc2KafkaStreamsBuilder uc2KafkaStreamsBuilder = new Uc2KafkaStreamsBuilder(); + uc2KafkaStreamsBuilder + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) + .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc2KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) + .build(); + + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + /** + * Stop the service. + */ + public void stop() { + this.stopEvent.complete(null); + } } diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9b43f5e66fb4336602c026df8941d5545f39bfb4..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/spesb/uc2/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,134 +0,0 @@ -package spesb.uc2.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { // NOPMD builder method - - private static final String APPLICATION_NAME = "titan-ccp-aggregation"; - private static final String APPLICATION_VERSION = "0.0.1"; - - private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); - private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; - - // private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private String configurationTopic; // NOPMD - private Duration windowSize = null; // NOPMD - private Duration gracePeriod = null; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuffering = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder configurationTopic(final String configurationTopic) { - this.configurationTopic = configurationTopic; - return this; - } - - public KafkaStreamsBuilder windowSize(final Duration windowSize) { - this.windowSize = Objects.requireNonNull(windowSize); - return this; - } - - public KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { - this.gracePeriod = Objects.requireNonNull(gracePeriod); - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuffering = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.configurationTopic, - this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, - this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); - return new KafkaStreams(topologyBuilder.build(), this.buildProperties()); - } - - private Properties buildProperties() { - final Properties properties = new Properties(); - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, - APPLICATION_NAME + '-' + APPLICATION_VERSION); // TODO as parameter - if (this.numThreads > 0) { - properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads); - } - if (this.commitIntervalMs >= 0) { - properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs); - } - if (this.cacheMaxBytesBuffering >= 0) { - properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuffering); - } - return properties; - } - -} diff --git a/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java b/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..875a45ee92b0ec1b229678597c20f7cbb381b7c5 --- /dev/null +++ b/uc2-application/src/main/java/spesb/uc2/streamprocessing/Uc2KafkaStreamsBuilder.java @@ -0,0 +1,63 @@ +package spesb.uc2.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc2KafkaStreamsBuilder extends KafkaStreamsBuilder { // NOPMD builder method + + private static final Duration WINDOW_SIZE_DEFAULT = Duration.ofSeconds(1); + private static final Duration GRACE_PERIOD_DEFAULT = Duration.ZERO; + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private String configurationTopic; // NOPMD + private Duration windowSize; // NOPMD + private Duration gracePeriod; // NOPMD + + public Uc2KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc2KafkaStreamsBuilder configurationTopic(final String configurationTopic) { + this.configurationTopic = configurationTopic; + return this; + } + + public Uc2KafkaStreamsBuilder windowSize(final Duration windowSize) { + this.windowSize = Objects.requireNonNull(windowSize); + return this; + } + + public Uc2KafkaStreamsBuilder gracePeriod(final Duration gracePeriod) { + this.gracePeriod = Objects.requireNonNull(gracePeriod); + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.configurationTopic, "Configuration topic has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.configurationTopic, + this.windowSize == null ? WINDOW_SIZE_DEFAULT : this.windowSize, + this.gracePeriod == null ? GRACE_PERIOD_DEFAULT : this.gracePeriod); + + return topologyBuilder.build(); + } + +} diff --git a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java b/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java index 2b6c40e51a09e179778209d0626da6f6718bc07a..90f5a828e0adb030d3ecc86a2bd34bba780672b3 100644 --- a/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java +++ b/uc3-application/src/main/java/spesb/uc3/application/HistoryService.java @@ -5,7 +5,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc3.streamprocessing.KafkaStreamsBuilder; +import spesb.uc3.streamprocessing.Uc3KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -33,11 +33,16 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc3KafkaStreamsBuilder uc3KafkaStreamsBuilder = new Uc3KafkaStreamsBuilder(); + uc3KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)) + .windowDuration(Duration.ofMinutes(this.windowDurationMinutes)); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc3KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 28382bedd3b02ceb2c48925212087c28ed371aad..0000000000000000000000000000000000000000 --- a/uc3-application/src/main/java/spesb/uc3/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,105 +0,0 @@ -package spesb.uc3.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration windowDuration; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder windowDuration(final Duration windowDuration) { - this.windowDuration = windowDuration; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, - this.windowDuration); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java b/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..b7f5d517c27ffe825161a50623bbcc0e5506c4d3 --- /dev/null +++ b/uc3-application/src/main/java/spesb/uc3/streamprocessing/Uc3KafkaStreamsBuilder.java @@ -0,0 +1,43 @@ +package spesb.uc3.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc3KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration windowDuration; // NOPMD + + public Uc3KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc3KafkaStreamsBuilder windowDuration(final Duration windowDuration) { + this.windowDuration = windowDuration; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.windowDuration, "Window duration has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder(this.inputTopic, this.outputTopic, + this.windowDuration); + return topologyBuilder.build(); + } + +} diff --git a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java b/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java index f86f0cb7e3bc6840db52ce7bdbbac054cdd05e13..56275d594a9fcd4c4ea5a8416da5ced2e9c52ff9 100644 --- a/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java +++ b/uc4-application/src/main/java/spesb/uc4/application/HistoryService.java @@ -4,7 +4,7 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; import org.apache.commons.configuration2.Configuration; import org.apache.kafka.streams.KafkaStreams; -import spesb.uc4.streamprocessing.KafkaStreamsBuilder; +import spesb.uc4.streamprocessing.Uc4KafkaStreamsBuilder; import titan.ccp.common.configuration.Configurations; /** @@ -30,18 +30,24 @@ public class HistoryService { * */ private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + // Use case specific stream configuration + final Uc4KafkaStreamsBuilder uc4KafkaStreamsBuilder = new Uc4KafkaStreamsBuilder(); + uc4KafkaStreamsBuilder .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) .aggregtionDuration( Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_DURATION_DAYS))) .aggregationAdvance( - Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))) + Duration.ofDays(this.config.getInt(ConfigurationKeys.AGGREGATION_ADVANCE_DAYS))); + + // Configuration of the stream application + final KafkaStreams kafkaStreams = uc4KafkaStreamsBuilder + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) .build(); + this.stopEvent.thenRun(kafkaStreams::close); kafkaStreams.start(); } diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java deleted file mode 100644 index a3ae3461d055694669e4d874930d5ade9dd83658..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKey.java +++ /dev/null @@ -1,31 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek} and a sensor id. - */ -public class DayOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final String sensorId; - - public DayOfWeekKey(final DayOfWeek dayOfWeek, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java deleted file mode 100644 index 222785ca8a2d8db72c81929a216fc53b43d06ec0..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeyFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link DayOfWeekKey}. - */ -public class DayOfWeekKeyFactory implements StatsKeyFactory<DayOfWeekKey> { - - @Override - public DayOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - @Override - public String getSensorId(final DayOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java deleted file mode 100644 index 9c246f912ffc67ff6fb8d211a99d478cb58c2898..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekKeySerde.java +++ /dev/null @@ -1,33 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link DayOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class DayOfWeekKeySerde implements BufferSerde<DayOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final DayOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putString(data.getSensorId()); - } - - @Override - public DayOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final String sensorId = buffer.getString(); - return new DayOfWeekKey(dayOfWeek, sensorId); - } - - public static Serde<DayOfWeekKey> create() { - return SimpleSerdes.create(new DayOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java deleted file mode 100644 index bdfecdbc4857b4d7a630b4afa07de39618435544..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/DayOfWeekRecordFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.DayOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link DayOfWeekActivePowerRecord}. - */ -public class DayOfWeekRecordFactory - implements StatsRecordFactory<DayOfWeekKey, DayOfWeekActivePowerRecord> { - - @Override - public DayOfWeekActivePowerRecord create(final Windowed<DayOfWeekKey> windowed, - final Stats stats) { - return new DayOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java deleted file mode 100644 index 81d33f3042796ecb3c890e73a82e879ab2d0ac6e..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKey.java +++ /dev/null @@ -1,40 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; - -/** - * Composed key of a {@link DayOfWeek}, an hour of the day and a sensor id. - */ -public class HourOfWeekKey { - - private final DayOfWeek dayOfWeek; - private final int hourOfDay; - private final String sensorId; - - /** - * Create a new {@link HourOfDayKey} using its components. - */ - public HourOfWeekKey(final DayOfWeek dayOfWeek, final int hourOfDay, final String sensorId) { - this.dayOfWeek = dayOfWeek; - this.hourOfDay = hourOfDay; - this.sensorId = sensorId; - } - - public DayOfWeek getDayOfWeek() { - return this.dayOfWeek; - } - - public int getHourOfDay() { - return this.hourOfDay; - } - - public String getSensorId() { - return this.sensorId; - } - - @Override - public String toString() { - return this.sensorId + ";" + this.dayOfWeek.toString() + ";" + this.hourOfDay; - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java deleted file mode 100644 index 980549309ce94b2e4a4c6da0835b8adfe47bb61e..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeyFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import java.time.LocalDateTime; - -/** - * {@link StatsKeyFactory} for {@link HourOfWeekKey}. - */ -public class HourOfWeekKeyFactory implements StatsKeyFactory<HourOfWeekKey> { - - @Override - public HourOfWeekKey createKey(final String sensorId, final LocalDateTime dateTime) { - final DayOfWeek dayOfWeek = dateTime.getDayOfWeek(); - final int hourOfDay = dateTime.getHour(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - @Override - public String getSensorId(final HourOfWeekKey key) { - return key.getSensorId(); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java deleted file mode 100644 index 63a6a445bf46f521a220816896529a081a15bca0..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekKeySerde.java +++ /dev/null @@ -1,35 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.DayOfWeek; -import org.apache.kafka.common.serialization.Serde; -import titan.ccp.common.kafka.simpleserdes.BufferSerde; -import titan.ccp.common.kafka.simpleserdes.ReadBuffer; -import titan.ccp.common.kafka.simpleserdes.SimpleSerdes; -import titan.ccp.common.kafka.simpleserdes.WriteBuffer; - -/** - * {@link BufferSerde} for a {@link HourOfWeekKey}. Use the {@link #create()} method to create a new - * Kafka {@link Serde}. - */ -public class HourOfWeekKeySerde implements BufferSerde<HourOfWeekKey> { - - @Override - public void serialize(final WriteBuffer buffer, final HourOfWeekKey data) { - buffer.putInt(data.getDayOfWeek().getValue()); - buffer.putInt(data.getHourOfDay()); - buffer.putString(data.getSensorId()); - } - - @Override - public HourOfWeekKey deserialize(final ReadBuffer buffer) { - final DayOfWeek dayOfWeek = DayOfWeek.of(buffer.getInt()); - final int hourOfDay = buffer.getInt(); - final String sensorId = buffer.getString(); - return new HourOfWeekKey(dayOfWeek, hourOfDay, sensorId); - } - - public static Serde<HourOfWeekKey> create() { - return SimpleSerdes.create(new HourOfWeekKeySerde()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java deleted file mode 100644 index 358e3d1a5acf8bfd9f4fca7c95b84bd5b13bea53..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/HourOfWeekRecordFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -package spesb.uc4.streamprocessing; - -import com.google.common.math.Stats; -import org.apache.kafka.streams.kstream.Windowed; -import titan.ccp.model.records.HourOfWeekActivePowerRecord; - -/** - * {@link StatsRecordFactory} to create an {@link HourOfWeekActivePowerRecord}. - */ -public class HourOfWeekRecordFactory - implements StatsRecordFactory<HourOfWeekKey, HourOfWeekActivePowerRecord> { - - @Override - public HourOfWeekActivePowerRecord create(final Windowed<HourOfWeekKey> windowed, - final Stats stats) { - return new HourOfWeekActivePowerRecord( - windowed.key().getSensorId(), - windowed.key().getDayOfWeek().getValue(), - windowed.key().getHourOfDay(), - windowed.window().start(), - windowed.window().end(), - stats.count(), - stats.mean(), - stats.populationVariance(), - stats.min(), - stats.max()); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java deleted file mode 100644 index 9cbff4f61ec5975e3dcdfc5c4e4a9f900e6707ec..0000000000000000000000000000000000000000 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/KafkaStreamsBuilder.java +++ /dev/null @@ -1,117 +0,0 @@ -package spesb.uc4.streamprocessing; - -import java.time.Duration; -import java.util.Objects; -import java.util.Properties; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import titan.ccp.common.kafka.streams.PropertiesBuilder; - -/** - * Builder for the Kafka Streams configuration. - */ -public class KafkaStreamsBuilder { - - private static final String APPLICATION_NAME = "titan-ccp-history"; - private static final String APPLICATION_VERSION = "0.0.1"; - - // private static final Logger LOGGER = - // LoggerFactory.getLogger(KafkaStreamsBuilder.class); - - private String bootstrapServers; // NOPMD - private String inputTopic; // NOPMD - private String outputTopic; // NOPMD - private Duration aggregtionDuration; // NOPMD - private Duration aggregationAdvance; // NOPMD - private int numThreads = -1; // NOPMD - private int commitIntervalMs = -1; // NOPMD - private int cacheMaxBytesBuff = -1; // NOPMD - - public KafkaStreamsBuilder bootstrapServers(final String bootstrapServers) { - this.bootstrapServers = bootstrapServers; - return this; - } - - public KafkaStreamsBuilder inputTopic(final String inputTopic) { - this.inputTopic = inputTopic; - return this; - } - - public KafkaStreamsBuilder outputTopic(final String outputTopic) { - this.outputTopic = outputTopic; - return this; - } - - public KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { - this.aggregtionDuration = aggregtionDuration; - return this; - } - - public KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { - this.aggregationAdvance = aggregationAdvance; - return this; - } - - /** - * Sets the Kafka Streams property for the number of threads (num.stream.threads). Can be minus - * one for using the default. - */ - public KafkaStreamsBuilder numThreads(final int numThreads) { - if (numThreads < -1 || numThreads == 0) { - throw new IllegalArgumentException("Number of threads must be greater 0 or -1."); - } - this.numThreads = numThreads; - return this; - } - - /** - * Sets the Kafka Streams property for the frequency with which to save the position (offsets in - * source topics) of tasks (commit.interval.ms). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder commitIntervalMs(final int commitIntervalMs) { - if (commitIntervalMs < -1) { - throw new IllegalArgumentException("Commit interval must be greater or equal -1."); - } - this.commitIntervalMs = commitIntervalMs; - return this; - } - - /** - * Sets the Kafka Streams property for maximum number of memory bytes to be used for record caches - * across all threads (cache.max.bytes.buffering). Must be zero for processing all record, for - * example, when processing bulks of records. Can be minus one for using the default. - */ - public KafkaStreamsBuilder cacheMaxBytesBuffering(final int cacheMaxBytesBuffering) { - if (cacheMaxBytesBuffering < -1) { - throw new IllegalArgumentException("Cache max bytes buffering must be greater or equal -1."); - } - this.cacheMaxBytesBuff = cacheMaxBytesBuffering; - return this; - } - - /** - * Builds the {@link KafkaStreams} instance. - */ - public KafkaStreams build() { - Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); - Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); - Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); - Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); - // TODO log parameters - final TopologyBuilder topologyBuilder = new TopologyBuilder( - this.inputTopic, - this.outputTopic, - this.aggregtionDuration, - this.aggregationAdvance); - final Properties properties = PropertiesBuilder.bootstrapServers(this.bootstrapServers) - .applicationId(APPLICATION_NAME + '-' + APPLICATION_VERSION) // TODO as parameter - .set(StreamsConfig.NUM_STREAM_THREADS_CONFIG, this.numThreads, p -> p > 0) - .set(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, this.commitIntervalMs, p -> p >= 0) - .set(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, this.cacheMaxBytesBuff, p -> p >= 0) - // .set(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG") - .build(); - return new KafkaStreams(topologyBuilder.build(), properties); - } - -} diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java index 66bb460031f09f1cae77d5e93e3f130aa66f6e90..c2489a9ae4c10301a914be67db14d5556e459912 100644 --- a/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java +++ b/uc4-application/src/main/java/spesb/uc4/streamprocessing/TopologyBuilder.java @@ -52,11 +52,8 @@ public class TopologyBuilder { * Build the {@link Topology} for the History microservice. */ public Topology build() { - final StatsKeyFactory<HourOfDayKey> keyFactory = new HourOfDayKeyFactory(); final Serde<HourOfDayKey> keySerde = HourOfDayKeySerde.create(); - // final StatsRecordFactory<HourOfDayKey, HourOfDayActivePowerRecord> statsRecordFactory = new - // HourOfDayRecordFactory(); this.builder .stream(this.inputTopic, diff --git a/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java b/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..d248c02158befc77e42033fa6a30816cba97d816 --- /dev/null +++ b/uc4-application/src/main/java/spesb/uc4/streamprocessing/Uc4KafkaStreamsBuilder.java @@ -0,0 +1,54 @@ +package spesb.uc4.streamprocessing; + +import java.time.Duration; +import java.util.Objects; +import org.apache.kafka.streams.Topology; +import spesb.commons.kafkastreams.KafkaStreamsBuilder; + +/** + * Builder for the Kafka Streams configuration. + */ +public class Uc4KafkaStreamsBuilder extends KafkaStreamsBuilder { + + private String inputTopic; // NOPMD + private String outputTopic; // NOPMD + private Duration aggregtionDuration; // NOPMD + private Duration aggregationAdvance; // NOPMD + + public Uc4KafkaStreamsBuilder inputTopic(final String inputTopic) { + this.inputTopic = inputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder outputTopic(final String outputTopic) { + this.outputTopic = outputTopic; + return this; + } + + public Uc4KafkaStreamsBuilder aggregtionDuration(final Duration aggregtionDuration) { + this.aggregtionDuration = aggregtionDuration; + return this; + } + + public Uc4KafkaStreamsBuilder aggregationAdvance(final Duration aggregationAdvance) { + this.aggregationAdvance = aggregationAdvance; + return this; + } + + @Override + protected Topology buildTopology() { + Objects.requireNonNull(this.inputTopic, "Input topic has not been set."); + Objects.requireNonNull(this.outputTopic, "Output topic has not been set."); + Objects.requireNonNull(this.aggregtionDuration, "Aggregation duration has not been set."); + Objects.requireNonNull(this.aggregationAdvance, "Aggregation advance period has not been set."); + + final TopologyBuilder topologyBuilder = new TopologyBuilder( + this.inputTopic, + this.outputTopic, + this.aggregtionDuration, + this.aggregationAdvance); + + return topologyBuilder.build(); + } + +}