Skip to content
Snippets Groups Projects
Commit 13e4fa2f authored by Christian Wulf's avatar Christian Wulf
Browse files

added tests for idle strategies

parent 3349f88f
No related branches found
No related tags found
No related merge requests found
Showing
with 255 additions and 10 deletions
package teetime.framework;
import teetime.framework.idle.IdleStrategy;
public abstract class AbstractConsumerStage<I> extends AbstractStage {
protected final InputPort<I> inputPort = this.createInputPort();
private IdleStrategy idleStrategy; // FIXME remove this word-around
public final InputPort<I> getInputPort() {
return this.inputPort;
}
......@@ -17,4 +21,11 @@ public abstract class AbstractConsumerStage<I> extends AbstractStage {
protected abstract void execute(I element);
public IdleStrategy getIdleStrategy() {
return idleStrategy;
}
public void setIdleStrategy(final IdleStrategy idleStrategy) {
this.idleStrategy = idleStrategy;
}
}
......@@ -24,12 +24,10 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
System.out.println("send signal: " + signal + " to " + cachedTargetStage);
Thread owningThread = cachedTargetStage.getOwningThread();
if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) {
if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) {
owningThread.interrupt();
System.out.println("interrupted " + owningThread);
}
System.out.println("Signal sent.");
}
/**
......
......@@ -23,6 +23,8 @@ public abstract class AbstractStage extends Stage {
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
private boolean shouldTerminate;
private boolean started;
private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) {
if (null == outputPort.getPipe()) { // if port is unconnected
......@@ -37,7 +39,9 @@ public abstract class AbstractStage extends Stage {
*/
@Override
public InputPort<?>[] getInputPorts() {
return this.cachedInputPorts;
// return this.cachedInputPorts;
System.out.println("inputPortList: " + inputPortList);
return inputPortList.toArray(new InputPort<?>[0]);
}
/**
......@@ -54,6 +58,7 @@ public abstract class AbstractStage extends Stage {
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this);
started = true;
for (OutputPort<?> outputPort : this.outputPortList) {
outputPort.sendSignal(signal);
......@@ -61,6 +66,11 @@ public abstract class AbstractStage extends Stage {
}
}
@Override
public boolean isStarted() {
return started;
}
/**
* @param signal
* arriving signal
......
......@@ -72,7 +72,13 @@ public class Analysis implements UncaughtExceptionHandler {
for (Stage stage : threadableStageJobs) {
switch (stage.getTerminationStrategy()) {
case BY_SIGNAL: {
final Thread thread = new Thread(new RunnableConsumerStage(stage));
RunnableConsumerStage runnable;
if (stage instanceof AbstractConsumerStage<?>) {
runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy()); // FIXME remove this word-around
} else {
runnable = new RunnableConsumerStage(stage);
}
final Thread thread = new Thread(runnable);
stage.setOwningThread(thread);
this.consumerThreads.add(thread);
break;
......
package teetime.framework;
import java.util.Arrays;
import teetime.framework.idle.IdleStrategy;
import teetime.framework.idle.YieldStrategy;
import teetime.framework.pipe.IPipe;
......@@ -20,13 +22,14 @@ public final class RunnableConsumerStage extends RunnableStage {
@Override
protected void beforeStageExecution() {
// TODO wait for starting signal
logger.trace("ENTRY beforeStageExecution");
do {
checkforSignals();
// logger.trace("Signals checked.");
Thread.yield();
} while (stage.getInputPorts().length == 0);
logger.debug("Stage initialized");
} while (!stage.isStarted());
logger.trace("EXIT beforeStageExecution");
}
@Override
......@@ -50,6 +53,7 @@ public final class RunnableConsumerStage extends RunnableStage {
private void checkforSignals() {
// FIXME should getInputPorts() really be defined in Stage?
InputPort<?>[] inputPorts = stage.getInputPorts();
logger.debug("inputPorts: " + Arrays.toString(inputPorts));
for (InputPort<?> inputPort : inputPorts) {
IPipe pipe = inputPort.getPipe();
if (pipe instanceof AbstractInterThreadPipe) {
......
......@@ -94,4 +94,6 @@ public abstract class Stage {
}
protected abstract InputPort<?>[] getInputPorts();
protected abstract boolean isStarted();
}
......@@ -38,10 +38,13 @@ public final class SpScPipe extends AbstractInterThreadPipe {
Thread.yield();
}
System.out.println("Added: " + element);
Thread owningThread = cachedTargetStage.getOwningThread();
if (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING) {
if (null != owningThread && (owningThread.getState() == State.WAITING || owningThread.getState() == State.TIMED_WAITING)) {
synchronized (cachedTargetStage) {
cachedTargetStage.notify();
System.out.println("Notified: " + cachedTargetStage);
}
}
......
......@@ -75,4 +75,9 @@ public final class EveryXthPrinter<T> extends Stage {
return distributor.getInputPorts();
}
@Override
protected boolean isStarted() {
return distributor.isStarted();
}
}
......@@ -72,4 +72,9 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte
return firstStage.getTerminationStrategy();
}
@Override
protected boolean isStarted() {
return firstStage.isStarted();
}
}
package teetime.framework;
import static org.junit.Assert.assertEquals;
import java.lang.Thread.State;
import java.util.Collection;
import org.junit.Test;
import teetime.util.Pair;
import com.google.common.base.Joiner;
public class RunnableConsumerStageTest {
@Test
public void testWaitingInfinitely() throws Exception {
WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(5000, 1);
final Analysis analysis = new Analysis(waitStrategyConfiguration);
analysis.init();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
start(analysis);
}
});
thread.start();
Thread.sleep(200);
assertEquals(State.WAITING, thread.getState());
assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size());
}
@Test
public void testWaitingFinitely() throws Exception {
WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 1);
final Analysis analysis = new Analysis(waitStrategyConfiguration);
analysis.init();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
start(analysis);
}
});
thread.start();
Thread.sleep(200);
assertEquals(State.WAITING, thread.getState());
Thread.sleep(500);
assertEquals(State.TERMINATED, thread.getState());
assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().get(0));
assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size());
}
@Test
public void testSimpleRun() throws Exception {
YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42);
final Analysis analysis = new Analysis(waitStrategyConfiguration);
analysis.init();
start(analysis);
assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0));
assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size());
}
private void start(final Analysis analysis) {
Collection<Pair<Thread, Throwable>> exceptions = analysis.start();
for (Pair<Thread, Throwable> pair : exceptions) {
System.out.println(pair.getSecond());
System.out.println(Joiner.on("\n").join(pair.getSecond().getStackTrace()));
}
assertEquals(0, exceptions.size());
}
}
package teetime.framework;
import teetime.framework.idle.WaitStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.Relay;
import teetime.stage.basic.Delay;
class WaitStrategyConfiguration extends AnalysisConfiguration {
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private Delay<Object> delay;
private CollectorSink<Object> collectorSink;
public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
Stage producer = buildProducer(elements);
addThreadableStage(producer);
Stage consumer = buildConsumer(delay);
addThreadableStage(consumer);
Clock clock = buildClock(initialDelayInMs, delay);
addThreadableStage(clock);
}
private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) {
Clock clock = new Clock();
clock.setInitialDelayInMs(initialDelayInMs);
interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
return clock;
}
private Stage buildProducer(final Object... elements) {
InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements);
delay = new Delay<Object>();
intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort());
return initialElementProducer;
}
private Relay<Object> buildConsumer(final Delay<Object> delay) {
Relay<Object> relay = new Relay<Object>();
CollectorSink<Object> collectorSink = new CollectorSink<Object>();
relay.setIdleStrategy(new WaitStrategy(relay));
interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
return relay;
}
public CollectorSink<Object> getCollectorSink() {
return collectorSink;
}
}
package teetime.framework;
import teetime.framework.idle.YieldStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.Relay;
class YieldStrategyConfiguration extends AnalysisConfiguration {
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private CollectorSink<Object> collectorSink;
public YieldStrategyConfiguration(final Object... elements) {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
InitialElementProducer<Object> producer = buildProducer(elements);
addThreadableStage(producer);
Stage consumer = buildConsumer(producer);
addThreadableStage(consumer);
}
private InitialElementProducer<Object> buildProducer(final Object... elements) {
InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements);
return initialElementProducer;
}
private Relay<Object> buildConsumer(final InitialElementProducer<Object> producer) {
Relay<Object> relay = new Relay<Object>();
CollectorSink<Object> collectorSink = new CollectorSink<Object>();
relay.setIdleStrategy(new YieldStrategy());
interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
return relay;
}
public CollectorSink<Object> getCollectorSink() {
return collectorSink;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment