Skip to content
Snippets Groups Projects
Commit 46e3b3a0 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed bug in Delay;

fixed perf test for idle strategies
parent 13e4fa2f
No related branches found
No related tags found
No related merge requests found
......@@ -40,8 +40,7 @@ public abstract class AbstractStage extends Stage {
@Override
public InputPort<?>[] getInputPorts() {
// return this.cachedInputPorts;
System.out.println("inputPortList: " + inputPortList);
return inputPortList.toArray(new InputPort<?>[0]);
return inputPortList.toArray(new InputPort<?>[0]); // FIXME remove work-around
}
/**
......@@ -58,7 +57,6 @@ public abstract class AbstractStage extends Stage {
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this);
started = true;
for (OutputPort<?> outputPort : this.outputPortList) {
outputPort.sendSignal(signal);
......@@ -98,6 +96,8 @@ public abstract class AbstractStage extends Stage {
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
started = true;
logger.info(this + " started.");
}
public void onTerminating() throws Exception {
......
......@@ -43,17 +43,20 @@ public final class RunnableConsumerStage extends RunnableStage {
}
private void executeIdleStrategy() {
if (stage.shouldBeTerminated()) {
return;
}
try {
idleStrategy.execute();
} catch (InterruptedException e) {
checkforSignals(); // check for termination
// checkforSignals(); // check for termination
}
}
private void checkforSignals() {
// FIXME should getInputPorts() really be defined in Stage?
InputPort<?>[] inputPorts = stage.getInputPorts();
logger.debug("inputPorts: " + Arrays.toString(inputPorts));
logger.debug("Checking signals for: " + Arrays.toString(inputPorts));
for (InputPort<?> inputPort : inputPorts) {
IPipe pipe = inputPort.getPipe();
if (pipe instanceof AbstractInterThreadPipe) {
......
......@@ -14,6 +14,8 @@ import teetime.framework.OutputPort;
public final class SpScPipe extends AbstractInterThreadPipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
private final Queue<Object> queue;
// statistics
private int numWaits;
......@@ -38,13 +40,11 @@ public final class SpScPipe extends AbstractInterThreadPipe {
Thread.yield();
}
System.out.println("Added: " + element);
Thread owningThread = cachedTargetStage.getOwningThread();
if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) {
synchronized (cachedTargetStage) {
cachedTargetStage.notify();
System.out.println("Notified: " + cachedTargetStage);
// LOGGER.trace("Notified: " + cachedTargetStage);
}
}
......
......@@ -19,7 +19,6 @@ public class StartingSignal implements ISignal {
public void trigger(final AbstractStage stage) {
try {
stage.onStarting();
LOGGER.info(stage + " started.");
} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the start signal", e);
......
......@@ -24,7 +24,6 @@ public final class Relay<T> extends AbstractConsumerStage<T> {
logger.trace("relay: returnNoElement");
returnNoElement();
}
logger.trace("relay: " + element);
outputPort.send(element);
}
......
......@@ -27,17 +27,31 @@ public final class Delay<T> extends AbstractStage {
return;
}
sendAllBufferedEllements();
}
private void sendAllBufferedEllements() {
while (!bufferedElements.isEmpty()) {
element = bufferedElements.remove(0);
T element = bufferedElements.remove(0);
outputPort.send(element);
logger.trace("Sent buffered element: " + element);
}
}
@Override
public void onTerminating() throws Exception {
while (!this.inputPort.getPipe().isEmpty()) {
this.executeWithPorts();
while (null == timestampTriggerInputPort.receive()) {
// wait for the next trigger
}
sendAllBufferedEllements();
T element;
while (null != (element = inputPort.receive())) {
outputPort.send(element);
logger.trace("Sent element: " + element);
}
super.onTerminating();
}
......
......@@ -15,7 +15,7 @@ public class RunnableConsumerStageTest {
@Test
public void testWaitingInfinitely() throws Exception {
WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(5000, 1);
WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 1);
final Analysis analysis = new Analysis(waitStrategyConfiguration);
analysis.init();
......@@ -49,15 +49,16 @@ public class RunnableConsumerStageTest {
Thread.sleep(200);
assertEquals(State.WAITING, thread.getState());
assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size());
Thread.sleep(500);
Thread.sleep(200);
assertEquals(State.TERMINATED, thread.getState());
assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().get(0));
assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size());
assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size());
}
@Test
public void testSimpleRun() throws Exception {
public void testYieldRun() throws Exception {
YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42);
final Analysis analysis = new Analysis(waitStrategyConfiguration);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment