Skip to content
Snippets Groups Projects
Commit 172f33e6 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed bug in SCParkTakeStrategy;

added RunnableConsumerStageTest
parent e26045c6
No related branches found
No related tags found
No related merge requests found
......@@ -32,6 +32,9 @@ public final class SCParkTakeStrategy<E> implements TakeStrategy<E> {
while ((e = q.poll()) == null) {
LockSupport.park();
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Interrupted while waiting for the queue to become non-empty.");
}
}
t.lazySet(null);
......
......@@ -17,8 +17,10 @@ package teetime.framework;
import static org.junit.Assert.assertEquals;
import java.lang.Thread.State;
import java.util.Collection;
import org.junit.Ignore;
import org.junit.Test;
import teetime.util.Pair;
......@@ -27,6 +29,31 @@ import com.google.common.base.Joiner;
public class RunnableConsumerStageTest {
@Test
public void testWaitingInfinitely() throws Exception {
RunnableConsumerStageTestConfiguration configuration = new RunnableConsumerStageTestConfiguration();
final Analysis analysis = new Analysis(configuration);
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
start(analysis);
}
});
thread.start();
Thread.sleep(200);
// assertEquals(State.WAITING, thread.getState());
assertEquals(State.WAITING, configuration.getConsumerThread().getState());
assertEquals(0, configuration.getCollectedElements().size());
// clean up
configuration.getConsumerThread().interrupt();
configuration.getConsumerThread().join();
thread.join();
}
// @Test
// public void testWaitingInfinitely() throws Exception {
// WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42);
......@@ -45,7 +72,7 @@ public class RunnableConsumerStageTest {
// assertEquals(State.WAITING, thread.getState());
// assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size());
// }
//
// @Test
// public void testWaitingFinitely() throws Exception {
// WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42);
......@@ -66,6 +93,7 @@ public class RunnableConsumerStageTest {
// assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size());
// }
@Ignore
@Test
public void testYieldRun() throws Exception {
YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42);
......
package teetime.framework;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
public class RunnableConsumerStageTestConfiguration extends AnalysisConfiguration {
private final List<Integer> collectedElements = new ArrayList<Integer>();
private final CollectorSink<Integer> collectorSink;
public RunnableConsumerStageTestConfiguration(final Integer... inputElements) {
InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(inputElements);
// addThreadableStage(producer);
CollectorSink<Integer> collectorSink = new CollectorSink<Integer>(collectedElements);
addThreadableStage(collectorSink);
IPipeFactory pipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
pipeFactory.create(producer.getOutputPort(), collectorSink.getInputPort());
this.collectorSink = collectorSink;
}
public List<Integer> getCollectedElements() {
return collectedElements;
}
public Thread getConsumerThread() {
return collectorSink.getOwningThread();
}
}
......@@ -20,9 +20,9 @@
</encoder>
</appender>
<logger name="teetime" level="INFO" />
<!-- <logger name="teetime.framework" level="TRACE" /> -->
<!-- <logger name="teetime.stage" level="TRACE" /> -->
<logger name="teetime" level="INFO" />
<logger name="util" level="INFO" />
<root level="WARN">
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment