From cecc1e95db396d06ca1c314c8ed1c0da07f9418b Mon Sep 17 00:00:00 2001 From: Nils Ziermann <nz@braune-digital.com> Date: Fri, 6 Jan 2017 13:29:55 +0100 Subject: [PATCH] Extend TaskQueueTest --- src/main/java/teetime/stage/IncStage.java | 26 +++++++++++++++++++ .../directory/TaskQueueTestConfiguration.java | 4 ++- .../java/teetime/framework/TaskQueueTest.java | 9 ++++--- 3 files changed, 35 insertions(+), 4 deletions(-) create mode 100644 src/main/java/teetime/stage/IncStage.java diff --git a/src/main/java/teetime/stage/IncStage.java b/src/main/java/teetime/stage/IncStage.java new file mode 100644 index 000000000..505279e02 --- /dev/null +++ b/src/main/java/teetime/stage/IncStage.java @@ -0,0 +1,26 @@ +package teetime.stage; + +import teetime.framework.AbstractStage; +import teetime.framework.ITaskQueueDuplicable; +import teetime.stage.basic.AbstractFilter; + +/** + * Created by nilsziermann on 06.01.17. + */ +public class IncStage extends AbstractFilter<Integer> implements ITaskQueueDuplicable { + + @Override + protected void execute(Integer element) { + this.outputPort.send(element+1); + } + + @Override + public AbstractStage duplicate() { + IncStage inc = new IncStage(); + inc.getInputPort().setPipe(this.getInputPort().getPipe()); + inc.getOutputPort().setPipe(this.getOutputPort().getPipe()); + inc.setExceptionHandler(this.getExceptionListener()); + + return inc; + } +} diff --git a/src/test/java/teetime/examples/directory/TaskQueueTestConfiguration.java b/src/test/java/teetime/examples/directory/TaskQueueTestConfiguration.java index 3613c4c7c..f937762ff 100644 --- a/src/test/java/teetime/examples/directory/TaskQueueTestConfiguration.java +++ b/src/test/java/teetime/examples/directory/TaskQueueTestConfiguration.java @@ -25,10 +25,12 @@ public class TaskQueueTestConfiguration extends Configuration { final VaryingVariableWorkloadStage varyingVariableWorkloadStage = new VaryingVariableWorkloadStage(); final VaryingVariableWorkloadStage varyingVariableWorkloadStage2 = new VaryingVariableWorkloadStage(); final VaryingVariableWorkloadStage varyingVariableWorkloadStage3 = new VaryingVariableWorkloadStage(); + final IncStage inc = new IncStage(); connectPorts(init.getOutputPort(), varyingVariableWorkloadStage.getInputPort()); connectPorts(varyingVariableWorkloadStage.getOutputPort(), varyingVariableWorkloadStage2.getInputPort()); - connectPorts(varyingVariableWorkloadStage2.getOutputPort(), counter.getInputPort()); + connectPorts(varyingVariableWorkloadStage2.getOutputPort(), inc.getInputPort()); + connectPorts(inc.getOutputPort(), counter.getInputPort()); connectPorts(counter.getOutputPort(), collectorSink.getInputPort()); } } diff --git a/src/test/java/teetime/framework/TaskQueueTest.java b/src/test/java/teetime/framework/TaskQueueTest.java index ec64e5ef1..8acaf9aed 100644 --- a/src/test/java/teetime/framework/TaskQueueTest.java +++ b/src/test/java/teetime/framework/TaskQueueTest.java @@ -50,9 +50,12 @@ public class TaskQueueTest { final List<Integer> output = configuration.collectorSink.getElements(); int last = Integer.MIN_VALUE; - for(Integer element : output) { - Assert.assertTrue(element > last); - last = element; + + int n = output.size(); + for(int i = 0; i < n; i++) { + int test = output.remove(0); + System.out.println(test); + Assert.assertEquals(i+1, test ); } } } -- GitLab