Skip to content
Snippets Groups Projects
Commit cf92e069 authored by Christian Wulf's avatar Christian Wulf
Browse files

added working ControlledMergerTest

parent c7d43f7b
No related branches found
No related tags found
No related merge requests found
Showing
with 113 additions and 58 deletions
#FindBugs User Preferences
#Mon Jun 22 16:47:12 CEST 2015
#Thu Jun 25 14:06:30 CEST 2015
detector_threshold=2
effort=max
excludefilter0=.fbExcludeFilterFile|true
......
......@@ -6,16 +6,17 @@ public class DynamicActuator {
* @deprecated Use {@link #startWithinNewThread(Stage)} instead.
*/
@Deprecated
public Runnable wrap(final Stage stage) {
public AbstractRunnableStage wrap(final Stage stage) {
if (stage.getInputPorts().length > 0) {
return new RunnableConsumerStage(stage);
}
return new RunnableProducerStage(stage);
}
public void startWithinNewThread(final Stage stage) {
public Runnable startWithinNewThread(final Stage stage) {
Runnable runnable = wrap(stage);
Thread thread = new Thread(runnable);
thread.start();
return runnable;
}
}
......@@ -203,10 +203,12 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
*/
public void waitForTermination() {
try {
LOGGER.debug("Waiting for finiteProducerThreads");
for (Thread thread : this.finiteProducerThreads) {
thread.join();
}
LOGGER.debug("Waiting for consumerThreads");
for (Thread thread : this.consumerThreads) {
thread.join();
}
......@@ -221,6 +223,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
}
}
LOGGER.debug("Interrupting infiniteProducerThreads...");
for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt();
}
......
......@@ -22,9 +22,6 @@ import teetime.framework.signal.TerminatingSignal;
final class RunnableConsumerStage extends AbstractRunnableStage {
// cache the input ports here since getInputPorts() always returns a new copy
private final InputPort<?>[] inputPorts;
/**
* Creates a new instance with the {@link YieldStrategy} as default idle strategy.
*
......@@ -37,17 +34,17 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
super(stage);
this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
}
@SuppressWarnings("PMD.GuardLogStatement")
@Override
protected void beforeStageExecution() throws InterruptedException {
logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : inputPorts) {
logger.trace("Waiting for init signals... " + stage);
for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForInitializingSignal();
}
for (InputPort<?> inputPort : inputPorts) {
logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForStartSignal();
}
logger.trace("Starting... " + stage);
......@@ -63,7 +60,10 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
}
private void checkForTerminationSignal(final Stage stage) {
for (InputPort<?> inputPort : inputPorts) {
System.out.println("checkForTerminationSignal: " + stage);
// FIXME should getInputPorts() really be defined in Stage?
for (InputPort<?> inputPort : stage.getInputPorts()) {
System.out.println("\tclosed: " + inputPort.isClosed() + " (" + inputPort);
if (!inputPort.isClosed()) {
return;
}
......@@ -75,7 +75,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@Override
protected void afterStageExecution() {
final ISignal signal = new TerminatingSignal();
for (InputPort<?> inputPort : inputPorts) {
for (InputPort<?> inputPort : stage.getInputPorts()) {
stage.onSignal(signal, inputPort);
}
}
......
......@@ -21,12 +21,12 @@ import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal;
final class RunnableProducerStage extends AbstractRunnableStage {
public final class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0);
private final Semaphore initSemaphore = new Semaphore(0);
public RunnableProducerStage(final Stage stage) {
RunnableProducerStage(final Stage stage) {
super(stage);
}
......@@ -57,11 +57,13 @@ final class RunnableProducerStage extends AbstractRunnableStage {
startSemaphore.release();
}
public void waitForInitializingSignal() throws InterruptedException {
private void waitForInitializingSignal() throws InterruptedException {
logger.trace("waitForInitializingSignal");
initSemaphore.acquire();
}
public void waitForStartingSignal() throws InterruptedException {
private void waitForStartingSignal() throws InterruptedException {
logger.trace("waitForStartingSignal");
startSemaphore.acquire();
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.basic.merger;
import teetime.framework.InputPort;
import teetime.framework.NotEnoughInputException;
/**
* @author Christian Wulf
*
* @since 2.0
*/
public final class BusyWaitingRoundRobinStrategy implements IMergerStrategy {
private int index = 0;
@Override
public <T> T getNextInput(final Merger<T> merger) {
final InputPort<T>[] inputPorts = merger.getInputPorts();
final InputPort<T> inputPort = getOpenInputPort(inputPorts);
final T token = inputPort.receive();
if (null != token) {
this.index = (this.index + 1) % inputPorts.length;
}
return token;
}
private <T> InputPort<T> getOpenInputPort(final InputPort<T>[] inputPorts) {
final int startedIndex = index;
InputPort<T> inputPort = inputPorts[this.index];
while (inputPort.isClosed()) {
this.index = (this.index + 1) % inputPorts.length;
if (index == startedIndex) {
throw new NotEnoughInputException();
}
inputPort = inputPorts[this.index];
}
return inputPort;
}
}
......@@ -102,9 +102,10 @@ public class Merger<T> extends AbstractStage {
return this.strategy;
}
@SuppressWarnings("unchecked")
@Override
public InputPort<?>[] getInputPorts() { // make public
return super.getInputPorts();
public InputPort<T>[] getInputPorts() { // make public
return (InputPort<T>[]) super.getInputPorts();
}
public InputPort<T> getNewInputPort() {
......
......@@ -28,8 +28,7 @@ public final class RoundRobinStrategy implements IMergerStrategy {
@Override
public <T> T getNextInput(final Merger<T> merger) {
@SuppressWarnings("unchecked")
InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts();
final InputPort<T>[] inputPorts = merger.getInputPorts();
int size = inputPorts.length;
// check each port at most once to avoid a potentially infinite loop
while (size-- > 0) {
......
package teetime.stage.basic.merger.dynamic;
import teetime.util.framework.port.PortActionHelper;
public class ControlledDynamicMerger<T> extends DynamicMerger<T> {
@Override
protected void checkForPendingPortActionRequest() {
try {
PortActionHelper.checkBlockingForPendingPortActionRequest(this, portActions);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
......@@ -3,6 +3,7 @@ package teetime.stage.basic.merger.dynamic;
import java.util.concurrent.BlockingQueue;
import teetime.framework.DynamicInputPort;
import teetime.stage.basic.merger.IMergerStrategy;
import teetime.stage.basic.merger.Merger;
import teetime.util.framework.port.PortAction;
import teetime.util.framework.port.PortActionHelper;
......@@ -11,15 +12,15 @@ public class DynamicMerger<T> extends Merger<T> {
protected final BlockingQueue<PortAction<DynamicMerger<T>>> portActions;
public DynamicMerger() {
public DynamicMerger(final IMergerStrategy strategy) {
super(strategy);
portActions = PortActionHelper.createPortActionQueue();
}
@Override
public void executeStage() {
super.executeStage(); // must be first, to throw NotEnoughInputException before checking
checkForPendingPortActionRequest();
super.executeStage();
}
protected void checkForPendingPortActionRequest() {
......
......@@ -17,13 +17,13 @@ public class RemovePortAction<T> implements PortAction<DynamicMerger<T>> {
public void execute(final DynamicMerger<T> dynamicMerger) {
InputPort<?> inputPortsToRemove;
if (dynamicMerger instanceof ControlledDynamicMerger) {
// for testing purposes only
InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts();
inputPortsToRemove = inputPorts[inputPorts.length - 1];
} else {
inputPortsToRemove = inputPort;
}
// if (dynamicMerger instanceof ControlledDynamicMerger) {
// // for testing purposes only
// InputPort<?>[] inputPorts = ((ControlledDynamicMerger<?>) dynamicMerger).getInputPorts();
// inputPortsToRemove = inputPorts[inputPorts.length - 1];
// } else {
inputPortsToRemove = inputPort;
// }
dynamicMerger.removeDynamicPort((DynamicInputPort<?>) inputPortsToRemove);
}
......
......@@ -31,7 +31,7 @@ public final class PortActionHelper {
public static <T extends Stage> void checkForPendingPortActionRequest(final T stage, final BlockingQueue<PortAction<T>> portActions) {
PortAction<T> dynamicPortAction = portActions.poll();
if (null != dynamicPortAction) { // check if getPortAction() uses polling
if (null != dynamicPortAction) {
dynamicPortAction.execute(stage);
}
}
......
wiki @ 0a5bd4dd
Subproject commit 0a5bd4ddb82684ce1ae2ec84c67ff2117ebff143
......@@ -3,6 +3,7 @@ package teetime.stage.basic.merger.dynamic;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.List;
......@@ -12,11 +13,11 @@ import org.junit.Test;
import teetime.framework.Configuration;
import teetime.framework.DynamicActuator;
import teetime.framework.Execution;
import teetime.framework.RunnableProducerStage;
import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.merger.BusyWaitingRoundRobinStrategy;
import teetime.util.framework.port.PortAction;
public class ControlledMergerTest {
......@@ -25,10 +26,10 @@ public class ControlledMergerTest {
@Test
public void shouldWorkWithoutActionTriggers() throws Exception {
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4);
List<Integer> inputNumbers = Arrays.asList(0, 1, 2, 3, 4, 5);
@SuppressWarnings("unchecked")
PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[5];
PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6];
for (int i = 0; i < inputActions.length; i++) {
inputActions[i] = new DoNothingPortAction<Integer>();
}
......@@ -39,7 +40,7 @@ public class ControlledMergerTest {
analysis.executeBlocking();
assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4));
assertThat(config.getOutputElements(), contains(0, 1, 2, 3, 4, 5));
}
@Test
......@@ -47,7 +48,7 @@ public class ControlledMergerTest {
List<Integer> inputNumbers = Arrays.asList(0);
@SuppressWarnings("unchecked")
PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[5];
PortAction<DynamicMerger<Integer>>[] inputActions = new PortAction[6];
for (int i = 0; i < inputActions.length; i++) {
inputActions[i] = createPortCreateAction(i + 1);
}
......@@ -88,14 +89,15 @@ public class ControlledMergerTest {
private PortAction<DynamicMerger<Integer>> createPortCreateAction(final Integer number) {
final InitialElementProducer<Integer> initialElementProducer = new InitialElementProducer<Integer>(number);
DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer);
final Runnable runnableStage = DYNAMIC_ACTUATOR.startWithinNewThread(initialElementProducer);
PortAction<DynamicMerger<Integer>> portAction = new CreatePortAction<Integer>(initialElementProducer.getOutputPort()) {
@Override
public void execute(final DynamicMerger<Integer> dynamicDistributor) {
super.execute(dynamicDistributor);
initialElementProducer.getOutputPort().sendSignal(new InitializingSignal());
initialElementProducer.getOutputPort().sendSignal(new StartingSignal());
final RunnableProducerStage runnableProducerStage = (RunnableProducerStage) runnableStage;
runnableProducerStage.triggerInitializingSignal();
runnableProducerStage.triggerStartingSignal();
}
};
return portAction;
......@@ -107,7 +109,7 @@ public class ControlledMergerTest {
public ControlledMergerTestConfig(final List<T> elements, final List<PortAction<DynamicMerger<T>>> inputActions) {
InitialElementProducer<T> initialElementProducer = new InitialElementProducer<T>(elements);
DynamicMerger<T> merger = new ControlledDynamicMerger<T>();
DynamicMerger<T> merger = new DynamicMerger<T>(new BusyWaitingRoundRobinStrategy());
collectorSink = new CollectorSink<T>();
connectPorts(initialElementProducer.getOutputPort(), merger.getNewInputPort());
......@@ -116,7 +118,8 @@ public class ControlledMergerTest {
addThreadableStage(merger);
for (PortAction<DynamicMerger<T>> a : inputActions) {
merger.addPortActionRequest(a);
boolean added = merger.addPortActionRequest(a);
assertTrue(added);
}
}
......
......@@ -22,6 +22,9 @@
<logger name="teetime" level="INFO" />
<logger name="teetime.framework" level="TRACE" />
<logger name="teetime.stage.InitialElementProducer" level="DEBUG" />
<logger name="teetime.stage.merger" level="TRACE" />
<!-- <logger name="teetime.stage" level="TRACE" /> -->
<!-- <logger name="teetime.framework.signal" level="TRACE" /> -->
<!-- <logger name="teetime.stage" level="TRACE" /> -->
<logger name="util" level="INFO" />
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment