diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/AggregationService.java b/uc2-application/src/main/java/titan/ccp/aggregation/AggregationService.java deleted file mode 100644 index 0058e939bc5cc50dafe15aaf011bfcc9aa47925e..0000000000000000000000000000000000000000 --- a/uc2-application/src/main/java/titan/ccp/aggregation/AggregationService.java +++ /dev/null @@ -1,62 +0,0 @@ -package titan.ccp.aggregation; - -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import org.apache.commons.configuration2.Configuration; -import org.apache.kafka.streams.KafkaStreams; -import titan.ccp.aggregation.streamprocessing.KafkaStreamsBuilder; -import titan.ccp.common.configuration.Configurations; - -/** - * A microservice that manages the history and, therefore, stores and aggregates incoming - * measurements. - * - */ -public class AggregationService { - - private final Configuration config = Configurations.create(); - - private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); - - - /** - * Start the service. - */ - public void run() { - this.createKafkaStreamsApplication(); - } - - public static void main(final String[] args) { - new AggregationService().run(); - } - - - /** - * Build and start the underlying Kafka Streams Application of the service. - * - * @param clusterSession the database session which the application should use. - */ - private void createKafkaStreamsApplication() { - final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() - .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) - .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) - .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) - .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) - .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) - .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) - .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) - .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) - .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)) - .build(); - this.stopEvent.thenRun(kafkaStreams::close); - kafkaStreams.start(); - } - - /** - * Stop the service. - */ - public void stop() { - this.stopEvent.complete(null); - } - -} diff --git a/uc2-application/src/main/java/uc2/application/AggregationService.java b/uc2-application/src/main/java/uc2/application/AggregationService.java new file mode 100644 index 0000000000000000000000000000000000000000..696b13f4889a988282467aca3e4241938e636d7c --- /dev/null +++ b/uc2-application/src/main/java/uc2/application/AggregationService.java @@ -0,0 +1,59 @@ +package uc2.application; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.configuration2.Configuration; +import org.apache.kafka.streams.KafkaStreams; +import titan.ccp.common.configuration.Configurations; +import uc2.streamprocessing.KafkaStreamsBuilder; + +/** + * A microservice that manages the history and, therefore, stores and aggregates + * incoming measurements. + * + */ +public class AggregationService { + + private final Configuration config = Configurations.create(); + + private final CompletableFuture<Void> stopEvent = new CompletableFuture<>(); + + /** + * Start the service. + */ + public void run() { + this.createKafkaStreamsApplication(); + } + + public static void main(final String[] args) { + new AggregationService().run(); + } + + /** + * Build and start the underlying Kafka Streams Application of the service. + * + * @param clusterSession the database session which the application should use. + */ + private void createKafkaStreamsApplication() { + final KafkaStreams kafkaStreams = new KafkaStreamsBuilder() + .bootstrapServers(this.config.getString(ConfigurationKeys.KAFKA_BOOTSTRAP_SERVERS)) + .inputTopic(this.config.getString(ConfigurationKeys.KAFKA_INPUT_TOPIC)) + .outputTopic(this.config.getString(ConfigurationKeys.KAFKA_OUTPUT_TOPIC)) + .configurationTopic(this.config.getString(ConfigurationKeys.CONFIGURATION_KAFKA_TOPIC)) + .windowSize(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_SIZE_MS))) + .gracePeriod(Duration.ofMillis(this.config.getLong(ConfigurationKeys.WINDOW_GRACE_MS))) + .numThreads(this.config.getInt(ConfigurationKeys.NUM_THREADS)) + .commitIntervalMs(this.config.getInt(ConfigurationKeys.COMMIT_INTERVAL_MS)) + .cacheMaxBytesBuffering(this.config.getInt(ConfigurationKeys.CACHE_MAX_BYTES_BUFFERING)).build(); + this.stopEvent.thenRun(kafkaStreams::close); + kafkaStreams.start(); + } + + /** + * Stop the service. + */ + public void stop() { + this.stopEvent.complete(null); + } + +} diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/ConfigurationKeys.java b/uc2-application/src/main/java/uc2/application/ConfigurationKeys.java similarity index 96% rename from uc2-application/src/main/java/titan/ccp/aggregation/ConfigurationKeys.java rename to uc2-application/src/main/java/uc2/application/ConfigurationKeys.java index fd6286fea3949137304c280666c9edccbb2554d4..08d5e1eb26535b91462a2954e57037f20e3d62e9 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/ConfigurationKeys.java +++ b/uc2-application/src/main/java/uc2/application/ConfigurationKeys.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation; +package uc2.application; /** * Keys to access configuration parameters. diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ChildParentsTransformer.java b/uc2-application/src/main/java/uc2/streamprocessing/ChildParentsTransformer.java similarity index 98% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ChildParentsTransformer.java rename to uc2-application/src/main/java/uc2/streamprocessing/ChildParentsTransformer.java index 8422a5c937ec3ac003a96776b4aaddfe3bbe1fff..4315aad5bc211d9342ee1703ead357d0786a2e0e 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ChildParentsTransformer.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/ChildParentsTransformer.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ChildParentsTransformerFactory.java b/uc2-application/src/main/java/uc2/streamprocessing/ChildParentsTransformerFactory.java similarity index 97% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ChildParentsTransformerFactory.java rename to uc2-application/src/main/java/uc2/streamprocessing/ChildParentsTransformerFactory.java index 942858ec77cadbd981fdd538306ea3192e875a44..5029c02446b0b191edf0cc498165465d30516504 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ChildParentsTransformerFactory.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/ChildParentsTransformerFactory.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.util.Map; import java.util.Optional; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointFlatTransformer.java b/uc2-application/src/main/java/uc2/streamprocessing/JointFlatTransformer.java similarity index 98% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointFlatTransformer.java rename to uc2-application/src/main/java/uc2/streamprocessing/JointFlatTransformer.java index aa39a56322a74248d54177d30305c8ceec4b79d8..87a1d9967295995ce5dc46e0f1a9f5f52ffae469 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointFlatTransformer.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/JointFlatTransformer.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import com.google.common.base.MoreObjects; import java.util.ArrayList; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointFlatTransformerFactory.java b/uc2-application/src/main/java/uc2/streamprocessing/JointFlatTransformerFactory.java similarity index 96% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointFlatTransformerFactory.java rename to uc2-application/src/main/java/uc2/streamprocessing/JointFlatTransformerFactory.java index adfca6cb67bfa26ccf1499b0f79b7e37ee1da5e4..5ddb07850e4c14418b9014c8a240c677cb548259 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointFlatTransformerFactory.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/JointFlatTransformerFactory.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.util.Map; import java.util.Set; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointRecordParents.java b/uc2-application/src/main/java/uc2/streamprocessing/JointRecordParents.java similarity index 92% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointRecordParents.java rename to uc2-application/src/main/java/uc2/streamprocessing/JointRecordParents.java index 6bbf0aad6fd5976791e157957640c1c51c3bd259..74fb5441f9a716af4ddd279b4b5fff0466697a23 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/JointRecordParents.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/JointRecordParents.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.util.Set; import titan.ccp.models.records.ActivePowerRecord; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/KafkaStreamsBuilder.java b/uc2-application/src/main/java/uc2/streamprocessing/KafkaStreamsBuilder.java similarity index 99% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/KafkaStreamsBuilder.java rename to uc2-application/src/main/java/uc2/streamprocessing/KafkaStreamsBuilder.java index 0ae0d399d232535da9c445ca0918869d2db1ad9e..eb0643d63f934e7966bca74a7ff7356b2aefb259 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/KafkaStreamsBuilder.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/KafkaStreamsBuilder.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.time.Duration; import java.util.Objects; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerde.java b/uc2-application/src/main/java/uc2/streamprocessing/OptionalParentsSerde.java similarity index 96% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerde.java rename to uc2-application/src/main/java/uc2/streamprocessing/OptionalParentsSerde.java index a310b3a68462d73e305fbda179fb5e49eddf5d85..e4624d9531fc476d707d1b712dddb553a69b3823 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerde.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/OptionalParentsSerde.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.util.HashSet; import java.util.Optional; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ParentsSerde.java b/uc2-application/src/main/java/uc2/streamprocessing/ParentsSerde.java similarity index 95% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ParentsSerde.java rename to uc2-application/src/main/java/uc2/streamprocessing/ParentsSerde.java index 478b8bcf3ccf19b4ffc02ac1fb41e085c66c4db9..327f33a10b6450c6d16d155314bff76aa18913d9 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/ParentsSerde.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/ParentsSerde.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import java.util.HashSet; import java.util.Set; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/RecordAggregator.java b/uc2-application/src/main/java/uc2/streamprocessing/RecordAggregator.java similarity index 97% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/RecordAggregator.java rename to uc2-application/src/main/java/uc2/streamprocessing/RecordAggregator.java index ae7a167c90862eb85a9f75b59fe110c4628ee8bc..0b3e23462ccd61bdd71b485de62c28e89168374a 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/RecordAggregator.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/RecordAggregator.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import org.apache.kafka.streams.kstream.Windowed; import titan.ccp.models.records.ActivePowerRecord; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/SensorParentKey.java b/uc2-application/src/main/java/uc2/streamprocessing/SensorParentKey.java similarity index 93% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/SensorParentKey.java rename to uc2-application/src/main/java/uc2/streamprocessing/SensorParentKey.java index 32d77c402ece5b3bf289941a56c9c0f15b3b2576..4cb3bc9c6ec31a6ee086adffb4db188e348c040f 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/SensorParentKey.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/SensorParentKey.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; /** * A key consisting of the identifier of a sensor and an identifier of parent sensor. diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerde.java b/uc2-application/src/main/java/uc2/streamprocessing/SensorParentKeySerde.java similarity index 95% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerde.java rename to uc2-application/src/main/java/uc2/streamprocessing/SensorParentKeySerde.java index 2646c7d04439565d7d2f43719bbdb8a1b66633cc..1a2688c2bac2dc3e69d786c6ff395106f0a0f58c 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerde.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/SensorParentKeySerde.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import org.apache.kafka.common.serialization.Serde; import titan.ccp.common.kafka.simpleserdes.BufferSerde; diff --git a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/TopologyBuilder.java b/uc2-application/src/main/java/uc2/streamprocessing/TopologyBuilder.java similarity index 99% rename from uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/TopologyBuilder.java rename to uc2-application/src/main/java/uc2/streamprocessing/TopologyBuilder.java index 7a67c78372db2abb59af33d4b15109fe7e75c0c2..a6b377b0ead972c89c58d405279a571f545ae91b 100644 --- a/uc2-application/src/main/java/titan/ccp/aggregation/streamprocessing/TopologyBuilder.java +++ b/uc2-application/src/main/java/uc2/streamprocessing/TopologyBuilder.java @@ -1,4 +1,4 @@ -package titan.ccp.aggregation.streamprocessing; +package uc2.streamprocessing; import com.google.common.math.StatsAccumulator; import java.time.Duration; diff --git a/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerdeTest.java b/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerdeTest.java index 1bffdaa3ec264dbd871b08d5f47bee1ef40dae82..f92af2b5a908f8c4efb8ec02a00c62b9925cb41f 100644 --- a/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerdeTest.java +++ b/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/OptionalParentsSerdeTest.java @@ -3,7 +3,7 @@ package titan.ccp.aggregation.streamprocessing; import java.util.Optional; import java.util.Set; import org.junit.Test; -import titan.ccp.aggregation.streamprocessing.OptionalParentsSerde; +import uc2.streamprocessing.OptionalParentsSerde; public class OptionalParentsSerdeTest { diff --git a/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/ParentsSerdeTest.java b/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/ParentsSerdeTest.java index 12301031c14002c22843e4178eacc2efafae9816..715a14f47ee1d8243070344ea40edba37ee595fd 100644 --- a/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/ParentsSerdeTest.java +++ b/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/ParentsSerdeTest.java @@ -2,7 +2,7 @@ package titan.ccp.aggregation.streamprocessing; import java.util.Set; import org.junit.Test; -import titan.ccp.aggregation.streamprocessing.ParentsSerde; +import uc2.streamprocessing.ParentsSerde; public class ParentsSerdeTest { diff --git a/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerdeTest.java b/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerdeTest.java index 51b2a052ebe080b505414fb5d514aebcaa3fba00..3090c9efb7e1fa846f5dc10fae0e917802853c39 100644 --- a/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerdeTest.java +++ b/uc2-application/src/test/java/titan/ccp/aggregation/streamprocessing/SensorParentKeySerdeTest.java @@ -1,8 +1,8 @@ package titan.ccp.aggregation.streamprocessing; import org.junit.Test; -import titan.ccp.aggregation.streamprocessing.SensorParentKey; -import titan.ccp.aggregation.streamprocessing.SensorParentKeySerde; +import uc2.streamprocessing.SensorParentKey; +import uc2.streamprocessing.SensorParentKeySerde; public class SensorParentKeySerdeTest { diff --git a/uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java b/uc2-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java similarity index 100% rename from uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/KafkaRecordSender.java rename to uc2-workload-generator/src/main/java/kafkaSender/KafkaRecordSender.java diff --git a/uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java b/uc2-workload-generator/src/main/java/uc2/workloadGenerator/ConfigPublisher.java similarity index 100% rename from uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ConfigPublisher.java rename to uc2-workload-generator/src/main/java/uc2/workloadGenerator/ConfigPublisher.java diff --git a/uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java b/uc2-workload-generator/src/main/java/uc2/workloadGenerator/ExperimentorBigData.java similarity index 100% rename from uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/ExperimentorBigData.java rename to uc2-workload-generator/src/main/java/uc2/workloadGenerator/ExperimentorBigData.java diff --git a/uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java b/uc2-workload-generator/src/main/java/uc2/workloadGenerator/LoadCounter.java similarity index 100% rename from uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadCounter.java rename to uc2-workload-generator/src/main/java/uc2/workloadGenerator/LoadCounter.java diff --git a/uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java b/uc2-workload-generator/src/main/java/uc2/workloadGenerator/LoadGenerator.java similarity index 100% rename from uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGenerator.java rename to uc2-workload-generator/src/main/java/uc2/workloadGenerator/LoadGenerator.java diff --git a/uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java b/uc2-workload-generator/src/main/java/uc2/workloadGenerator/LoadGeneratorExtrem.java similarity index 100% rename from uc2-workload-generator/src/main/java/titan/ccp/kiekerbridge/expbigdata19/LoadGeneratorExtrem.java rename to uc2-workload-generator/src/main/java/uc2/workloadGenerator/LoadGeneratorExtrem.java