diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 174f5bd7f608bad8d5c955cea247394f747dbca4..680d1b8964ab7dd247067b43c2f50bb4c36d7665 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -40,8 +40,7 @@ public abstract class AbstractStage extends Stage { @Override public InputPort<?>[] getInputPorts() { // return this.cachedInputPorts; - System.out.println("inputPortList: " + inputPortList); - return inputPortList.toArray(new InputPort<?>[0]); + return inputPortList.toArray(new InputPort<?>[0]); // FIXME remove work-around } /** @@ -58,7 +57,6 @@ public abstract class AbstractStage extends Stage { public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { signal.trigger(this); - started = true; for (OutputPort<?> outputPort : this.outputPortList) { outputPort.sendSignal(signal); @@ -98,6 +96,8 @@ public abstract class AbstractStage extends Stage { this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); + started = true; + logger.info(this + " started."); } public void onTerminating() throws Exception { diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index d2a6ebf250a9e8020172473cb1295ac6b929e430..209f826e62645a40682cab79dfa9f8b0cb66e0c1 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -43,17 +43,20 @@ public final class RunnableConsumerStage extends RunnableStage { } private void executeIdleStrategy() { + if (stage.shouldBeTerminated()) { + return; + } try { idleStrategy.execute(); } catch (InterruptedException e) { - checkforSignals(); // check for termination + // checkforSignals(); // check for termination } } private void checkforSignals() { // FIXME should getInputPorts() really be defined in Stage? InputPort<?>[] inputPorts = stage.getInputPorts(); - logger.debug("inputPorts: " + Arrays.toString(inputPorts)); + logger.debug("Checking signals for: " + Arrays.toString(inputPorts)); for (InputPort<?> inputPort : inputPorts) { IPipe pipe = inputPort.getPipe(); if (pipe instanceof AbstractInterThreadPipe) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 58a3789d066dda5fb76a29c1ee2ffdc3fee9acdc..7564e870a452593ff7ebd53e3b8f107095a080c9 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -14,6 +14,8 @@ import teetime.framework.OutputPort; public final class SpScPipe extends AbstractInterThreadPipe { + // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); + private final Queue<Object> queue; // statistics private int numWaits; @@ -38,13 +40,11 @@ public final class SpScPipe extends AbstractInterThreadPipe { Thread.yield(); } - System.out.println("Added: " + element); - Thread owningThread = cachedTargetStage.getOwningThread(); if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) { synchronized (cachedTargetStage) { cachedTargetStage.notify(); - System.out.println("Notified: " + cachedTargetStage); + // LOGGER.trace("Notified: " + cachedTargetStage); } } diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index da156138db66ad0eb938483fd5cd1611bf76f77b..6b260fe6da68e9616011caf2db14ef26800b4878 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -19,7 +19,6 @@ public class StartingSignal implements ISignal { public void trigger(final AbstractStage stage) { try { stage.onStarting(); - LOGGER.info(stage + " started."); } catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception) this.catchedExceptions.add(e); LOGGER.error("Exception while sending the start signal", e); diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 54f09ed8ff322a0e40c4020083620e2c815f2c23..98349057d2df7377e15109469830a28f09578f30 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -24,7 +24,6 @@ public final class Relay<T> extends AbstractConsumerStage<T> { logger.trace("relay: returnNoElement"); returnNoElement(); } - logger.trace("relay: " + element); outputPort.send(element); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 29f152d0fd8589b1d5e75da901f3269baef878c1..a06bf6cb27b95d77097f1578867e7332cbe4f5fb 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -27,17 +27,31 @@ public final class Delay<T> extends AbstractStage { return; } + sendAllBufferedEllements(); + } + + private void sendAllBufferedEllements() { while (!bufferedElements.isEmpty()) { - element = bufferedElements.remove(0); + T element = bufferedElements.remove(0); outputPort.send(element); + logger.trace("Sent buffered element: " + element); } } @Override public void onTerminating() throws Exception { - while (!this.inputPort.getPipe().isEmpty()) { - this.executeWithPorts(); + while (null == timestampTriggerInputPort.receive()) { + // wait for the next trigger } + + sendAllBufferedEllements(); + + T element; + while (null != (element = inputPort.receive())) { + outputPort.send(element); + logger.trace("Sent element: " + element); + } + super.onTerminating(); } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java index c892f8496822a2dc6ae017aa2060811b2e74c223..e2d1771d20387d25ff8dbcf87ef3d95ba2d5c3c6 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTest.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -15,7 +15,7 @@ public class RunnableConsumerStageTest { @Test public void testWaitingInfinitely() throws Exception { - WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(5000, 1); + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 1); final Analysis analysis = new Analysis(waitStrategyConfiguration); analysis.init(); @@ -49,15 +49,16 @@ public class RunnableConsumerStageTest { Thread.sleep(200); assertEquals(State.WAITING, thread.getState()); + assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); - Thread.sleep(500); + Thread.sleep(200); assertEquals(State.TERMINATED, thread.getState()); assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); - assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); } @Test - public void testSimpleRun() throws Exception { + public void testYieldRun() throws Exception { YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); final Analysis analysis = new Analysis(waitStrategyConfiguration);