Skip to content
Snippets Groups Projects
Commit d8d09246 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge remote-tracking branch 'origin/master' into DCParadigm

Conflicts:
	.settings/edu.umd.cs.findbugs.core.prefs
parents 51187bff 05df0801
Branches
Tags
1 merge request!72Dc paradigm
Showing
with 290 additions and 110 deletions
...@@ -9,13 +9,13 @@ ...@@ -9,13 +9,13 @@
<name>TeeTime</name> <name>TeeTime</name>
<description>TeeTime is a Pipe-and-Filter framework for Java</description> <description>TeeTime is a Pipe-and-Filter framework for Java</description>
<url>http://teetime.sourceforge.org</url> <url>http://christianwulf.github.io/teetime/</url>
<inceptionYear>2015</inceptionYear> <inceptionYear>2015</inceptionYear>
<licenses> <licenses>
<license> <license>
<name>Apache License, Version 2.0</name> <name>Apache License, Version 2.0</name>
<url>http://teetime.sourceforge.net/LICENSE.txt</url> <url>http://christianwulf.github.io/teetime/LICENSE.txt</url>
</license> </license>
</licenses> </licenses>
...@@ -310,12 +310,6 @@ ...@@ -310,12 +310,6 @@
<artifactId>doxia-module-markdown-teetime</artifactId> <artifactId>doxia-module-markdown-teetime</artifactId>
<version>1.6</version> <version>1.6</version>
</dependency> </dependency>
<!-- add support for ssh/scp -->
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.9-SNAPSHOT</version>
</dependency>
<!-- skin --> <!-- skin -->
<dependency> <dependency>
<groupId>lt.velykis.maven.skins</groupId> <groupId>lt.velykis.maven.skins</groupId>
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
</properties> </properties>
<body> <body>
<release version="Snapshot" date="Daily basis" <release version="Snapshot" date="Daily basis"
description="Unstable preview of oncoming versions"> description="Unstable preview of oncoming versions - 2.0-SNAPSHOT">
<action dev="ntd" type="add" issue="33"> <action dev="ntd" type="add" issue="33">
TeeTime automatically TeeTime automatically
chooses the correct type of pipe for all connections. chooses the correct type of pipe for all connections.
...@@ -21,19 +21,24 @@ ...@@ -21,19 +21,24 @@
</action> </action>
<action dev="ntd" type="add" issue="171"> <action dev="ntd" type="add" issue="171">
Configurations are now Configurations are now
built within an AnalysisContext which is passed on to nested built within the Configuration class which is passed on to nested
CompositeStages. CompositeStages.
This removes any constraints on CompositeStages and This removes any constraints on CompositeStages and
enables therefore multiple connections and multithreading. enables therefore multiple connections and multithreading.
</action> </action>
<action dev="ntd" type="update">
Renamed Analysis to Execution
</action>
<action dev="ntd" type="remove"> <action dev="ntd" type="remove">
Marked Pair class as deprecated. Removed pair class.
</action> </action>
<action dev="ntd" type="add" issue="154"> <action dev="ntd" type="add" issue="154">
All stages will be All stages will be
initialized before starting the analysis. initialized before starting the analysis.
</action> </action>
<action dev="ntd" type="add" issue="122">
Threads can be named for better debugging.
</action>
</release> </release>
<release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1"> <release version="1.1.2" date="12.05.2015" description="Minor bugfixes for 1.1">
......
...@@ -49,6 +49,18 @@ public abstract class AbstractCompositeStage { ...@@ -49,6 +49,18 @@ public abstract class AbstractCompositeStage {
return dynamicContext; return dynamicContext;
} }
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
protected final void addThreadableStage(final Stage stage, final String threadName) {
context.addThreadableStage(stage, threadName);
}
/** /**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread. * Execute this method, to add a stage to the configuration, which should be executed in a own thread.
* *
...@@ -56,7 +68,7 @@ public abstract class AbstractCompositeStage { ...@@ -56,7 +68,7 @@ public abstract class AbstractCompositeStage {
* A arbitrary stage, which will be added to the configuration and executed in a thread. * A arbitrary stage, which will be added to the configuration and executed in a thread.
*/ */
protected final void addThreadableStage(final Stage stage) { protected final void addThreadableStage(final Stage stage) {
context.addThreadableStage(stage); this.addThreadableStage(stage, stage.getId());
} }
/** /**
......
...@@ -25,11 +25,12 @@ import org.jctools.queues.spec.Preference; ...@@ -25,11 +25,12 @@ import org.jctools.queues.spec.Preference;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.util.concurrent.queue.PCBlockingQueue; import teetime.framework.signal.StartingSignal;
import teetime.util.concurrent.queue.putstrategy.PutStrategy; import teetime.util.framework.concurrent.queue.PCBlockingQueue;
import teetime.util.concurrent.queue.putstrategy.YieldPutStrategy; import teetime.util.framework.concurrent.queue.putstrategy.PutStrategy;
import teetime.util.concurrent.queue.takestrategy.SCParkTakeStrategy; import teetime.util.framework.concurrent.queue.putstrategy.YieldPutStrategy;
import teetime.util.concurrent.queue.takestrategy.TakeStrategy; import teetime.util.framework.concurrent.queue.takestrategy.SCParkTakeStrategy;
import teetime.util.framework.concurrent.queue.takestrategy.TakeStrategy;
public abstract class AbstractInterThreadPipe extends AbstractPipe { public abstract class AbstractInterThreadPipe extends AbstractPipe {
...@@ -65,16 +66,19 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { ...@@ -65,16 +66,19 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
} }
@Override @Override
public final void waitForStartSignal() throws InterruptedException { public final void waitForInitializingSignal() throws InterruptedException {
final ISignal signal = signalQueue.take(); final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was " + signal.getClass().getSimpleName());
}
cachedTargetStage.onSignal(signal, getTargetPort()); cachedTargetStage.onSignal(signal, getTargetPort());
} }
@Override @Override
public final void waitForInitializingSignal() throws InterruptedException { public final void waitForStartSignal() throws InterruptedException {
final ISignal signal = signalQueue.take(); final ISignal signal = signalQueue.take();
if (!(signal instanceof InitializingSignal)) { if (!(signal instanceof StartingSignal)) {
throw new IllegalStateException("Expected InitializingSignal, but was not the first arriving signal"); throw new IllegalStateException("Expected StartingSignal, but was " + signal.getClass().getSimpleName());
} }
cachedTargetStage.onSignal(signal, getTargetPort()); cachedTargetStage.onSignal(signal, getTargetPort());
} }
......
...@@ -19,47 +19,43 @@ import teetime.framework.pipe.IPipe; ...@@ -19,47 +19,43 @@ import teetime.framework.pipe.IPipe;
public abstract class AbstractPort<T> { public abstract class AbstractPort<T> {
private final String portName;
protected IPipe pipe;
/** /**
* The type of this port. * The type of this port.
* <p> * <p>
* <i>Used to validate the connection between two ports at runtime.</i> * <i>Used to validate the connection between two ports at runtime.</i>
* </p> * </p>
*/ */
protected final Class<T> type; private final Class<T> type;
private final Stage owningStage; private final Stage owningStage;
private final String name;
public AbstractPort(final Class<T> type, final Stage owningStage, final String portName) { protected IPipe pipe;
protected AbstractPort(final Class<T> type, final Stage owningStage, final String name) {
super(); super();
this.portName = portName;
this.type = type; this.type = type;
this.owningStage = owningStage; this.owningStage = owningStage;
this.name = name;
} }
public IPipe getPipe() { public Class<T> getType() {
return this.pipe; return this.type;
} }
public void setPipe(final IPipe pipe) { public Stage getOwningStage() {
this.pipe = pipe; return owningStage;
} }
public Class<T> getType() { public IPipe getPipe() {
return this.type; return this.pipe;
} }
public final Stage getOwningStage() { public void setPipe(final IPipe pipe) {
return owningStage; this.pipe = pipe;
} }
@Override @Override
public final String toString() { public String toString() {
if (portName == null) { return (name != null) ? name : super.toString();
return super.toString();
} else {
return portName;
}
} }
} }
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
...@@ -35,6 +36,9 @@ public abstract class AbstractStage extends Stage { ...@@ -35,6 +36,9 @@ public abstract class AbstractStage extends Stage {
private OutputPort<?>[] outputPorts = new OutputPort<?>[0]; private OutputPort<?>[] outputPorts = new OutputPort<?>[0];
private StageState currentState = StageState.CREATED; private StageState currentState = StageState.CREATED;
private final Set<OutputPortRemovedListener> outputPortRemovedListeners = new HashSet<OutputPortRemovedListener>();
private final Set<InputPortRemovedListener> inputPortsRemovedListeners = new HashSet<InputPortRemovedListener>();
@Override @Override
public InputPort<?>[] getInputPorts() { public InputPort<?>[] getInputPorts() {
return inputPorts; return inputPorts;
...@@ -134,14 +138,11 @@ public abstract class AbstractStage extends Stage { ...@@ -134,14 +138,11 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be received * the type of elements to be received
* *
* @return Newly added InputPort * @return the newly added InputPort
* *
*/ */
// * @deprecated Since 1.1. Use {@link #createInputPort(Class)} instead.
@SuppressWarnings("unchecked")
// @Deprecated
protected <T> InputPort<T> createInputPort() { protected <T> InputPort<T> createInputPort() {
return (InputPort<T>) createInputPort(null, null); return createInputPort(null, null);
} }
/** /**
...@@ -153,7 +154,7 @@ public abstract class AbstractStage extends Stage { ...@@ -153,7 +154,7 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be received * the type of elements to be received
* *
* @return Newly added InputPort * @return the newly added InputPort
*/ */
protected <T> InputPort<T> createInputPort(final Class<T> type) { protected <T> InputPort<T> createInputPort(final Class<T> type) {
return createInputPort(type, null); return createInputPort(type, null);
...@@ -167,14 +168,11 @@ public abstract class AbstractStage extends Stage { ...@@ -167,14 +168,11 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be received * the type of elements to be received
* *
* @return Newly added InputPort * @return the newly added InputPort
* *
*/ */
// * @deprecated Since 1.1. Use {@link #createInputPort(Class)} instead.
@SuppressWarnings("unchecked")
// @Deprecated
protected <T> InputPort<T> createInputPort(final String name) { protected <T> InputPort<T> createInputPort(final String name) {
return (InputPort<T>) createInputPort(null, name); return createInputPort(null, name);
} }
/** /**
...@@ -187,7 +185,7 @@ public abstract class AbstractStage extends Stage { ...@@ -187,7 +185,7 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be received * the type of elements to be received
* *
* @return Newly added InputPort * @return the newly added InputPort
*/ */
protected <T> InputPort<T> createInputPort(final Class<T> type, final String name) { protected <T> InputPort<T> createInputPort(final Class<T> type, final String name) {
final InputPort<T> inputPort = new InputPort<T>(type, this, name); final InputPort<T> inputPort = new InputPort<T>(type, this, name);
...@@ -201,14 +199,11 @@ public abstract class AbstractStage extends Stage { ...@@ -201,14 +199,11 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be sent * the type of elements to be sent
* *
* @return Newly added OutputPort * @return the newly added OutputPort
* *
*/ */
// * @deprecated Since 1.1. Use {@link #createOutputPort(Class)} instead.
@SuppressWarnings("unchecked")
// @Deprecated
protected <T> OutputPort<T> createOutputPort() { protected <T> OutputPort<T> createOutputPort() {
return (OutputPort<T>) createOutputPort(null, null); return createOutputPort(null, null);
} }
/** /**
...@@ -220,12 +215,10 @@ public abstract class AbstractStage extends Stage { ...@@ -220,12 +215,10 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be sent * the type of elements to be sent
* *
* @return Newly added OutputPort * @return the newly added OutputPort
*/ */
protected <T> OutputPort<T> createOutputPort(final Class<T> type) { protected <T> OutputPort<T> createOutputPort(final Class<T> type) {
final OutputPort<T> outputPort = new OutputPort<T>(type, this, null); return createOutputPort(type, null);
outputPorts = addElementToArray(outputPort, outputPorts);
return outputPort;
} }
/** /**
...@@ -237,14 +230,11 @@ public abstract class AbstractStage extends Stage { ...@@ -237,14 +230,11 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be sent * the type of elements to be sent
* *
* @return Newly added OutputPort * @return the newly added OutputPort
* *
*/ */
// * @deprecated Since 1.1. Use {@link #createOutputPort(Class)} instead.
@SuppressWarnings("unchecked")
// @Deprecated
protected <T> OutputPort<T> createOutputPort(final String name) { protected <T> OutputPort<T> createOutputPort(final String name) {
return (OutputPort<T>) createOutputPort(null, name); return createOutputPort(null, name);
} }
/** /**
...@@ -258,7 +248,7 @@ public abstract class AbstractStage extends Stage { ...@@ -258,7 +248,7 @@ public abstract class AbstractStage extends Stage {
* @param <T> * @param <T>
* the type of elements to be sent * the type of elements to be sent
* *
* @return Newly added OutputPort * @return the newly added OutputPort
*/ */
protected <T> OutputPort<T> createOutputPort(final Class<T> type, final String name) { protected <T> OutputPort<T> createOutputPort(final Class<T> type, final String name) {
final OutputPort<T> outputPort = new OutputPort<T>(type, this, name); final OutputPort<T> outputPort = new OutputPort<T>(type, this, name);
...@@ -266,7 +256,7 @@ public abstract class AbstractStage extends Stage { ...@@ -266,7 +256,7 @@ public abstract class AbstractStage extends Stage {
return outputPort; return outputPort;
} }
private <T> T[] addElementToArray(final T element, final T[] srcArray) { private <T, E extends T> T[] addElementToArray(final E element, final T[] srcArray) {
T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1); T[] newOutputPorts = Arrays.copyOf(srcArray, srcArray.length + 1);
newOutputPorts[srcArray.length] = element; newOutputPorts[srcArray.length] = element;
return newOutputPorts; return newOutputPorts;
...@@ -302,4 +292,68 @@ public abstract class AbstractStage extends Stage { ...@@ -302,4 +292,68 @@ public abstract class AbstractStage extends Stage {
return TerminationStrategy.BY_SIGNAL; return TerminationStrategy.BY_SIGNAL;
} }
protected <T> DynamicOutputPort<T> createDynamicOutputPort() {
final DynamicOutputPort<T> outputPort = new DynamicOutputPort<T>(null, this, outputPorts.length);
outputPorts = addElementToArray(outputPort, outputPorts);
return outputPort;
}
protected <T> DynamicInputPort<T> createDynamicInputPort() {
final DynamicInputPort<T> inputPort = new DynamicInputPort<T>(null, this, inputPorts.length);
inputPorts = addElementToArray(inputPort, inputPorts);
return inputPort;
}
@Override
protected void removeDynamicPort(final DynamicOutputPort<?> dynamicOutputPort) {
int index = dynamicOutputPort.getIndex();
List<OutputPort<?>> tempOutputPorts = new ArrayList<OutputPort<?>>(Arrays.asList(outputPorts));
OutputPort<?> removedOutputPort = tempOutputPorts.remove(index);
for (int i = index; i < tempOutputPorts.size(); i++) {
OutputPort<?> outputPort = tempOutputPorts.get(i);
if (outputPort instanceof DynamicOutputPort) {
((DynamicOutputPort<?>) outputPort).setIndex(i);
}
}
outputPorts = tempOutputPorts.toArray(new OutputPort[0]);
firePortRemoved(removedOutputPort);
}
private void firePortRemoved(final OutputPort<?> removedOutputPort) {
for (OutputPortRemovedListener listener : outputPortRemovedListeners) {
listener.onOutputPortRemoved(this, removedOutputPort);
}
}
protected final void addOutputPortRemovedListener(final OutputPortRemovedListener outputPortRemovedListener) {
outputPortRemovedListeners.add(outputPortRemovedListener);
}
@Override
protected void removeDynamicPort(final DynamicInputPort<?> dynamicInputPort) {
int index = dynamicInputPort.getIndex();
List<InputPort<?>> tempInputPorts = new ArrayList<InputPort<?>>(Arrays.asList(inputPorts));
InputPort<?> removedInputPort = tempInputPorts.remove(index);
for (int i = index; i < tempInputPorts.size(); i++) {
InputPort<?> inputPort = tempInputPorts.get(i);
if (inputPort instanceof DynamicInputPort) {
((DynamicInputPort<?>) inputPort).setIndex(i);
}
}
inputPorts = tempInputPorts.toArray(new InputPort[0]);
firePortRemoved(removedInputPort);
}
private void firePortRemoved(final InputPort<?> removedInputPort) {
for (InputPortRemovedListener listener : inputPortsRemovedListeners) {
listener.onInputPortRemoved(this, removedInputPort);
}
}
protected final void addInputPortRemovedListener(final InputPortRemovedListener outputPortRemovedListener) {
inputPortsRemovedListeners.add(outputPortRemovedListener);
}
} }
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework; package teetime.framework;
/** /**
......
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.HashSet; import java.util.HashMap;
import java.util.Set; import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -33,19 +33,19 @@ public final class ConfigurationContext { ...@@ -33,19 +33,19 @@ public final class ConfigurationContext {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class);
private final Set<Stage> threadableStages = new HashSet<Stage>(); private final Map<Stage, String> threadableStages = new HashMap<Stage, String>();
ConfigurationContext() {} ConfigurationContext() {}
Set<Stage> getThreadableStages() { Map<Stage, String> getThreadableStages() {
return this.threadableStages; return this.threadableStages;
} }
/** /**
* @see AbstractCompositeStage#addThreadableStage(Stage) * @see AbstractCompositeStage#addThreadableStage(Stage)
*/ */
final void addThreadableStage(final Stage stage) { final void addThreadableStage(final Stage stage, final String threadName) {
if (!this.threadableStages.add(stage)) { if (this.threadableStages.put(stage, threadName) != null) {
LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage.");
} }
} }
...@@ -54,8 +54,10 @@ public final class ConfigurationContext { ...@@ -54,8 +54,10 @@ public final class ConfigurationContext {
* @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int) * @see AbstractCompositeStage#connectPorts(OutputPort, InputPort, int)
*/ */
final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { if (sourcePort.getOwningStage().getInputPorts().length == 0) {
addThreadableStage(sourcePort.getOwningStage()); if (!threadableStages.containsKey(sourcePort.getOwningStage())) {
addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
}
} }
if (sourcePort.getPipe() != null || targetPort.getPipe() != null) { if (sourcePort.getPipe() != null || targetPort.getPipe() != null) {
LOGGER.warn("Overwriting existing pipe while connecting stages " + LOGGER.warn("Overwriting existing pipe while connecting stages " +
......
package teetime.framework;
public class DynamicActuator {
/**
* @deprecated Use {@link #startWithinNewThread(Stage)} instead.
*/
@Deprecated
public AbstractRunnableStage wrap(final Stage stage) {
if (stage.getInputPorts().length > 0) {
return new RunnableConsumerStage(stage);
}
return new RunnableProducerStage(stage);
}
public Runnable startWithinNewThread(final Stage stage) {
Runnable runnable = wrap(stage);
Thread thread = new Thread(runnable);
thread.start();
return runnable;
}
}
package teetime.framework;
/**
*
* @author Christian Wulf
*
* @param <T>
* the type of elements to be received
*
* @since 1.2
*/
public final class DynamicInputPort<T> extends InputPort<T> {
private int index;
DynamicInputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage, null);
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
}
package teetime.framework;
/**
*
* @author Christian Wulf
*
* @param <T>
* the type of elements to be sent
*
* @since 1.2
*/
public final class DynamicOutputPort<T> extends OutputPort<T> {
private int index;
DynamicOutputPort(final Class<T> type, final Stage owningStage, final int index) {
super(type, owningStage, null);
this.index = index;
}
public int getIndex() {
return index;
}
public void setIndex(final int index) {
this.index = index;
}
}
...@@ -19,6 +19,7 @@ import java.lang.Thread.UncaughtExceptionHandler; ...@@ -19,6 +19,7 @@ import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
...@@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory; ...@@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory;
import teetime.framework.exceptionHandling.AbstractExceptionListener; import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory; import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory; import teetime.framework.exceptionHandling.TerminatingExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal; import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal; import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException; import teetime.framework.validation.AnalysisNotValidException;
...@@ -72,11 +73,11 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -72,11 +73,11 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
* to be used for the analysis * to be used for the analysis
*/ */
public Execution(final T configuration) { public Execution(final T configuration) {
this(configuration, false, new IgnoringExceptionListenerFactory()); this(configuration, false, new TerminatingExceptionListenerFactory());
} }
public Execution(final T configuration, final boolean validationEnabled) { public Execution(final T configuration, final boolean validationEnabled) {
this(configuration, validationEnabled, new IgnoringExceptionListenerFactory()); this(configuration, validationEnabled, new TerminatingExceptionListenerFactory());
} }
/** /**
...@@ -102,8 +103,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -102,8 +103,8 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
// BETTER validate concurrently // BETTER validate concurrently
private void validateStages() { private void validateStages() {
final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); final Map<Stage, String> threadableStageJobs = this.configuration.getContext().getThreadableStages();
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs.keySet()) {
// // portConnectionValidator.validate(stage); // // portConnectionValidator.validate(stage);
// } // }
...@@ -123,7 +124,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -123,7 +124,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext());
executionInstantiation.instantiatePipes(); executionInstantiation.instantiatePipes();
final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages().keySet();
if (threadableStageJobs.isEmpty()) { if (threadableStageJobs.isEmpty()) {
throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage.");
} }
...@@ -182,7 +183,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -182,7 +183,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
private Thread createThread(final AbstractRunnableStage runnable, final String name) { private Thread createThread(final AbstractRunnableStage runnable, final String name) {
final Thread thread = new Thread(runnable); final Thread thread = new Thread(runnable);
thread.setUncaughtExceptionHandler(this); thread.setUncaughtExceptionHandler(this);
thread.setName(name); thread.setName(configuration.getContext().getThreadableStages().get(runnable.stage));
return thread; return thread;
} }
...@@ -203,10 +204,12 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -203,10 +204,12 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
*/ */
public void waitForTermination() { public void waitForTermination() {
try { try {
LOGGER.debug("Waiting for finiteProducerThreads");
for (Thread thread : this.finiteProducerThreads) { for (Thread thread : this.finiteProducerThreads) {
thread.join(); thread.join();
} }
LOGGER.debug("Waiting for consumerThreads");
for (Thread thread : this.consumerThreads) { for (Thread thread : this.consumerThreads) {
thread.join(); thread.join();
} }
...@@ -221,6 +224,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -221,6 +224,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
} }
} }
LOGGER.debug("Interrupting infiniteProducerThreads...");
for (Thread thread : this.infiniteProducerThreads) { for (Thread thread : this.infiniteProducerThreads) {
thread.interrupt(); thread.interrupt();
} }
...@@ -300,7 +304,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti ...@@ -300,7 +304,7 @@ public final class Execution<T extends Configuration> implements UncaughtExcepti
if (!executionInterrupted) { if (!executionInterrupted) {
executionInterrupted = true; executionInterrupted = true;
LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now.");
for (Stage stage : configuration.getContext().getThreadableStages()) { for (Stage stage : configuration.getContext().getThreadableStages().keySet()) {
if (stage.getOwningThread() != thread) { if (stage.getOwningThread() != thread) {
if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) {
stage.terminate(); stage.terminate();
......
...@@ -45,7 +45,7 @@ class ExecutionInstantiation { ...@@ -45,7 +45,7 @@ class ExecutionInstantiation {
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) { Integer colorAndConnectStages(final Integer i, final Map<Stage, Integer> colors, final Stage threadableStage, final ConfigurationContext configuration) {
Integer createdConnections = new Integer(0); Integer createdConnections = new Integer(0);
Set<Stage> threadableStageJobs = configuration.getThreadableStages(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
for (OutputPort outputPort : threadableStage.getOutputPorts()) { for (OutputPort outputPort : threadableStage.getOutputPorts()) {
if (outputPort.pipe != null) { if (outputPort.pipe != null) {
if (outputPort.pipe instanceof InstantiationPipe) { if (outputPort.pipe instanceof InstantiationPipe) {
...@@ -82,7 +82,7 @@ class ExecutionInstantiation { ...@@ -82,7 +82,7 @@ class ExecutionInstantiation {
void instantiatePipes() { void instantiatePipes() {
Integer i = new Integer(0); Integer i = new Integer(0);
Map<Stage, Integer> colors = new HashMap<Stage, Integer>(); Map<Stage, Integer> colors = new HashMap<Stage, Integer>();
Set<Stage> threadableStageJobs = configuration.getThreadableStages(); Set<Stage> threadableStageJobs = configuration.getThreadableStages().keySet();
Integer createdConnections = 0; Integer createdConnections = 0;
for (Stage threadableStage : threadableStageJobs) { for (Stage threadableStage : threadableStageJobs) {
i++; i++;
......
...@@ -24,7 +24,7 @@ package teetime.framework; ...@@ -24,7 +24,7 @@ package teetime.framework;
* *
* @since 1.0 * @since 1.0
*/ */
public final class InputPort<T> extends AbstractPort<T> { public class InputPort<T> extends AbstractPort<T> {
InputPort(final Class<T> type, final Stage owningStage, final String portName) { InputPort(final Class<T> type, final Stage owningStage, final String portName) {
super(type, owningStage, portName); super(type, owningStage, portName);
......
package teetime.framework;
public interface InputPortRemovedListener {
void onInputPortRemoved(Stage stage, InputPort<?> removedInputPort);
}
...@@ -27,7 +27,7 @@ import teetime.framework.signal.TerminatingSignal; ...@@ -27,7 +27,7 @@ import teetime.framework.signal.TerminatingSignal;
* *
* @since 1.0 * @since 1.0
*/ */
public final class OutputPort<T> extends AbstractPort<T> { public class OutputPort<T> extends AbstractPort<T> {
OutputPort(final Class<T> type, final Stage owningStage, final String portName) { OutputPort(final Class<T> type, final Stage owningStage, final String portName) {
super(type, owningStage, portName); super(type, owningStage, portName);
......
package teetime.framework;
public interface OutputPortRemovedListener {
void onOutputPortRemoved(Stage stage, OutputPort<?> removedOutputPort);
}
...@@ -15,39 +15,30 @@ ...@@ -15,39 +15,30 @@
*/ */
package teetime.framework; package teetime.framework;
import teetime.framework.idle.IdleStrategy;
import teetime.framework.idle.YieldStrategy;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
final class RunnableConsumerStage extends AbstractRunnableStage { final class RunnableConsumerStage extends AbstractRunnableStage {
// cache the input ports here since getInputPorts() always returns a new copy
private final InputPort<?>[] inputPorts;
/** /**
* Creates a new instance with the {@link YieldStrategy} as default idle strategy. * Creates a new instance.
* *
* @param stage * @param stage
* to execute within an own thread * to execute within an own thread
*/ */
public RunnableConsumerStage(final Stage stage) { public RunnableConsumerStage(final Stage stage) {
this(stage, new YieldStrategy());
}
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
super(stage); super(stage);
this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
} }
@SuppressWarnings("PMD.GuardLogStatement") @SuppressWarnings("PMD.GuardLogStatement")
@Override @Override
protected void beforeStageExecution() throws InterruptedException { protected void beforeStageExecution() throws InterruptedException {
logger.trace("Waiting for start signals... " + stage); logger.trace("Waiting for init signals... " + stage);
for (InputPort<?> inputPort : inputPorts) { for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForInitializingSignal(); inputPort.waitForInitializingSignal();
} }
for (InputPort<?> inputPort : inputPorts) { logger.trace("Waiting for start signals... " + stage);
for (InputPort<?> inputPort : stage.getInputPorts()) {
inputPort.waitForStartSignal(); inputPort.waitForStartSignal();
} }
logger.trace("Starting... " + stage); logger.trace("Starting... " + stage);
...@@ -63,7 +54,10 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -63,7 +54,10 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
} }
private void checkForTerminationSignal(final Stage stage) { private void checkForTerminationSignal(final Stage stage) {
for (InputPort<?> inputPort : inputPorts) { System.out.println("checkForTerminationSignal: " + stage);
// FIXME should getInputPorts() really be defined in Stage?
for (InputPort<?> inputPort : stage.getInputPorts()) {
System.out.println("\tclosed: " + inputPort.isClosed() + " (" + inputPort);
if (!inputPort.isClosed()) { if (!inputPort.isClosed()) {
return; return;
} }
...@@ -75,7 +69,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -75,7 +69,7 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@Override @Override
protected void afterStageExecution() { protected void afterStageExecution() {
final ISignal signal = new TerminatingSignal(); final ISignal signal = new TerminatingSignal();
for (InputPort<?> inputPort : inputPorts) { for (InputPort<?> inputPort : stage.getInputPorts()) {
stage.onSignal(signal, inputPort); stage.onSignal(signal, inputPort);
} }
} }
......
...@@ -21,12 +21,12 @@ import teetime.framework.signal.InitializingSignal; ...@@ -21,12 +21,12 @@ import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.StartingSignal; import teetime.framework.signal.StartingSignal;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
final class RunnableProducerStage extends AbstractRunnableStage { public final class RunnableProducerStage extends AbstractRunnableStage {
private final Semaphore startSemaphore = new Semaphore(0); private final Semaphore startSemaphore = new Semaphore(0);
private final Semaphore initSemaphore = new Semaphore(0); private final Semaphore initSemaphore = new Semaphore(0);
public RunnableProducerStage(final Stage stage) { RunnableProducerStage(final Stage stage) {
super(stage); super(stage);
} }
...@@ -57,11 +57,13 @@ final class RunnableProducerStage extends AbstractRunnableStage { ...@@ -57,11 +57,13 @@ final class RunnableProducerStage extends AbstractRunnableStage {
startSemaphore.release(); startSemaphore.release();
} }
public void waitForInitializingSignal() throws InterruptedException { private void waitForInitializingSignal() throws InterruptedException {
logger.trace("waitForInitializingSignal");
initSemaphore.acquire(); initSemaphore.acquire();
} }
public void waitForStartingSignal() throws InterruptedException { private void waitForStartingSignal() throws InterruptedException {
logger.trace("waitForStartingSignal");
startSemaphore.acquire(); startSemaphore.acquire();
} }
} }
...@@ -148,4 +148,8 @@ public abstract class Stage { ...@@ -148,4 +148,8 @@ public abstract class Stage {
this.exceptionHandler = exceptionHandler; this.exceptionHandler = exceptionHandler;
} }
protected abstract void removeDynamicPort(DynamicOutputPort<?> dynamicOutputPort);
protected abstract void removeDynamicPort(DynamicInputPort<?> dynamicInputPort);
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment