diff --git a/docs/project-info.md b/docs/project-info.md index 5cca3c92ffb70df98f19d6275953de51b7216116..3fdb921965ef2771805d015a2b7c6723267ad068 100644 --- a/docs/project-info.md +++ b/docs/project-info.md @@ -26,6 +26,7 @@ Theodolite's internal development including issue boards, merge requests and ext * [Lorenz Boguhn](https://github.com/lorenzboguhn) * [Simon Ehrenstein](https://github.com/sehrenstein) * [Willi Hasselbring](https://www.se.informatik.uni-kiel.de/en/team/prof.-dr.-wilhelm-willi-hasselbring) +* [Christopher Konkel](https://github.com/JustAnotherChristoph) * [Luca Mertens](https://www.linkedin.com/in/luca-mertens-35a932201) * [Tobias Pfandzelter](https://pfandzelter.com/) * [Julia Rossow](https://www.linkedin.com/in/julia-rossow/) diff --git a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle index f43b9bafe3b8135cb4606f109648fc1acb4fe024..448101c15c3d318d9d209397928f1c6b9711e1ff 100644 --- a/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle +++ b/theodolite-benchmarks/buildSrc/src/main/groovy/theodolite.kstreams.gradle @@ -14,7 +14,7 @@ repositories { url "https://oss.sonatype.org/content/repositories/snapshots/" } maven { - url 'https://packages.confluent.io/maven/' + url 'https://packages.confluent.io/maven/' } } @@ -28,4 +28,10 @@ dependencies { // Use JUnit test framework testImplementation 'junit:junit:4.12' + //testImplementation 'com.github.stefanbirkner:system-rules:1.17.0' + testImplementation 'org.apache.kafka:kafka-streams-test-utils:3.1.0' + testImplementation('io.confluent:kafka-streams-avro-serde:5.5.12') { + // exclude, because introduces older version with higher number 5.5.1-ccs + exclude group: 'org.apache.kafka', module: 'kafka-clients' + } } diff --git a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/benchmarks/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/benchmarks/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java index 9f3d5721efd90000b9265b593be5656b989c4f44..d7de004b90234987ad3c8ae40172977a4b07165d 100644 --- a/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/benchmarks/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java +++ b/theodolite-benchmarks/commons/src/main/java/rocks/theodolite/benchmarks/commons/kafka/avro/SchemaRegistryAvroSerdeFactory.java @@ -10,7 +10,7 @@ import org.apache.kafka.common.serialization.Serde; /** * Factory methods to create {@link Serde}s for Avro records using the Confluent Schema Registry. */ -public final class SchemaRegistryAvroSerdeFactory { +public class SchemaRegistryAvroSerdeFactory { private static final String SCHEMA_REGISTRY_URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; diff --git a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java index 34378c427f150ac079e4e67cb96bfb164ceeac4b..ccbc075cf2e060340a92ae01a097d7a35591050c 100644 --- a/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java +++ b/theodolite-benchmarks/hazelcastjet-commons/src/main/java/rocks/theodolite/benchmarks/commons/hazelcastjet/HazelcastJetService.java @@ -3,6 +3,7 @@ package rocks.theodolite.benchmarks.commons.hazelcastjet; import com.hazelcast.jet.JetInstance; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.pipeline.Pipeline; +import java.util.Objects; import org.apache.commons.configuration2.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,10 +58,11 @@ public abstract class HazelcastJetService { * instance. */ public void run() { + Objects.requireNonNull(this.jobName, "'jobName' must be set."); try { final Pipeline pipeline = this.pipelineFactory.buildPipeline(); this.registerSerializer(); - this.jobConfig.setName(this.config.getString("name")); + this.jobConfig.setName(this.jobName); this.jetInstance.newJobIfAbsent(pipeline, this.jobConfig).join(); } catch (final Exception e) { // NOPMD LOGGER.error("ABORT MISSION!:", e); diff --git a/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java new file mode 100644 index 0000000000000000000000000000000000000000..c9fb98a70481a511f8ee0e171093e8d0454e10f8 --- /dev/null +++ b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/MockedSchemaRegistrySerdes.java @@ -0,0 +1,41 @@ +package rocks.theodolite.benchmarks.uc4.kstreams; + +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.util.Collections; +import java.util.Map; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.common.serialization.Serde; +import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; + + +public class MockedSchemaRegistrySerdes extends SchemaRegistryAvroSerdeFactory { + + private static final String URL_KEY = AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; + private static final String DUMMY_URL = "http://dummy"; + + private final MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); + private final Map<String, String> serdeConfig = Collections.singletonMap(URL_KEY, DUMMY_URL); + + public MockedSchemaRegistrySerdes() { + super(DUMMY_URL); + } + + @Override + public <T extends SpecificRecord> Serde<T> forKeys() { + return this.build(true); + } + + @Override + public <T extends SpecificRecord> Serde<T> forValues() { + return this.build(false); + } + + private <T extends SpecificRecord> Serde<T> build(final boolean isKey) { + final Serde<T> avroSerde = new SpecificAvroSerde<>(this.schemaRegistryClient); + avroSerde.configure(this.serdeConfig, isKey); + return avroSerde; + } + +} diff --git a/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/TopologyTest.java b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/TopologyTest.java new file mode 100644 index 0000000000000000000000000000000000000000..46a5f55fc3e532eeb36a2fb0a8698ad3f850cfd3 --- /dev/null +++ b/theodolite-benchmarks/uc4-kstreams/src/test/java/rocks/theodolite/benchmarks/uc4/kstreams/TopologyTest.java @@ -0,0 +1,372 @@ +package rocks.theodolite.benchmarks.uc4.kstreams; + + +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import rocks.theodolite.benchmarks.commons.configuration.events.Event; +import rocks.theodolite.benchmarks.commons.configuration.events.EventSerde; +import rocks.theodolite.benchmarks.commons.kafka.avro.SchemaRegistryAvroSerdeFactory; +import rocks.theodolite.benchmarks.commons.model.records.ActivePowerRecord; +import rocks.theodolite.benchmarks.commons.model.records.AggregatedActivePowerRecord; +import rocks.theodolite.benchmarks.commons.model.sensorregistry.MutableAggregatedSensor; +import rocks.theodolite.benchmarks.commons.model.sensorregistry.MutableSensorRegistry; + + +public class TopologyTest { + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + private static final String FEEDBACK_TOPIC = "feedback"; + private static final String CONFIGURATION_TOPIC = "configuration"; + + private TopologyTestDriver testDriver; + private TestInputTopic<Event, String> configurationTopic; + private TestInputTopic<String, ActivePowerRecord> inputTopic; + private TestOutputTopic<String, AggregatedActivePowerRecord> outputTopic; + // private TestOutputTopic<String, AggregatedActivePowerRecord> feedbackTopic; + private SchemaRegistryAvroSerdeFactory serdes; + + @Before + public void setup() { + this.serdes = new MockedSchemaRegistrySerdes(); + + final Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-aggregation"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); + + final Topology topology = new TopologyBuilder( + INPUT_TOPIC, + OUTPUT_TOPIC, + FEEDBACK_TOPIC, + CONFIGURATION_TOPIC, + Duration.ofSeconds(2), + Duration.ofSeconds(2), + this.serdes).build(props); + + // setup test driver + this.testDriver = new TopologyTestDriver(topology, props); + this.inputTopic = this.testDriver.createInputTopic( + INPUT_TOPIC, + Serdes.String().serializer(), + this.serdes.<ActivePowerRecord>forValues().serializer()); + this.configurationTopic = this.testDriver.createInputTopic( + CONFIGURATION_TOPIC, + EventSerde.serde().serializer(), + Serdes.String().serializer()); + this.testDriver.createInputTopic( + OUTPUT_TOPIC, + Serdes.String().serializer(), + this.serdes.forValues().serializer()); + this.testDriver.createInputTopic( + FEEDBACK_TOPIC, + Serdes.String().serializer(), + this.serdes.<AggregatedActivePowerRecord>forValues().serializer()); + this.outputTopic = this.testDriver.createOutputTopic( + OUTPUT_TOPIC, + Serdes.String().deserializer(), + this.serdes.<AggregatedActivePowerRecord>forValues().deserializer()); + // this.feedbackTopic = this.testDriver.createOutputTopic( + // FEEDBACK_TOPIC, + // this.serdes.string().deserializer(), + // this.serdes.aggregatedActivePowerRecordValues().deserializer()); + + } + + @After + public void tearDown() { + this.testDriver.close(); + } + + @Test + public void testOneLevelRegistry() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + root.addChildMachineSensor("child1"); + root.addChildMachineSensor("child2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("child1", Instant.ofEpochSecond(0), 100.0); + this.pipeInput("child2", Instant.ofEpochSecond(1), 100.0); + + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(2), 123.0); + + // Check aggregation results + Assert.assertEquals(1, this.outputTopic.getQueueSize()); + + final AggregatedActivePowerRecord result = this.outputTopic.readValue(); + Assert.assertEquals("root", result.getIdentifier()); + Assert.assertEquals(2, result.getCount()); + Assert.assertEquals(200.0, result.getSumInW(), 0.1); + } + + @Test + public void shouldOnlyConsiderLatestValue() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + root.addChildMachineSensor("child1"); + root.addChildMachineSensor("child2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0); + this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0); + this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0); + + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0); + + // Check aggregation results + Assert.assertEquals(1, this.outputTopic.getQueueSize()); + final AggregatedActivePowerRecord result = this.outputTopic.readValue(); + Assert.assertEquals("root", result.getIdentifier()); + Assert.assertEquals(2, result.getCount()); + Assert.assertEquals(500.0, result.getSumInW(), 0.1); + } + + @Test + public void shouldOnlyConsiderLatestValueWhenOutOfOrder() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + root.addChildMachineSensor("child1"); + root.addChildMachineSensor("child2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0); + this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0); + this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0); + + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0); + + // Check aggregation results + Assert.assertEquals(1, this.outputTopic.getQueueSize()); + final AggregatedActivePowerRecord result = this.outputTopic.readValue(); + Assert.assertEquals("root", result.getIdentifier()); + Assert.assertEquals(2, result.getCount()); + Assert.assertEquals(500.0, result.getSumInW(), 0.1); + } + + @Test + public void shouldHandleUpdateDuringGracePeriod() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + root.addChildMachineSensor("child1"); + root.addChildMachineSensor("child2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0); + this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0); + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0); + // This record arrives out-of-order but withing the grace period + this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0); + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(5), 123.0); + + // Check aggregation results + Assert.assertEquals(3, this.outputTopic.getQueueSize()); + + final AggregatedActivePowerRecord firstResult = this.outputTopic.readValue(); + Assert.assertEquals("root", firstResult.getIdentifier()); + Assert.assertEquals(2, firstResult.getCount()); + Assert.assertEquals(150.0, firstResult.getSumInW(), 0.1); + + final AggregatedActivePowerRecord secondResult = this.outputTopic.readValue(); + Assert.assertEquals("root", secondResult.getIdentifier()); + Assert.assertEquals(2, secondResult.getCount()); + Assert.assertEquals(500.0, secondResult.getSumInW(), 0.1); + + final AggregatedActivePowerRecord thirdResult = this.outputTopic.readValue(); + Assert.assertEquals("root", thirdResult.getIdentifier()); + Assert.assertEquals(1, thirdResult.getCount()); + Assert.assertEquals(123.0, thirdResult.getSumInW(), 0.1); + } + + @Test + public void shouldNotHandleUpdateAfterGracePeriod() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + root.addChildMachineSensor("child1"); + root.addChildMachineSensor("child2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("child1", Instant.ofEpochMilli(500), 50.0); + this.pipeInput("child2", Instant.ofEpochMilli(1000), 100.0); + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(3), 123.0); + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(5), 123.0); + // This record arrives out-of-order and after the grace period has expired + this.pipeInput("child1", Instant.ofEpochMilli(1500), 400.0); + // Advance time to obtain outputs + this.pipeInput("child2", Instant.ofEpochSecond(7), 123.0); + + // Check aggregation results + Assert.assertEquals(3, this.outputTopic.getQueueSize()); + + final AggregatedActivePowerRecord firstResult = this.outputTopic.readValue(); + Assert.assertEquals("root", firstResult.getIdentifier()); + Assert.assertEquals(2, firstResult.getCount()); + Assert.assertEquals(150.0, firstResult.getSumInW(), 0.1); + + final AggregatedActivePowerRecord secondResult = this.outputTopic.readValue(); + Assert.assertEquals("root", secondResult.getIdentifier()); + Assert.assertEquals(1, secondResult.getCount()); + Assert.assertEquals(123.0, secondResult.getSumInW(), 0.1); + + final AggregatedActivePowerRecord thirdResult = this.outputTopic.readValue(); + Assert.assertEquals("root", thirdResult.getIdentifier()); + Assert.assertEquals(1, thirdResult.getCount()); + Assert.assertEquals(123.0, thirdResult.getSumInW(), 0.1); + } + + @Test + public void testTwoLevelRegistry() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + final MutableAggregatedSensor sensor1 = root.addChildAggregatedSensor("sensor-1"); + sensor1.addChildAggregatedSensor("sensor-1-1"); + sensor1.addChildAggregatedSensor("sensor-1-2"); + final MutableAggregatedSensor sensor2 = root.addChildAggregatedSensor("sensor-2"); + sensor2.addChildAggregatedSensor("sensor-2-1"); + sensor2.addChildAggregatedSensor("sensor-2-2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("sensor-1-1", Instant.ofEpochSecond(0), 100.0); + this.pipeInput("sensor-2-1", Instant.ofEpochSecond(0), 100.0); + this.pipeInput("sensor-1-2", Instant.ofEpochSecond(1), 100.0); + this.pipeInput("sensor-2-2", Instant.ofEpochSecond(1), 100.0); + + // Advance time to obtain outputs + this.pipeInput("sensor-2-2", Instant.ofEpochSecond(2), 123.0); + + // Check aggregation results + Assert.assertEquals(3, this.outputTopic.getQueueSize()); + + final AggregatedActivePowerRecord rootResult = this.outputTopic.readValue(); + Assert.assertEquals("root", rootResult.getIdentifier()); + Assert.assertEquals(2, rootResult.getCount()); + Assert.assertEquals(400.0, rootResult.getSumInW(), 0.1); + + final AggregatedActivePowerRecord sensor1Result = this.outputTopic.readValue(); + Assert.assertEquals("sensor-1", sensor1Result.getIdentifier()); + Assert.assertEquals(2, sensor1Result.getCount()); + Assert.assertEquals(200.0, sensor1Result.getSumInW(), 0.1); + + final AggregatedActivePowerRecord sensor2Result = this.outputTopic.readValue(); + Assert.assertEquals("sensor-2", sensor2Result.getIdentifier()); + Assert.assertEquals(2, sensor2Result.getCount()); + Assert.assertEquals(200.0, sensor2Result.getSumInW(), 0.1); + } + + @Test + public void testUnbalancedRegistry() { + // Publish sensor registry + final MutableSensorRegistry registry = new MutableSensorRegistry("root"); + final MutableAggregatedSensor root = registry.getTopLevelSensor(); + final MutableAggregatedSensor sensor1 = root.addChildAggregatedSensor("sensor-1"); + sensor1.addChildMachineSensor("sensor-1-1"); + sensor1.addChildMachineSensor("sensor-1-2"); + final MutableAggregatedSensor sensor2 = root.addChildAggregatedSensor("sensor-2"); + final MutableAggregatedSensor sensor21 = sensor2.addChildAggregatedSensor("sensor-2-1"); + sensor21.addChildMachineSensor("sensor-2-1-1"); + sensor21.addChildMachineSensor("sensor-2-1-2"); + final MutableAggregatedSensor sensor22 = sensor2.addChildAggregatedSensor("sensor-2-2"); + sensor22.addChildMachineSensor("sensor-2-2-1"); + sensor22.addChildMachineSensor("sensor-2-2-2"); + this.configurationTopic.pipeInput( + Event.SENSOR_REGISTRY_CHANGED, + registry.toJson(), + Instant.ofEpochSecond(0)); + + // Publish input records + this.pipeInput("sensor-1-1", Instant.ofEpochSecond(0), 100.0); + this.pipeInput("sensor-2-1-1", Instant.ofEpochSecond(0), 10.0); + this.pipeInput("sensor-2-2-1", Instant.ofEpochSecond(0), 20.0); + this.pipeInput("sensor-1-2", Instant.ofEpochSecond(0), 200.0); + this.pipeInput("sensor-2-1-2", Instant.ofEpochSecond(1), 1.0); + this.pipeInput("sensor-2-2-2", Instant.ofEpochSecond(1), 2.0); + + // Advance time to obtain outputs + this.pipeInput("sensor-1-1", Instant.ofEpochSecond(2), 999.0); + + // Check aggregation results + Assert.assertEquals(5, this.outputTopic.getQueueSize()); + + final AggregatedActivePowerRecord rootResult = this.outputTopic.readValue(); + Assert.assertEquals("root", rootResult.getIdentifier()); + Assert.assertEquals(2, rootResult.getCount()); + Assert.assertEquals(333.0, rootResult.getSumInW(), 0.1); + + final AggregatedActivePowerRecord sensor1Result = this.outputTopic.readValue(); + Assert.assertEquals("sensor-1", sensor1Result.getIdentifier()); + Assert.assertEquals(2, sensor1Result.getCount()); + Assert.assertEquals(300.0, sensor1Result.getSumInW(), 0.1); + + final AggregatedActivePowerRecord sensor2Result = this.outputTopic.readValue(); + Assert.assertEquals("sensor-2", sensor2Result.getIdentifier()); + Assert.assertEquals(2, sensor2Result.getCount()); + Assert.assertEquals(33.0, sensor2Result.getSumInW(), 0.1); + + final AggregatedActivePowerRecord sensor21Result = this.outputTopic.readValue(); + Assert.assertEquals("sensor-2-1", sensor21Result.getIdentifier()); + Assert.assertEquals(2, sensor21Result.getCount()); + Assert.assertEquals(11.0, sensor21Result.getSumInW(), 0.1); + + final AggregatedActivePowerRecord sensor22Result = this.outputTopic.readValue(); + Assert.assertEquals("sensor-2-2", sensor22Result.getIdentifier()); + Assert.assertEquals(2, sensor22Result.getCount()); + Assert.assertEquals(22.0, sensor22Result.getSumInW(), 0.1); + } + + + + private void pipeInput(final String identifier, final Instant timestamp, final double value) { + this.inputTopic.pipeInput( + identifier, + new ActivePowerRecord(identifier, timestamp.toEpochMilli(), value), + timestamp); + } + +}