diff --git a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-hazelcastjet-deployment.yaml b/theodolite-benchmarks/definitions/uc1-hazelcastjet/resources/uc1-hazelcastjet-deployment.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-hazelcastjet-deployment.yaml rename to theodolite-benchmarks/definitions/uc1-hazelcastjet/resources/uc1-hazelcastjet-deployment.yaml diff --git a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-hazelcastjet-service.yaml b/theodolite-benchmarks/definitions/uc1-hazelcastjet/resources/uc1-hazelcastjet-service.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-hazelcastjet-service.yaml rename to theodolite-benchmarks/definitions/uc1-hazelcastjet/resources/uc1-hazelcastjet-service.yaml diff --git a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-benchmark-operator.yaml new file mode 100644 index 0000000000000000000000000000000000000000..56f1b7f323047b36ded4662c2aabe7d1cb667842 --- /dev/null +++ b/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-benchmark-operator.yaml @@ -0,0 +1,36 @@ +apiVersion: theodolite.com/v1 +kind: benchmark +metadata: + name: uc1-hazelcastjet +spec: + appResource: + - "uc1-hazelcastjet/uc1-hazelcastjet-deployment.yaml" + - "uc1-hazelcastjet/uc1-hazelcastjet-service.yaml" + #- "uc1-hazelcastjet/uc1-jmx-configmap.yaml" + #- "uc1-hazelcastjet/uc1-service-monitor.yaml" + loadGenResource: + - "uc1-kstreams/uc1-load-generator-deployment.yaml" + - "uc1-kstreams/uc1-load-generator-service.yaml" + resourceTypes: + - typeName: "Instances" + patchers: + - type: "ReplicaPatcher" + resource: "uc1-hazelcsatjet/uc1-hazelcastjet-deployment.yaml" + loadTypes: + - typeName: "NumSensors" + patchers: + - type: "EnvVarPatcher" + resource: "uc1-kstreams/uc1-load-generator-deployment.yaml" + properties: + container: "workload-generator" + variableName: "NUM_SENSORS" + - type: NumSensorsLoadGeneratorReplicaPatcher + resource: "uc1-kstreams/uc1-load-generator-deployment.yaml" + properties: + loadGenMaxRecords: "150000" + kafkaConfig: + bootstrapServer: "theodolite-cp-kafka:9092" + topics: + - name: "input" + numPartitions: 40 + replicationFactor: 1 diff --git a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-load-generator-deployment.yaml deleted file mode 100644 index 48532c7dac30000558fb797e661aaf1e9f849e76..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-load-generator-deployment.yaml +++ /dev/null @@ -1,32 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: titan-ccp-load-generator -spec: - selector: - matchLabels: - app: titan-ccp-load-generator - replicas: 1 - template: - metadata: - labels: - app: titan-ccp-load-generator - spec: - terminationGracePeriodSeconds: 0 - containers: - - name: workload-generator - image: ghcr.io/cau-se/theodolite-uc1-workload-generator:latest - ports: - - containerPort: 5701 - name: coordination - env: - - name: KUBERNETES_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - - name: KUBERNETES_DNS_NAME - value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - - name: KAFKA_BOOTSTRAP_SERVERS - value: "my-confluent-cp-kafka:9092" - - name: SCHEMA_REGISTRY_URL - value: "http://my-confluent-cp-schema-registry:8081" \ No newline at end of file diff --git a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-load-generator-service.yaml b/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-load-generator-service.yaml deleted file mode 100644 index a50fed409030bfd49909202ad08819307dbd193d..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc1-hazelcastjet/uc1-load-generator-service.yaml +++ /dev/null @@ -1,17 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: titan-ccp-load-generator - labels: - app: titan-ccp-load-generator -spec: - type: ClusterIP - clusterIP: None - selector: - app: titan-ccp-load-generator - ports: - - name: coordination - port: 5701 - targetPort: 5701 - protocol: TCP - \ No newline at end of file diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-hazelcastjet-deployment.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-hazelcastjet-deployment.yaml rename to theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-deployment.yaml diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-hazelcastjet-service.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-service.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-hazelcastjet-service.yaml rename to theodolite-benchmarks/definitions/uc2-hazelcastjet/resources/uc2-hazelcastjet-service.yaml diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-operator.yaml index 3678655f2c5b5f67b2d089bd9c28d967dfee5cf5..d53d0cc9737c3f451af400c2014762db06798556 100644 --- a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-operator.yaml +++ b/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-operator.yaml @@ -1,37 +1,39 @@ -apiVersion: theodolite.com/v1alpha1 +apiVersion: theodolite.com/v1 kind: benchmark metadata: name: uc2-hazelcastjet -appResource: - - "uc2-hazelcastjet-deployment.yaml" - - "uc2-hazelcastjet-service.yaml" - #- "uc2-jmx-configmap.yaml" - #- "uc2-service-monitor.yaml" -loadGenResource: - - "uc2-load-generator-deployment.yaml" - - "uc2-load-generator-service.yaml" -resourceTypes: - - typeName: "Instances" - patchers: - - type: "ReplicaPatcher" - resource: "uc2-hazelcastjet-deployment.yaml" -loadTypes: - - typeName: "NumSensors" - patchers: - - type: "EnvVarPatcher" - resource: "uc2-load-generator-deployment.yaml" - container: "workload-generator" - variableName: "NUM_SENSORS" - - type: NumSensorsLoadGeneratorReplicaPatcher - resource: "uc2-load-generator-deployment.yaml" -kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" - topics: - - name: "input" - numPartitions: 40 - replicationFactor: 1 - - name: "output" - numPartitions: 40 - replicationFactor: 1 - - name: "theodolite-.*" - removeOnly: True +spec: + appResource: + - "uc2-hazelcastjet/uc2-hazelcastjet-deployment.yaml" + - "uc2-khazelcastjet/uc2-hazelcastjet-service.yaml" + #- "uc2-hazelcastjet/uc2-jmx-configmap.yaml" + #- "uc2-hazelcastjet/uc2-service-monitor.yaml" + loadGenResource: + - "uc2-kstreams/uc2-load-generator-deployment.yaml" + - "uc2-kstreams/uc2-load-generator-service.yaml" + resourceTypes: + - typeName: "Instances" + patchers: + - type: "ReplicaPatcher" + resource: "uc2-hazelcastjet/uc2-hazelcastjet-deployment.yaml" + loadTypes: + - typeName: "NumSensors" + patchers: + - type: "EnvVarPatcher" + resource: "uc2-kstreams/uc2-load-generator-deployment.yaml" + properties: + container: "workload-generator" + variableName: "NUM_SENSORS" + - type: NumSensorsLoadGeneratorReplicaPatcher + resource: "uc2-kstreams/uc2-load-generator-deployment.yaml" + properties: + loadGenMaxRecords: "150000" + kafkaConfig: + bootstrapServer: "theodolite-cp-kafka:9092" + topics: + - name: "input" + numPartitions: 40 + replicationFactor: 1 + - name: "output" + numPartitions: 40 + replicationFactor: 1 diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-standalone.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-standalone.yaml deleted file mode 100644 index d1947a369b2e9822e5aca91ccea55c96494df971..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-benchmark-standalone.yaml +++ /dev/null @@ -1,34 +0,0 @@ -name: "uc2-hazelcastjet" -appResource: - - "uc2-hazelcastjet-deployment.yaml" - - "uc2-hazelcastjet-service.yaml" - #- "uc2-jmx-configmap.yaml" - #- "uc2-service-monitor.yaml" -loadGenResource: - - "uc2-load-generator-deployment.yaml" - - "uc2-load-generator-service.yaml" -resourceTypes: - - typeName: "Instances" - patchers: - - type: "ReplicaPatcher" - resource: "uc2-hazelcastjet-deployment.yaml" -loadTypes: - - typeName: "NumSensors" - patchers: - - type: "EnvVarPatcher" - resource: "uc2-load-generator-deployment.yaml" - container: "workload-generator" - variableName: "NUM_SENSORS" - - type: NumSensorsLoadGeneratorReplicaPatcher - resource: "uc2-load-generator-deployment.yaml" -kafkaConfig: - bootstrapServer: "theodolite-cp-kafka:9092" - topics: - - name: "input" - numPartitions: 40 - replicationFactor: 1 - - name: "output" - numPartitions: 40 - replicationFactor: 1 - - name: "theodolite-.*" - removeOnly: True diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-load-generator-deployment.yaml deleted file mode 100644 index a937c18138971f49e13675651544ef2f4595f744..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-load-generator-deployment.yaml +++ /dev/null @@ -1,32 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: titan-ccp-load-generator -spec: - selector: - matchLabels: - app: titan-ccp-load-generator - replicas: 1 - template: - metadata: - labels: - app: titan-ccp-load-generator - spec: - terminationGracePeriodSeconds: 0 - containers: - - name: workload-generator - image: ghcr.io/cau-se/theodolite-uc2-workload-generator:latest - ports: - - containerPort: 5701 - name: coordination - env: - - name: KUBERNETES_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - - name: KUBERNETES_DNS_NAME - value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - - name: KAFKA_BOOTSTRAP_SERVERS - value: "my-confluent-cp-kafka:9092" - - name: SCHEMA_REGISTRY_URL - value: "http://my-confluent-cp-schema-registry:8081" diff --git a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-load-generator-service.yaml b/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-load-generator-service.yaml deleted file mode 100644 index f8b26b3f6dece427f9c1ad4db94e351b042749b3..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc2-hazelcastjet/uc2-load-generator-service.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: titan-ccp-load-generator - labels: - app: titan-ccp-load-generator -spec: - type: ClusterIP - clusterIP: None - selector: - app: titan-ccp-load-generator - ports: - - name: coordination - port: 5701 - targetPort: 5701 - protocol: TCP diff --git a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-deployment.yaml b/theodolite-benchmarks/definitions/uc3-hazelcastjet/resources/uc3-hazelcastjet-deployment.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-deployment.yaml rename to theodolite-benchmarks/definitions/uc3-hazelcastjet/resources/uc3-hazelcastjet-deployment.yaml diff --git a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-service.yaml b/theodolite-benchmarks/definitions/uc3-hazelcastjet/resources/uc3-hazelcastjet-service.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-hazelcastjet-service.yaml rename to theodolite-benchmarks/definitions/uc3-hazelcastjet/resources/uc3-hazelcastjet-service.yaml diff --git a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-benchmark-operator.yaml new file mode 100644 index 0000000000000000000000000000000000000000..69199c62975973ad179951fe9b1217802fd66f4a --- /dev/null +++ b/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-benchmark-operator.yaml @@ -0,0 +1,39 @@ +apiVersion: theodolite.com/v1 +kind: benchmark +metadata: + name: uc3-hazelcastjet +spec: + appResource: + - "uc3-hazelcastjet/uc3-hazelcastjet-deployment.yaml" + - "uc3-hazelcastjet/uc3-hazelcastjet-service.yaml" + #- "uc3-hazelcastjet/uc3-jmx-configmap.yaml" + #- "uc3-hazelcastjet/uc3-service-monitor.yaml" + loadGenResource: + - "uc3-kstreams/uc3-load-generator-deployment.yaml" + - "uc3-kstreams/uc3-load-generator-service.yaml" + resourceTypes: + - typeName: "Instances" + patchers: + - type: "ReplicaPatcher" + resource: "uc3-hazelcastjet/uc3-hazelcastjet-deployment.yaml" + loadTypes: + - typeName: "NumSensors" + patchers: + - type: "EnvVarPatcher" + resource: "uc3-kstreams/uc3-load-generator-deployment.yaml" + properties: + container: "workload-generator" + variableName: "NUM_SENSORS" + - type: NumSensorsLoadGeneratorReplicaPatcher + resource: "uc3-kstreams/uc3-load-generator-deployment.yaml" + properties: + loadGenMaxRecords: "150000" + kafkaConfig: + bootstrapServer: "theodolite-cp-kafka:9092" + topics: + - name: "input" + numPartitions: 40 + replicationFactor: 1 + - name: "output" + numPartitions: 40 + replicationFactor: 1 diff --git a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-load-generator-deployment.yaml deleted file mode 100644 index 551f26ea40fdd3c8384945c312e41eb017e6d9c8..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-load-generator-deployment.yaml +++ /dev/null @@ -1,32 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: titan-ccp-load-generator -spec: - selector: - matchLabels: - app: titan-ccp-load-generator - replicas: 1 - template: - metadata: - labels: - app: titan-ccp-load-generator - spec: - terminationGracePeriodSeconds: 0 - containers: - - name: workload-generator - image: ghcr.io/cau-se/theodolite-uc3-workload-generator:latest - ports: - - containerPort: 5701 - name: coordination - env: - - name: KUBERNETES_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - - name: KUBERNETES_DNS_NAME - value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - - name: KAFKA_BOOTSTRAP_SERVERS - value: "my-confluent-cp-kafka:9092" - - name: SCHEMA_REGISTRY_URL - value: "http://my-confluent-cp-schema-registry:8081" diff --git a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-load-generator-service.yaml b/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-load-generator-service.yaml deleted file mode 100644 index f8b26b3f6dece427f9c1ad4db94e351b042749b3..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc3-hazelcastjet/uc3-load-generator-service.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: titan-ccp-load-generator - labels: - app: titan-ccp-load-generator -spec: - type: ClusterIP - clusterIP: None - selector: - app: titan-ccp-load-generator - ports: - - name: coordination - port: 5701 - targetPort: 5701 - protocol: TCP diff --git a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-hazelcastjet-deployment.yaml b/theodolite-benchmarks/definitions/uc4-hazelcastjet/resources/uc4-hazelcastjet-deployment.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-hazelcastjet-deployment.yaml rename to theodolite-benchmarks/definitions/uc4-hazelcastjet/resources/uc4-hazelcastjet-deployment.yaml diff --git a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-hazelcastjet-service.yaml b/theodolite-benchmarks/definitions/uc4-hazelcastjet/resources/uc4-hazelcastjet-service.yaml similarity index 100% rename from theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-hazelcastjet-service.yaml rename to theodolite-benchmarks/definitions/uc4-hazelcastjet/resources/uc4-hazelcastjet-service.yaml diff --git a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-benchmark-operator.yaml b/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-benchmark-operator.yaml new file mode 100644 index 0000000000000000000000000000000000000000..ce548c639b986840787ef53e9350d48ab79dac6c --- /dev/null +++ b/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-benchmark-operator.yaml @@ -0,0 +1,46 @@ +apiVersion: theodolite.com/v1 +kind: benchmark +metadata: + name: uc4-hazelcastjet +spec: + appResource: + - "uc4-hazelcastjet/uc4-hazelcastjet-deployment.yaml" + - "uc4-hazelcastjet/uc4-hazelcastjet-service.yaml" + #- "uc4-hazelcastjet/uc4-jmx-configmap.yaml" + #- "uc4-hazelcastjet/uc4-service-monitor.yaml" + loadGenResource: + - "uc4-kstreams/uc4-load-generator-deployment.yaml" + - "uc4-kstreams/uc4-load-generator-service.yaml" + resourceTypes: + - typeName: "Instances" + patchers: + - type: "ReplicaPatcher" + resource: "uc4-hazelcastjet/uc4-hazelcastjet-deployment.yaml" + loadTypes: + - typeName: "NumNestedGroups" + patchers: + - type: "EnvVarPatcher" + resource: "uc4-kstreams/uc4-load-generator-deployment.yaml" + properties: + container: "workload-generator" + variableName: "NUM_SENSORS" + - type: NumNestedGroupsLoadGeneratorReplicaPatcher + resource: "uc4-kstreams/uc4-load-generator-deployment.yaml" + properties: + loadGenMaxRecords: "150000" + numSensors: "4.0" + kafkaConfig: + bootstrapServer: "theodolite-cp-kafka:9092" + topics: + - name: "input" + numPartitions: 40 + replicationFactor: 1 + - name: "output" + numPartitions: 40 + replicationFactor: 1 + - name: "configuration" + numPartitions: 40 + replicationFactor: 1 + - name: "aggregation-feedback" + numPartitions: 40 + replicationFactor: 1 diff --git a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-load-generator-deployment.yaml b/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-load-generator-deployment.yaml deleted file mode 100644 index 7a69d13daae57b06c77f316da9aa953b21ac096b..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-load-generator-deployment.yaml +++ /dev/null @@ -1,34 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: titan-ccp-load-generator -spec: - selector: - matchLabels: - app: titan-ccp-load-generator - replicas: 1 - template: - metadata: - labels: - app: titan-ccp-load-generator - spec: - terminationGracePeriodSeconds: 0 - containers: - - name: workload-generator - image: ghcr.io/cau-se/theodolite-uc4-workload-generator:latest - ports: - - containerPort: 5701 - name: coordination - env: - - name: KUBERNETES_NAMESPACE - valueFrom: - fieldRef: - fieldPath: metadata.namespace - - name: KUBERNETES_DNS_NAME - value: "titan-ccp-load-generator.$(KUBERNETES_NAMESPACE).svc.cluster.local" - - name: KAFKA_BOOTSTRAP_SERVERS - value: "theodolite-cp-kafka:9092" - - name: SCHEMA_REGISTRY_URL - value: "http://theodolite-cp-schema-registry:8081" - - name: NUM_NESTED_GROUPS - value: "5" diff --git a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-load-generator-service.yaml b/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-load-generator-service.yaml deleted file mode 100644 index f8b26b3f6dece427f9c1ad4db94e351b042749b3..0000000000000000000000000000000000000000 --- a/theodolite-benchmarks/definitions/uc4-hazelcastjet/uc4-load-generator-service.yaml +++ /dev/null @@ -1,16 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: titan-ccp-load-generator - labels: - app: titan-ccp-load-generator -spec: - type: ClusterIP - clusterIP: None - selector: - app: titan-ccp-load-generator - ports: - - name: coordination - port: 5701 - targetPort: 5701 - protocol: TCP diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java index b90f822ac75625db6bc3dd3d234c0b207f0f6c1f..3135f24bf05146b3b1de96528df6bfa71544e10f 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/main/java/theodolite/uc1/application/Uc1PipelineBuilder.java @@ -3,9 +3,9 @@ package theodolite.uc1.application; import com.google.gson.Gson; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; -import com.hazelcast.jet.pipeline.Sink; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; +import com.hazelcast.jet.pipeline.StreamStage; import java.util.Map.Entry; import java.util.Properties; import titan.ccp.model.records.ActivePowerRecord; @@ -21,42 +21,49 @@ public class Uc1PipelineBuilder { /** * Builds a pipeline which can be used for stream processing using Hazelcast Jet. * - * @param inputSource A hazelcast jet stream-source for {@code Entry<String,ActivePowerRecord>} - * input values. - * @param outputSink A hazelcast jet sink for String output values. + * @param kafkaPropsForPipeline Properties Object containing the necessary kafka attributes. + * @param kafkaInputTopic The name of the input topic used for the pipeline. * @return A hazelcast jet pipeline which processes data for Uc1. */ - public Pipeline build(final StreamSource<Entry<String, ActivePowerRecord>> inputSource, - final Sink<String> outputSink) { - // Build Pipeline + public Pipeline build(final Properties kafkaPropsForPipeline, final String kafkaInputTopic) { + + // Define a new pipeline final Pipeline pipe = Pipeline.create(); - pipe.readFrom(inputSource) - .withNativeTimestamps(0) - .setLocalParallelism(1) - .setName("Log content") - .map(record -> { - return GSON.toJson(record); - }) - .writeTo(outputSink); + + // Define the Kafka Source + final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = + KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropsForPipeline, kafkaInputTopic); + + // Extend UC1 topology to the pipeline + final StreamStage<String> uc1TopologyProduct = this.extendUc1Topology(pipe, kafkaSource); + + // Add Sink: Logger + uc1TopologyProduct.writeTo(Sinks.logger()); return pipe; } /** - * Builds a pipeline which can be used for stream processing using Hazelcast Jet. + * Extends to a blank Hazelcast Jet Pipeline the UC1 topology defines by theodolite. + * + * <p>UC1 takes {@code Entry<String,ActivePowerRecord>} objects and turns them into Json strings + * using GSON. * - * @param kafkaPropertiesForPipeline Properties Object containing the necessary kafka attributes. - * @param kafkaInputTopic The name of the input topic used for the pipeline. - * @return A hazelcast jet pipeline which processes data for Uc1. + * @param pipe The blank hazelcast jet pipeline to extend the logic to. + * @param source A streaming source to fetch data from. + * @return A {@code StreamStage<String>} with the above definition of the String. It can be used + * to be further modified or directly be written into a sink. */ - public Pipeline build(final Properties kafkaPropertiesForPipeline, final String kafkaInputTopic) { - - final StreamSource<Entry<String, ActivePowerRecord>> kafkaInputSource = - KafkaSources.<String, ActivePowerRecord>kafka(kafkaPropertiesForPipeline, kafkaInputTopic); - - final Sink<String> loggerSink = Sinks.logger(); - return this.build(kafkaInputSource, loggerSink); - + public StreamStage<String> extendUc1Topology(final Pipeline pipe, + final StreamSource<Entry<String, ActivePowerRecord>> source) { + // Build the pipeline topology + return pipe.readFrom(source) + .withNativeTimestamps(0) + .setLocalParallelism(1) + .setName("Log content") + .map(record -> { + return GSON.toJson(record); + }); } } diff --git a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java index 4279cbb955aaca5772a22eb32161afaed0cd463f..f3fe409d035ce36540bbf9f1a17d870f93621e1e 100644 --- a/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java +++ b/theodolite-benchmarks/uc1-hazelcastjet/src/test/java/theodolite/uc1/application/Uc1PipelineTest.java @@ -7,17 +7,17 @@ import com.hazelcast.jet.config.JetConfig; import com.hazelcast.jet.core.JetTestSupport; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.StreamSource; +import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.test.AssertionCompletedException; import com.hazelcast.jet.pipeline.test.Assertions; import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CompletionException; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import titan.ccp.model.records.ActivePowerRecord; @@ -29,57 +29,64 @@ import titan.ccp.model.records.ActivePowerRecord; public class Uc1PipelineTest extends JetTestSupport { private static final Gson GSON = new Gson(); + private JetInstance testInstance = null; + private Pipeline testPipeline = null; + private StreamStage<String> uc1Topology = null; /** - * UC1 Pipeline test to check if items are passed through at an acceptable rate. + * Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC2 topology. + * Allows for quick extension of tests. */ - @Test - public void test1Uc1PipelineElements() { + @Before + public void buildUc1Pipeline() { - // Test Configuration + // Setup Configuration final int testItemsPerSecond = 1; - final String testSensorName = "id_test1"; + final String testSensorName = "TEST_SENSOR"; final Double testValueInW = 10.0; - // Assertion Configuration - final int assertTimeoutSeconds = 6; - final int assertCollectedItems = 5; // Create mock jet instance with configuration final String testClusterName = randomName(); final JetConfig testJetConfig = new JetConfig(); testJetConfig.getHazelcastConfig().setClusterName(testClusterName); - final JetInstance testInstance = this.createJetMember(testJetConfig); + this.testInstance = this.createJetMember(testJetConfig); - // Test Pipeline definition - final List<String> sourceRecord = - new ArrayList<>(); + // Create a test source final StreamSource<Entry<String, ActivePowerRecord>> testSource = TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { final ActivePowerRecord testRecord = new ActivePowerRecord(testSensorName, timestamp, testValueInW); final Entry<String, ActivePowerRecord> testEntry = Map.entry(testSensorName, testRecord); - sourceRecord.add(GSON.toJson(testEntry)); return testEntry; }); - // Recreation of the UC1 Pipeline - adjusted for a Hazelcast Jet Pipeline Test - final Pipeline testPipeline = Pipeline.create(); - testPipeline - .readFrom(testSource) - .withNativeTimestamps(0) - .setLocalParallelism(1) - .setName("Log content") - .map(data -> { - return new Gson().toJson(data); - }) - .apply(Assertions.assertCollectedEventually(assertTimeoutSeconds, - collection -> Assert.assertTrue("Not enough data arrived in the end", - collection.size() >= assertCollectedItems))); + // Create pipeline to test + final Uc1PipelineBuilder pipelineBuilder = new Uc1PipelineBuilder(); + this.testPipeline = Pipeline.create(); + this.uc1Topology = + pipelineBuilder.extendUc1Topology(this.testPipeline, testSource); + + } + + /** + * UC1 Pipeline test to check if items are passed through at an acceptable rate. + */ + @Test + public void test1Uc1PipelineElements() { + + // Assertion Configuration + final int assertTimeoutSeconds = 6; + final int assertCollectedItems = 5; + + // Assertion + this.uc1Topology.apply(Assertions.assertCollectedEventually(assertTimeoutSeconds, + collection -> Assert.assertTrue("Not enough data arrived in the end", + collection.size() >= assertCollectedItems))); // Test the UC1 Pipeline Recreation try { - testInstance.newJob(testPipeline).join(); + this.testInstance.newJob(this.testPipeline).join(); Assert.fail("Job should have completed with an AssertionCompletedException, " + "but completed normally"); } catch (final CompletionException e) { diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/java/test/theodolite/uc2/application/Uc2PipelineTest.java b/theodolite-benchmarks/uc2-hazelcastjet/src/java/test/theodolite/uc2/application/Uc2PipelineTest.java new file mode 100644 index 0000000000000000000000000000000000000000..c16fc9d14a0b115635d76616d31516339776e7aa --- /dev/null +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/java/test/theodolite/uc2/application/Uc2PipelineTest.java @@ -0,0 +1,109 @@ +package theodolite.uc2.application; + +import com.google.gson.Gson; +import com.hazelcast.jet.Jet; +import com.hazelcast.jet.JetInstance; +import com.hazelcast.jet.config.JetConfig; +import com.hazelcast.jet.core.JetTestSupport; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.StreamSource; +import com.hazelcast.jet.pipeline.StreamStage; +import com.hazelcast.jet.pipeline.test.AssertionCompletedException; +import com.hazelcast.jet.pipeline.test.Assertions; +import com.hazelcast.jet.pipeline.test.TestSources; +import com.hazelcast.jet.test.SerialTest; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletionException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Test methods for the Hazelcast Jet Implementation of UC2. + */ +@Category(SerialTest.class) +public class Uc2PipelineTest extends JetTestSupport { + + JetInstance testInstance = null; + Pipeline testPipeline = null; + StreamStage<Entry<String, String>> uc2Topology = null; + + /* + * Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC2 topology. + * Allows for quick extension of tests. + */ + @Before + public void buildUc2Pipeline() { + + // Setup Configuration + int testItemsPerSecond = 1; + String testSensorName = "TEST-SENSOR"; + Double testValueInW = 10.0; + int testWindowInMs = 5000; + + // Create mock jet instance with configuration + final String testClusterName = randomName(); + final JetConfig testJetConfig = new JetConfig(); + testJetConfig.getHazelcastConfig().setClusterName(testClusterName); + this.testInstance = this.createJetMember(testJetConfig); + + // Create a test source + final StreamSource<Entry<String, ActivePowerRecord>> testSource = + TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { + final ActivePowerRecord testRecord = + new ActivePowerRecord(testSensorName, timestamp, testValueInW); + final Entry<String, ActivePowerRecord> testEntry = + Map.entry(testSensorName, testRecord); + return testEntry; + }); + + // Create pipeline to test + Uc2PipelineBuilder pipelineBuilder = new Uc2PipelineBuilder(); + this.testPipeline = Pipeline.create(); + this.uc2Topology = + pipelineBuilder.extendUc2Topology(this.testPipeline, testSource, testWindowInMs); + + } + + /** + * Tests if no items reach the end before the first window ends. + */ + @Test + public void testOutput() { + + // Assertion Configuration + int timeout = 14; + String expectedOutput = "Stats{count=5, mean=10.0, populationStandardDeviation=0.0, min=10.0, max=10.0}"; + + // Assertion + this.uc2Topology.apply(Assertions.assertCollectedEventually(timeout, + collection -> Assert.assertTrue( + "Not the right amount items in Stats Object!", + collection.get(collection.size()-1).getValue().equals(expectedOutput)))); + + // Run the test! + try { + this.testInstance.newJob(this.testPipeline).join(); + Assert.fail("Job should have completed with an AssertionCompletedException, " + + "but completed normally!"); + } catch (final CompletionException e) { + final String errorMsg = e.getCause().getMessage(); + Assert.assertTrue( + "Job was expected to complete with AssertionCompletedException, but completed with: " + + e.getCause(), + errorMsg.contains(AssertionCompletedException.class.getName())); + } + + } + + @After + public void after() { + // Shuts down all running Jet Instances + Jet.shutdownAll(); + } + +} diff --git a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java index fae23d682100447802e8f8e6a4521f0a674fe3ad..df7a78706c7dca3b177cab4bb69e50948261a1f9 100644 --- a/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java +++ b/theodolite-benchmarks/uc2-hazelcastjet/src/main/java/theodolite/uc2/application/Uc2PipelineBuilder.java @@ -8,6 +8,7 @@ import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; import java.util.Map; @@ -31,57 +32,102 @@ public class Uc2PipelineBuilder { * attributes. * @param kafkaInputTopic The name of the input topic used for the pipeline. * @param kafkaOutputTopic The name of the output topic used for the pipeline. - * @param downsampleInterval The window length of the tumbling window used in the aggregation of - * this pipeline. + * @param downsampleIntervalInMs The window length of the tumbling window used in the aggregation + * of this pipeline. * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data * for UC2. */ public Pipeline build(final Properties kafkaReadPropsForPipeline, final Properties kafkaWritePropsForPipeline, final String kafkaInputTopic, final String kafkaOutputTopic, - final int downsampleInterval) { + final int downsampleIntervalInMs) { + + // Define a new pipeline + final Pipeline pipe = Pipeline.create(); + + // Define the Kafka Source + final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = + KafkaSources.<String, ActivePowerRecord>kafka(kafkaReadPropsForPipeline, kafkaInputTopic); + + // Extend UC2 topology to the pipeline + final StreamStage<Map.Entry<String, String>> uc2TopologyProduct = + extendUc2Topology(pipe, kafkaSource, downsampleIntervalInMs); + + // Add Sink1: Logger + uc2TopologyProduct.writeTo(Sinks.logger()); + // Add Sink2: Write back to kafka for the final benchmark + uc2TopologyProduct.writeTo(KafkaSinks.<String, String>kafka( + kafkaWritePropsForPipeline, kafkaOutputTopic)); + + return pipe; + } + + /** + * Extends to a blank Hazelcast Jet Pipeline the UC2 topology defined by theodolite. + * + * <p>UC2 takes {@code ActivePowerRecord} objects, groups them by keys, windows them in a tumbling + * window and aggregates them into {@code Stats} objects. The final map returns an + * {@code Entry<String,String>} where the key is the key of the group and the String is the + * {@code .toString()} representation of the {@code Stats} object. + * + * @param pipe The blank hazelcast jet pipeline to extend the logic to. + * @param source A streaming source to fetch data from. + * @param downsampleIntervalInMs The size of the tumbling window. + * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key + * and value of the Entry object. It can be used to be further modified or directly be + * written into a sink. + */ + public StreamStage<Map.Entry<String, String>> extendUc2Topology(final Pipeline pipe, + final StreamSource<Entry<String, ActivePowerRecord>> source, + final int downsampleIntervalInMs) { + // Build the pipeline topology. + return pipe.readFrom(source) + .withNativeTimestamps(0) + .setLocalParallelism(1) + .groupingKey(record -> record.getValue().getIdentifier()) + .window(WindowDefinition.tumbling(downsampleIntervalInMs)) + .aggregate(uc2AggregateOperation()) + .map(agg -> { + final String theKey = agg.key(); + final String theValue = agg.getValue().toString(); + return Map.entry(theKey, theValue); + }); + } + + /** + * Defines an AggregateOperation1 for Hazelcast Jet which is used in the Pipeline of the Hazelcast + * Jet implementation of UC2. + * + * <p>Takes a windowed and keyed {@code Entry<String,ActivePowerRecord>} elements and returns a + * {@Stats} Object. + * + * @return An AggregateOperation used by Hazelcast Jet in a streaming stage which aggregates + * ActivePowerRecord Objects into Stats Objects. + */ + @SuppressWarnings("unchecked") + public AggregateOperation1<Object, StatsAccumulator, Stats> uc2AggregateOperation() { // Aggregate Operation to Create a Stats Object from Entry<String,ActivePowerRecord> items using // the Statsaccumulator. - final AggregateOperation1<Object, StatsAccumulator, Stats> aggrOp = AggregateOperation + return AggregateOperation + // Creates the accumulator .withCreate(new StatsAccumulatorSupplier()) + // Defines the accumulation .andAccumulate((accumulator, item) -> { - - Entry<String, ActivePowerRecord> castedEntry = (Entry<String, ActivePowerRecord>) item; + final Entry<String, ActivePowerRecord> castedEntry = + (Entry<String, ActivePowerRecord>) item; accumulator.add(castedEntry.getValue().getValueInW()); - }) + // Defines the combination of spread out instances .andCombine((left, right) -> { - - Stats rightStats = right.snapshot(); + final Stats rightStats = right.snapshot(); left.addAll(rightStats); }) + // Finishes the aggregation .andExportFinish((accumulator) -> { return accumulator.snapshot(); }); - - final Pipeline pipe = Pipeline.create(); - final StreamStage<Map.Entry<String, String>> mapProduct = - pipe.readFrom(KafkaSources.<String, ActivePowerRecord>kafka( - kafkaReadPropsForPipeline, kafkaInputTopic)) - .withNativeTimestamps(0) - .setLocalParallelism(1) - .groupingKey(record -> record.getValue().getIdentifier()) - .window(WindowDefinition.tumbling(downsampleInterval)) - .aggregate(aggrOp) - .map(agg -> { - String theKey = agg.key(); - String theValue = agg.getValue().toString(); - return Map.entry(theKey, theValue); - }); - // Add Sink1: Logger - mapProduct.writeTo(Sinks.logger()); - // Add Sink2: Write back to kafka for the final benchmark - mapProduct.writeTo(KafkaSinks.<String, String>kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); - - return pipe; } } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java index 938fdc7ab8e23367a59ac6cf305bfd9bb46fe0bc..f0f8cd7cf95d312185d70a5a3b6be1b043878ff2 100644 --- a/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/java/theodolite/uc3/application/Uc3PipelineBuilder.java @@ -5,11 +5,13 @@ import com.hazelcast.jet.kafka.KafkaSinks; import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.WindowDefinition; import java.time.Instant; import java.time.LocalDateTime; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -45,50 +47,77 @@ public class Uc3PipelineBuilder { final String kafkaOutputTopic, final int hoppingSizeInSeconds, final int windowSizeInSeconds) { - // Build Pipeline for the History Service of UC3 + // Define a new Pipeline final Pipeline pipe = Pipeline.create(); - final StreamStage<Map.Entry<String, String>> mapProduct = - pipe - .readFrom(KafkaSources - .<String, ActivePowerRecord>kafka( - kafkaReadPropsForPipeline, kafkaInputTopic)) - // use Timestamps - .withNativeTimestamps(0) - .setLocalParallelism(1) - // Map timestamp to hour of day and create new key using sensorID and - // datetime mapped to HourOfDay - .map(record -> { - String sensorId = record.getValue().getIdentifier(); - long timestamp = record.getValue().getTimestamp(); - LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), - TimeZone.getDefault().toZoneId()); - final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory(); - HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime); + // Define the source + final StreamSource<Entry<String, ActivePowerRecord>> kafkaSource = KafkaSources + .<String, ActivePowerRecord>kafka( + kafkaReadPropsForPipeline, kafkaInputTopic); - return Map.entry(newKey, record.getValue()); - }) - // group by new keys - .groupingKey(newRecord -> newRecord.getKey()) - // Sliding/Hopping Window - .window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(windowSizeInSeconds), - TimeUnit.SECONDS.toMillis(hoppingSizeInSeconds))) - // get average value of group (sensoreId,hourOfDay) - .aggregate( - AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) - // map to return pair (sensorID,hourOfDay) -> (averaged what value) - .map(agg -> { - String theValue = agg.getValue().toString(); - String theKey = agg.getKey().toString(); - return Map.entry(theKey, theValue); - }); + // Extend topology for UC3 + final StreamStage<Map.Entry<String, String>> uc3Product = + extendUc3Topology(pipe, kafkaSource, hoppingSizeInSeconds, windowSizeInSeconds); + // Add Sink1: Logger - mapProduct.writeTo(Sinks.logger()); + uc3Product.writeTo(Sinks.logger()); // Add Sink2: Write back to kafka for the final benchmark - mapProduct.writeTo(KafkaSinks.<String, String>kafka( + uc3Product.writeTo(KafkaSinks.<String, String>kafka( kafkaWritePropsForPipeline, kafkaOutputTopic)); return pipe; } + /** + * Extends to a blank Hazelcast Jet Pipeline the UC3 topology defined by theodolite. + * + * <p>UC3 takes {@code ActivePowerRecord} object, groups them by keys and calculates + * average double values for a sliding window and sorts them into the hour of the day. + * + * @param pipe The blank hazelcast jet pipeline to extend the logic to. + * @param source A streaming source to fetch data from. + * @param hoppingSizeInSeconds The jump distance of the "sliding" window. + * @param windowSizeInSeconds The size of the "sliding" window. + * @return A {@code StreamStage<Map.Entry<String,String>>} with the above definition of the key + * and value of the Entry object. It can be used to be further modified or directly be + * written into a sink. + */ + public StreamStage<Map.Entry<String, String>> extendUc3Topology(final Pipeline pipe, + final StreamSource<Entry<String, ActivePowerRecord>> source, final int hoppingSizeInSeconds, + final int windowSizeInSeconds) { + // Build the pipeline topology. + return pipe + .readFrom(source) + // use Timestamps + .withNativeTimestamps(0) + .setLocalParallelism(1) + // Map timestamp to hour of day and create new key using sensorID and + // datetime mapped to HourOfDay + .map(record -> { + final String sensorId = record.getValue().getIdentifier(); + final long timestamp = record.getValue().getTimestamp(); + final LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), + TimeZone.getDefault().toZoneId()); + + final StatsKeyFactory<HourOfDayKey> keyFactory = new HoursOfDayKeyFactory(); + final HourOfDayKey newKey = keyFactory.createKey(sensorId, dateTime); + + return Map.entry(newKey, record.getValue()); + }) + // group by new keys + .groupingKey(newRecord -> newRecord.getKey()) + // Sliding/Hopping Window + .window(WindowDefinition.sliding(TimeUnit.SECONDS.toMillis(windowSizeInSeconds), + TimeUnit.SECONDS.toMillis(hoppingSizeInSeconds))) + // get average value of group (sensoreId,hourOfDay) + .aggregate( + AggregateOperations.averagingDouble(record -> record.getValue().getValueInW())) + // map to return pair (sensorID,hourOfDay) -> (averaged what value) + .map(agg -> { + final String theValue = agg.getValue().toString(); + final String theKey = agg.getKey().toString(); + return Map.entry(theKey, theValue); + }); + } + } diff --git a/theodolite-benchmarks/uc3-hazelcastjet/src/main/test/theodolite/uc3/application/Uc3PipelineTest.java b/theodolite-benchmarks/uc3-hazelcastjet/src/main/test/theodolite/uc3/application/Uc3PipelineTest.java new file mode 100644 index 0000000000000000000000000000000000000000..80f85104e4349af7ec332bc3c475d03cdf40a72e --- /dev/null +++ b/theodolite-benchmarks/uc3-hazelcastjet/src/main/test/theodolite/uc3/application/Uc3PipelineTest.java @@ -0,0 +1,161 @@ +package theodolite.uc3.application; + +import com.hazelcast.jet.Jet; +import com.hazelcast.jet.JetInstance; +import com.hazelcast.jet.config.JetConfig; +import com.hazelcast.jet.config.JobConfig; +import com.hazelcast.jet.core.JetTestSupport; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.StreamSource; +import com.hazelcast.jet.pipeline.StreamStage; +import com.hazelcast.jet.pipeline.test.AssertionCompletedException; +import com.hazelcast.jet.pipeline.test.Assertions; +import com.hazelcast.jet.pipeline.test.TestSources; +import com.hazelcast.jet.test.SerialTest; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Map; +import java.util.TimeZone; +import java.util.Map.Entry; +import java.util.concurrent.CompletionException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import theodolite.uc3.application.uc3specifics.HourOfDayKey; +import theodolite.uc3.application.uc3specifics.HourOfDayKeySerializer; +import titan.ccp.model.records.ActivePowerRecord; + +/** + * Test methods for the Hazelcast Jet Implementation of UC3. + */ +@Category(SerialTest.class) +public class Uc3PipelineTest extends JetTestSupport { + + // Test Machinery + private JetInstance testInstance = null; + private Pipeline testPipeline = null; + private StreamStage<Entry<String, String>> uc3Topology = null; + + + /** + * Creates the JetInstance, defines a new Hazelcast Jet Pipeline and extends the UC3 topology. + * Allows for quick extension of tests. + */ + @Before + public void buildUc3Pipeline() { + + // Setup Configuration + int testItemsPerSecond = 1; + String testSensorName = "TEST-SENSOR"; + Double testValueInW = 10.0; + int testHopSizeInSec = 1; + int testWindowSizeInSec = 50; + // Used to check hourOfDay + long mockTimestamp = 1632741651; + + + // Create mock jet instance with configuration + final String testClusterName = randomName(); + final JetConfig testJetConfig = new JetConfig(); + testJetConfig.getHazelcastConfig().setClusterName(testClusterName); + this.testInstance = this.createJetMember(testJetConfig); + + // Create a test source + final StreamSource<Entry<String, ActivePowerRecord>> testSource = + TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { + final ActivePowerRecord testRecord = + new ActivePowerRecord(testSensorName, mockTimestamp, testValueInW); + final Entry<String, ActivePowerRecord> testEntry = + Map.entry(testSensorName, testRecord); + return testEntry; + }); + + // Create pipeline to test + Uc3PipelineBuilder pipelineBuilder = new Uc3PipelineBuilder(); + this.testPipeline = Pipeline.create(); + this.uc3Topology = pipelineBuilder.extendUc3Topology(testPipeline, testSource, + testHopSizeInSec, testWindowSizeInSec); + } + + /** + * Tests if no items reach the end before the first window ends. + */ + @Test + public void testOutput() { + + // Assertion Configuration + int timeout = 10; + String testSensorName = "TEST-SENSOR"; + Double testValueInW = 10.0; + // Used to check hourOfDay + long mockTimestamp = 1632741651; + + // Assertion + this.uc3Topology.apply(Assertions.assertCollectedEventually(timeout, + collection -> { + + // DEBUG + System.out.println("DEBUG: CHECK 1 || Entered Assertion of testOutput()"); + + // Check all collected Items + boolean allOkay = true; + if (collection != null) { + System.out.println("DEBUG: CHECK 2 || Collection Size: " + collection.size()); + for (int i = 0; i < collection.size(); i++) { + + // Build hour of day + long timestamp = mockTimestamp; + int expectedHour = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), + TimeZone.getDefault().toZoneId()).getHour(); + + // Compare expected output with generated output + Entry<String, String> currentEntry = collection.get(i); + String expectedKey = testSensorName + ";" + expectedHour; + String expectedValue = testValueInW.toString(); + + // DEBUG + System.out.println( + "DEBUG: CHECK 3 || Expected Output: '" + expectedKey + "=" + expectedValue + + "' - Actual Output: '" + currentEntry.getKey() + "=" + + currentEntry.getValue().toString() + "'"); + + if (!(currentEntry.getKey().equals(expectedKey) + && currentEntry.getValue().toString().equals(expectedValue))) { + System.out.println("DEBUG: CHECK 5 || Failed assertion!"); + allOkay = false; + } + } + } + + // Assertion + Assert.assertTrue( + "Items do not match expected structure!", allOkay); + })); + + // Run the test! + try { + final JobConfig jobConfig = new JobConfig() + .registerSerializer(HourOfDayKey.class, HourOfDayKeySerializer.class); + this.testInstance.newJob(this.testPipeline, jobConfig).join(); + Assert.fail("Job should have completed with an AssertionCompletedException, " + + "but completed normally!"); + } catch (final CompletionException e) { + final String errorMsg = e.getCause().getMessage(); + Assert.assertTrue( + "Job was expected to complete with AssertionCompletedException, but completed with: " + + e.getCause(), + errorMsg.contains(AssertionCompletedException.class.getName())); + } + + } + + @After + public void after() { + // Shuts down all running Jet Instances + Jet.shutdownAll(); + } + +} diff --git a/theodolite-benchmarks/uc4-hazelcastjet/Dockerfile b/theodolite-benchmarks/uc4-hazelcastjet/Dockerfile index 1b5ce2202a2589fdcbf05a7e0b0bb64efff46a9b..4e6e69935fc7ee8148a0ea72446cf5fad37195e4 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/Dockerfile +++ b/theodolite-benchmarks/uc4-hazelcastjet/Dockerfile @@ -1,6 +1,6 @@ FROM openjdk:11-slim -ADD build/distributions/uc3-hazelcastjet.tar / +ADD build/distributions/uc4-hazelcastjet.tar / CMD JAVA_OPTS="$JAVA_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=$LOG_LEVEL" \ diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java index e4561c5fcfdf77cdcfc7bbad90d685eb2229119f..180e750fcb8303d568af30f806d630c04119e821 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/java/theodolite/uc4/application/Uc4PipelineBuilder.java @@ -11,6 +11,7 @@ import com.hazelcast.jet.kafka.KafkaSources; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StageWithWindow; +import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.StreamStageWithKey; import com.hazelcast.jet.pipeline.WindowDefinition; @@ -41,8 +42,8 @@ public class Uc4PipelineBuilder { * * @param kafkaInputReadPropsForPipeline Properties Object containing the necessary kafka input * read attributes. - * @param kafkaConfigPropsForPipeline Properties Object containing the necessary kafka config - * read attributes. + * @param kafkaConfigPropsForPipeline Properties Object containing the necessary kafka config read + * attributes. * @param kafkaFeedbackPropsForPipeline Properties Object containing the necessary kafka * aggregation read attributes. * @param kafkaWritePropsForPipeline Properties Object containing the necessary kafka write @@ -55,7 +56,6 @@ public class Uc4PipelineBuilder { * @return returns a Pipeline used which can be used in a Hazelcast Jet Instance to process data * for UC3. */ - @SuppressWarnings("unchecked") public Pipeline build(final Properties kafkaInputReadPropsForPipeline, // NOPMD final Properties kafkaConfigPropsForPipeline, final Properties kafkaFeedbackPropsForPipeline, @@ -66,93 +66,80 @@ public class Uc4PipelineBuilder { final String kafkaFeedbackTopic, final int windowSize) { - ////////////////////////////////// // The pipeline for this Use Case final Pipeline uc4Pipeline = Pipeline.create(); - // System.out.println("DEBUG: Window Size: " + windowSize); + // Sources for this use case + final StreamSource<Entry<Event, String>> configSource = KafkaSources.<Event, String>kafka( + kafkaConfigPropsForPipeline, kafkaConfigurationTopic); + final StreamSource<Entry<String, ActivePowerRecord>> inputSource = + KafkaSources.<String, ActivePowerRecord>kafka( + kafkaInputReadPropsForPipeline, kafkaInputTopic); + final StreamSource<Entry<String, Double>> aggregationSource = + KafkaSources.<String, Double>kafka(kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic); + + // Extend UC4 topology to pipeline + final StreamStage<Entry<String, Double>> uc4Product = + extendUc4Topology(uc4Pipeline, inputSource, aggregationSource, configSource, windowSize); + + // Add Sink1: Write back to kafka output topic + uc4Product.writeTo(KafkaSinks.<String, Double>kafka( + kafkaWritePropsForPipeline, kafkaOutputTopic)); + // Add Sink2: Write back to kafka feedback/aggregation topic + uc4Product.writeTo(KafkaSinks.<String, Double>kafka( + kafkaWritePropsForPipeline, kafkaFeedbackTopic)); + // Add Sink3: Logger + uc4Product.writeTo(Sinks.logger()); + + // Return the pipeline + return uc4Pipeline; + } + + /** + * Extends to a blank Hazelcast Jet Pipeline the UC4 topology defines by theodolite. + * + * <p>UC4 takes {@code ActivePowerRecord} events from sensors and a {@code SensorRegistry} + * with maps from keys to groups to map values to their accourding groups. A feedback stream + * allows for group keys to be mapped to values and eventually to be mapped to other top + * level groups defines by the {@code SensorRegistry}. + * + * <p>6 Step topology: + * (1) Inputs (Config, Values, Aggregations) + * (2) Merge Input Values and Aggregations + * (3) Join Configuration with Merged Input Stream + * (4) Duplicate as flatmap per value and group + * (5) Window (preperation for possible last values) + * (6) Aggregate data over the window + * + * @param pipe The blank pipeline to extend the logic to. + * @param inputSource A streaming source with {@code ActivePowerRecord} data. + * @param aggregationSource A streaming source with aggregated data. + * @param configurationSource A streaming source delivering a {@code SensorRegistry}. + * @param windowSize The window size used to aggregate over. + * @return A {@code StreamSource<String,Double>} with sensorKeys or groupKeys mapped to their + * according aggregated values. The data can be further modified or directly be linked to + * a Hazelcast Jet sink. + */ + public StreamStage<Entry<String, Double>> extendUc4Topology(final Pipeline pipe,//NOPMD + final StreamSource<Entry<String, ActivePowerRecord>> inputSource, + final StreamSource<Entry<String, Double>> aggregationSource, + final StreamSource<Entry<Event, String>> configurationSource, final int windowSize) { ////////////////////////////////// // (1) Configuration Stream - final StreamStage<Entry<Event, SensorRegistry>> configurationStream = uc4Pipeline - .readFrom(KafkaSources.<Event, String>kafka( - kafkaConfigPropsForPipeline, kafkaConfigurationTopic)) + pipe.readFrom(configurationSource) .withNativeTimestamps(0) - .map(data -> { - - // DEBUG - // System.out.println("D E B U G: Got a configuration Stream Element!"); - // System.out.println("Event: " + data.getKey().toString() + "; Sensor Registry: " + - // data.getValue().toString()); - - return data; - - }) .filter(entry -> entry.getKey() == Event.SENSOR_REGISTRY_CHANGED || entry.getKey() == Event.SENSOR_REGISTRY_STATUS) .map(data -> { - - // DEBUG - // System.out.println("D E B U G: It passed through the filter"); - return Util.entry(data.getKey(), SensorRegistry.fromJson(data.getValue())); - }); - - // Builds a new HashMap // - final SupplierEx<? extends HashMap<String, Set<String>>> hashMapSupplier = - () -> new HashMap<String, Set<String>>(); - - // FlatMapFunction // - final BiFunctionEx<? super HashMap<String, Set<String>>, ? super Entry<Event, SensorRegistry>, - ? extends Traverser<Entry<String, Set<String>>>> flatMapFn = - (flatMapStage, eventItem) -> { - // Get Data - HashMap<String, Set<String>> oldParents = - (HashMap<String, Set<String>>) flatMapStage.clone(); - SensorRegistry newSensorRegistry = (SensorRegistry) eventItem.getValue(); - - // Transform new Input - ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); - Map<String, Set<String>> mapFromRegistry = - transformer.constructChildParentsPairs(newSensorRegistry); - - // Compare both tables - HashMap<String, Set<String>> updates = new HashMap<String, Set<String>>(); - for (String key : mapFromRegistry.keySet()) { - if (oldParents.containsKey(key)) { - if (!mapFromRegistry.get(key).equals(oldParents.get(key))) { - updates.put(key, mapFromRegistry.get(key)); - } - } else { - updates.put(key, mapFromRegistry.get(key)); - } - } - - ArrayList<Entry<String, Set<String>>> updatesList = - new ArrayList<Entry<String, Set<String>>>(updates.entrySet()); - - // Return traverser with differences - return Traversers.traverseIterable(updatesList) - .map(e -> Util.entry(e.getKey(), e.getValue())); - - }; - - // Write into table sink - configurationStream - .flatMapStateful(hashMapSupplier, flatMapFn) + }) + .flatMapStateful(hashMapSupplier(), configFlatMap()) .writeTo(Sinks.mapWithUpdating( SENSOR_PARENT_MAP_NAME, // The addressed IMAP event -> event.getKey(), // The key to look for (oldValue, newEntry) -> { // the new entry returned (null automatically results in - // deletion of entry) //NOCS - - // DEBUG - /* - * String debugFlatmapString = "["; for (String group : newEntry.getValue()) { - * debugFlatmapString = debugFlatmapString + group + ","; } debugFlatmapString = - * debugFlatmapString + "]"; System.out.println( "Flatmap Writes for key '" + - * newEntry.getKey() + "': " + debugFlatmapString); - */ + // deletion of entry) //NOCS // Write new set of groups return newEntry.getValue(); @@ -160,43 +147,30 @@ public class Uc4PipelineBuilder { ////////////////////////////////// // (1) Sensor Input Stream - final StreamStage<Entry<String, Double>> inputStream = uc4Pipeline - .readFrom(KafkaSources.<String, ActivePowerRecord>kafka( - kafkaInputReadPropsForPipeline, kafkaInputTopic)) + final StreamStage<Entry<String, Double>> inputStream = pipe + .readFrom(inputSource) .withNativeTimestamps(0) .map(stream -> { - + // Build data for next pipeline stage String sensorId = stream.getValue().getIdentifier(); Double valueInW = stream.getValue().getValueInW(); - - // DEBUG - // System.out.println("INPUT D E B U G: Got an input Stream Element!"); - // System.out.println("[SensorId=" + sensorId + "//valueinW=" + valueInW.toString()); - + // Return data for next pipeline stage return Util.entry(sensorId, valueInW); }); + ////////////////////////////////// // (1) Aggregation Stream - final StreamStage<Entry<String, Double>> aggregations = uc4Pipeline - .readFrom(KafkaSources.<String, Double>kafka( - kafkaFeedbackPropsForPipeline, kafkaFeedbackTopic)) - .withNativeTimestamps(0) - .map(stream -> { - - // DEBUG - // System.out.println("AGGREGATION D E B U G: Got an aggregation Stream Element!"); - // System.out.println( - // "[SensorId=" + stream.getKey() + "//valueinW=" + stream.getValue().toString()); - - return stream; - - }); + final StreamStage<Entry<String, Double>> aggregations = pipe + .readFrom(aggregationSource) + .withNativeTimestamps(0); + ////////////////////////////////// // (2) UC4 Merge Input with aggregation stream final StreamStageWithKey<Entry<String, Double>, String> mergedInputAndAggregations = inputStream .merge(aggregations) .groupingKey(event -> event.getKey()); + ////////////////////////////////// // (3) UC4 Join Configuration and Merges Input/Aggregation Stream // [sensorKey , (value,Set<Groups>)] final StreamStage<Entry<String, ValueGroup>> joinedStage = mergedInputAndAggregations @@ -205,31 +179,31 @@ public class Uc4PipelineBuilder { (sensorEvent, sensorParentsSet) -> { // Get Data + @SuppressWarnings("unchecked") Set<String> sensorParentsCasted = (Set<String>) sensorParentsSet; + // Check whether a groupset exists for a key or not if (sensorParentsCasted == null) { + // No group set exists for this key: return valuegroup with default null group set. Set<String> nullSet = new HashSet<String>(); nullSet.add("NULL-GROUPSET"); return Util.entry(sensorEvent.getKey(), new ValueGroup(sensorEvent.getValue(), nullSet)); } else { + // Group set exists for this key: return valuegroup with the groupset. ValueGroup valueParentsPair = new ValueGroup(sensorEvent.getValue(), sensorParentsCasted); // Return solution return Util.entry(sensorEvent.getKey(), valueParentsPair); } - - }); + ////////////////////////////////// // (4) UC4 Duplicate as flatmap joined Stream // [(sensorKey, Group) , value] final StreamStage<Entry<SensorGroupKey, Double>> dupliAsFlatmappedStage = joinedStage .flatMap(entry -> { - // DEBUG - // System.out.println("D E B G U G Stage 4"); - // Supplied data String keyGroupId = entry.getKey(); Double valueInW = entry.getValue().getValueInW(); @@ -243,51 +217,77 @@ public class Uc4PipelineBuilder { for (int i = 0; i < groupList.length; i++) { newKeyList[i] = new SensorGroupKey(keyGroupId, groupList[i]); newEntryList.add(Util.entry(newKeyList[i], valueInW)); - // DEBUG - // System.out.println("Added new Entry to list: [(" + newKeyList[i].getSensorId() + "," - // + newKeyList[i].getGroup() + ")," + valueInW.toString()); } - - // Return traversable list of new entry elements return Traversers.traverseIterable(newEntryList); - }); + ////////////////////////////////// // (5) UC4 Last Value Map // Table with tumbling window differentiation [ (sensorKey,Group) , value ],Time - // TODO: Implementation of static table to fill values out of the past! final StageWithWindow<Entry<SensorGroupKey, Double>> windowedLastValues = dupliAsFlatmappedStage .window(WindowDefinition.tumbling(windowSize)); + ////////////////////////////////// // (6) UC4 GroupBy and aggregate and map // Group using the group out of the sensorGroupKey keys - final StreamStage<Entry<String, Double>> groupedAggregatedMapped = windowedLastValues + return windowedLastValues .groupingKey(entry -> entry.getKey().getGroup()) .aggregate(AggregateOperations.summingDouble(entry -> entry.getValue())) .map(agg -> { - String theGroup = agg.getKey(); - Double summedValueInW = agg.getValue(); - // System.out.println("DEBUG - We have a grouped Aggregation Stage at the end!"); + // Construct data for return pair + final String theGroup = agg.getKey(); + final Double summedValueInW = agg.getValue(); + // Return aggregates group value pair return Util.entry(theGroup, summedValueInW); }); + } - // (7) Sink - Results back to Kafka - groupedAggregatedMapped.writeTo(KafkaSinks.<String, Double>kafka( - kafkaWritePropsForPipeline, kafkaOutputTopic)); - - // (7) Sink - Results back to Kafka - groupedAggregatedMapped.writeTo(KafkaSinks.<String, Double>kafka( - kafkaWritePropsForPipeline, kafkaFeedbackTopic)); - // (7) Sink - Write to logger/console for debug puposes - groupedAggregatedMapped.writeTo(Sinks.logger()); + /** + * Returns a function which supplies a {@code HashMapy<String, Set<String>>()}. + */ + private SupplierEx<? extends HashMap<String, Set<String>>> hashMapSupplier() { + return () -> new HashMap<String, Set<String>>(); + } - // Return the pipeline - return uc4Pipeline; + /** + * Returns a function which supplies the flatMap function used to process the configuration input + * for UC4. + */ + private BiFunctionEx<? super HashMap<String, Set<String>>, + ? super Entry<Event, SensorRegistry>, + ? extends Traverser<Entry<String, Set<String>>>> configFlatMap() { + return (flatMapStage, eventItem) -> { + + // Transform new Input + final ChildParentsTransformer transformer = new ChildParentsTransformer("default-name"); + final Map<String, Set<String>> mapFromRegistry = + transformer.constructChildParentsPairs(eventItem.getValue()); + + // Compare both tables + final HashMap<String, Set<String>> updates = new HashMap<String, Set<String>>(); + for (final String key : mapFromRegistry.keySet()) { + if (flatMapStage.containsKey(key)) { + if (!mapFromRegistry.get(key).equals(flatMapStage.get(key))) { + updates.put(key, mapFromRegistry.get(key)); + } + } else { + updates.put(key, mapFromRegistry.get(key)); + } + } + + // Create a updates list to pass onto the next pipeline stage- + final ArrayList<Entry<String, Set<String>>> updatesList = + new ArrayList<Entry<String, Set<String>>>(updates.entrySet()); + + // Return traverser with updates list. + return Traversers.traverseIterable(updatesList) + .map(e -> Util.entry(e.getKey(), e.getValue())); + }; } } diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/main/test/theodolite/uc4/application/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/main/test/theodolite/uc4/application/Uc4PipelineTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6ac236b395c3ebf9ae49241ead879661373df8c3 --- /dev/null +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/main/test/theodolite/uc4/application/Uc4PipelineTest.java @@ -0,0 +1,154 @@ +package theodolite.uc4.application; + +import com.hazelcast.jet.Jet; +import com.hazelcast.jet.JetInstance; +import com.hazelcast.jet.config.JetConfig; +import com.hazelcast.jet.config.JobConfig; +import com.hazelcast.jet.core.JetTestSupport; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.StreamSource; +import com.hazelcast.jet.pipeline.StreamStage; +import com.hazelcast.jet.pipeline.test.AssertionCompletedException; +import com.hazelcast.jet.pipeline.test.Assertions; +import com.hazelcast.jet.pipeline.test.TestSources; +import com.hazelcast.jet.test.SerialTest; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.TimeZone; +import java.util.Map.Entry; +import java.util.concurrent.CompletionException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import titan.ccp.configuration.events.Event; +import titan.ccp.model.records.ActivePowerRecord; +import titan.ccp.model.sensorregistry.MachineSensor; +import titan.ccp.model.sensorregistry.MutableAggregatedSensor; +import titan.ccp.model.sensorregistry.MutableSensorRegistry; +import titan.ccp.model.sensorregistry.SensorRegistry; + +@Category(SerialTest.class) +public class Uc4PipelineTest extends JetTestSupport { + + // TEst Machinery + JetInstance testInstance = null; + Pipeline testPipeline = null; + StreamStage<Entry<String, Double>> uc4Topology = null; + + @Before + public void buildUc4Pipeline() { + + // Setup Configuration + int testItemsPerSecond = 1; + String testSensorName = "TEST-SENSOR"; + String testLevel1GroupName = "TEST-LEVEL1-GROUP"; + String testLevel2GroupName = "TEST-LEVEL2-GROUP"; + Double testValueInW = 10.0; + int testWindowSize = 5000; // As window size is bugged, not necessary. + + // Create mock jet instance with configuration + final String testClusterName = randomName(); + final JetConfig testJetConfig = new JetConfig(); + testJetConfig.getHazelcastConfig().setClusterName(testClusterName); + this.testInstance = this.createJetMember(testJetConfig); + + // Create test source 1 : Input Values + final StreamSource<Entry<String, ActivePowerRecord>> testInputSource = + TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { + final ActivePowerRecord testRecord = + new ActivePowerRecord(testSensorName, timestamp, testValueInW); + final Entry<String, ActivePowerRecord> testEntry = + Map.entry(testSensorName, testRecord); + return testEntry; + }); + + // Create test source 2 : Mock aggregation Values + final StreamSource<Entry<String, Double>> testAggregationSource = + TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { + final Double testAggValue = testValueInW; + final Entry<String, Double> testEntry = + Map.entry(testLevel1GroupName, testAggValue); + return testEntry; + }); + + // Create test source 3 : Mock Config Values + final StreamSource<Entry<Event, String>> testConfigSource = + TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { + Event theEvent = Event.SENSOR_REGISTRY_CHANGED; + + // Topology: + // level2Group -> level1Group -> testSensor + + // Create Registry + MutableSensorRegistry testRegistry = new MutableSensorRegistry(testLevel2GroupName); + // Add Sensors + MutableAggregatedSensor topLevelSensor = testRegistry.getTopLevelSensor(); + MutableAggregatedSensor level1GroupSensor = + topLevelSensor.addChildAggregatedSensor(testLevel1GroupName); + MachineSensor inputSensor = level1GroupSensor.addChildMachineSensor(testSensorName); + + + + String stringRegistry = testRegistry.toJson(); + final Entry<Event, String> testEntry = + Map.entry(theEvent, stringRegistry); + return testEntry; + }); + + // Create pipeline to test + Uc4PipelineBuilder pipelineBuilder = new Uc4PipelineBuilder(); + this.testPipeline = Pipeline.create(); + this.uc4Topology = pipelineBuilder.extendUc4Topology(testPipeline, testInputSource, + testAggregationSource, testConfigSource, testWindowSize); + + } + + /** + * Tests if no items reach the end before the first window ends. + */ + @Test + public void testOutput() { + + System.out.println("DEBUG DEBUG DEBUG || ENTERED TEST 1"); + + // Assertion Configuration + int timeout = 10; + String testSensorName = "TEST-SENSOR"; + String testLevel1GroupName = "TEST-LEVEL1-GROUP"; + String testLevel2GroupName = "TEST-LEVEL2-GROUP"; + Double testValueInW = 10.0; + + // Assertion + this.uc4Topology.apply(Assertions.assertCollectedEventually(timeout, + collection -> { + + // TODO Try to find out why this test does not work or why the pipeline seems + // TODO to crash! + // but whyyy cant i get in here + + System.out.println("DEBUG DEBUG DEBUG || ENTERED ASSERTION COLLECTED EVENTUALLY"); + + boolean allOkay = true; + + if (collection != null) { + for(int i = 0; i < collection.size(); i++) { + System.out.println("DEBUG DEBUG DEBUG || " + collection.get(i).toString()); + } + } + + Assert.assertTrue("Assertion did not complete!", allOkay); + + })); + + } + + @After + public void after() { + // Shuts down all running Jet Instances + Jet.shutdownAll(); + } + +}