diff --git a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java index d505b716930a599c9e2c3c71f62ecab08b01ae11..925f8339e9d40be3e32acd2edcc99542ac8104bb 100644 --- a/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java +++ b/theodolite-benchmarks/uc4-hazelcastjet/src/test/java/theodolite/uc4/application/Uc4PipelineTest.java @@ -6,6 +6,7 @@ import com.hazelcast.jet.config.JetConfig; import com.hazelcast.jet.config.JobConfig; import com.hazelcast.jet.core.JetTestSupport; import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.StreamSource; import com.hazelcast.jet.pipeline.StreamStage; import com.hazelcast.jet.pipeline.test.AssertionCompletedException; @@ -14,6 +15,7 @@ import com.hazelcast.jet.pipeline.test.TestSources; import com.hazelcast.jet.test.SerialTest; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.concurrent.CompletionException; import org.junit.After; import org.junit.Assert; @@ -45,12 +47,12 @@ public class Uc4PipelineTest extends JetTestSupport { public void buildUc4Pipeline() { // Setup Configuration - int testItemsPerSecond = 1; - String testSensorName = "TEST-SENSOR"; - String testLevel1GroupName = "TEST-LEVEL1-GROUP"; - String testLevel2GroupName = "TEST-LEVEL2-GROUP"; - Double testValueInW = 10.0; - int testWindowSize = 5000; // As window size is bugged, not necessary. + final int testItemsPerSecond = 2; + final String testSensorName = "TEST-SENSOR"; + final String testLevel1GroupName = "TEST-LEVEL1-GROUP"; + final String testLevel2GroupName = "TEST-LEVEL2-GROUP"; + final Double testValueInW = 10.0; + final int testWindowSize = 5000; // As window size is bugged, not necessary. // Create mock jet instance with configuration final String testClusterName = randomName(); @@ -68,8 +70,6 @@ public class Uc4PipelineTest extends JetTestSupport { return testEntry; }); - final AggregatedActivePowerRecord.Builder aggregationBuilder = AggregatedActivePowerRecord.newBuilder(); - // Create test source 2 : Mock aggregation Values final StreamSource<Entry<String, AggregatedActivePowerRecord>> testAggregationSource = TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { @@ -81,42 +81,46 @@ public class Uc4PipelineTest extends JetTestSupport { testValueInW, testValueInW); - final ActivePowerRecord testAggValue = new ActivePowerRecord(testSensorName,System.currentTimeMillis(),testValueInW); + final ActivePowerRecord testAggValue = + new ActivePowerRecord(testSensorName, + System.currentTimeMillis(), + testValueInW); + final Entry<String, AggregatedActivePowerRecord> testEntry = Map.entry(testLevel1GroupName, test); return testEntry; }); + // Create test source 3 : Mock Config Values final StreamSource<Entry<Event, String>> testConfigSource = TestSources.itemStream(testItemsPerSecond, (timestamp, item) -> { - Event theEvent = Event.SENSOR_REGISTRY_CHANGED; + final Event theEvent = Event.SENSOR_REGISTRY_CHANGED; // Topology: // level2Group -> level1Group -> testSensor // Create Registry - MutableSensorRegistry testRegistry = new MutableSensorRegistry(testLevel2GroupName); + final MutableSensorRegistry testRegistry = new MutableSensorRegistry(testLevel2GroupName); // Add Sensors - MutableAggregatedSensor topLevelSensor = testRegistry.getTopLevelSensor(); - MutableAggregatedSensor level1GroupSensor = + final MutableAggregatedSensor topLevelSensor = testRegistry.getTopLevelSensor(); + final MutableAggregatedSensor level1GroupSensor = topLevelSensor.addChildAggregatedSensor(testLevel1GroupName); - MachineSensor inputSensor = level1GroupSensor.addChildMachineSensor(testSensorName); - + final MachineSensor inputSensor = level1GroupSensor.addChildMachineSensor(testSensorName); - - String stringRegistry = testRegistry.toJson(); + final String stringRegistry = testRegistry.toJson(); final Entry<Event, String> testEntry = Map.entry(theEvent, stringRegistry); return testEntry; }); // Create pipeline to test - Uc4PipelineBuilder pipelineBuilder = new Uc4PipelineBuilder(); + final Uc4PipelineBuilder pipelineBuilder = new Uc4PipelineBuilder(); this.testPipeline = Pipeline.create(); - this.uc4Topology = pipelineBuilder.extendUc4Topology(testPipeline, testInputSource, - testAggregationSource, testConfigSource, testWindowSize); + this.uc4Topology = pipelineBuilder.extendUc4Topology(testPipeline, + testInputSource, testAggregationSource, testConfigSource, testWindowSize); + this.uc4Topology.writeTo(Sinks.logger()); } /** @@ -128,29 +132,63 @@ public class Uc4PipelineTest extends JetTestSupport { // System.out.println("DEBUG DEBUG DEBUG || ENTERED TEST 1"); // Assertion Configuration - int timeout = 10; - String testSensorName = "TEST-SENSOR"; - String testLevel1GroupName = "TEST-LEVEL1-GROUP"; - String testLevel2GroupName = "TEST-LEVEL2-GROUP"; - Double testValueInW = 10.0; + final int timeout = 20; + final String testSensorName = "TEST-SENSOR"; + final String testLevel1GroupName = "TEST-LEVEL1-GROUP"; + final String testLevel2GroupName = "TEST-LEVEL2-GROUP"; + final double testValueInW = 10.0; + // Assertion this.uc4Topology.apply(Assertions.assertCollectedEventually(timeout, collection -> { - System.out.println("DEBUG DEBUG DEBUG || ENTERED ASSERTION COLLECTED EVENTUALLY"); - Thread.sleep(20_000); - - boolean allOkay = true; - + System.out.println("DEBUG || ENTERED ASSERTION COLLECTED EVENTUALLY"); + + boolean allOkay = false; + + boolean testLevel1contained = false; + boolean testLevel2contained = false; + boolean averageEqTest = true; + boolean avOk = true; + + if (collection != null) { System.out.println("Collection size: " + collection.size()); - for(int i = 0; i < collection.size(); i++) { - System.out.println("DEBUG DEBUG DEBUG || " + collection.get(i).toString()); + for (final Entry<String, AggregatedActivePowerRecord> entry : collection) { + System.out.println("DEBUG || " + entry.toString()); + + final String key = entry.getKey(); + final AggregatedActivePowerRecord agg = entry.getValue(); + + + if (Objects.equals(key, testLevel1GroupName)) { + testLevel1contained = true; + } + + if(Objects.equals(key, testLevel2GroupName)){ + testLevel2contained = true; + } + + if (testValueInW != agg.getAverageInW()){ + averageEqTest = false; + } + + final double average = agg.getSumInW() / agg.getCount(); + if (average != agg.getAverageInW()) { + avOk = false; + } + } + allOkay = testLevel1contained && testLevel2contained && averageEqTest && avOk; } - + + System.out.println("testLevel1contained: " + testLevel1contained); + System.out.println("testLevel2contained: " + testLevel2contained); + System.out.println("averageEqTest: " + averageEqTest); + System.out.println("avOk: " + avOk); + Assert.assertTrue("Assertion did not complete!", allOkay); })); @@ -179,7 +217,7 @@ public class Uc4PipelineTest extends JetTestSupport { @After public void after() { - System.out.println("Shuting down"); + System.out.println("Shutting down"); // Shuts down all running Jet Instances Jet.shutdownAll(); }