Skip to content
Snippets Groups Projects
Commit ef126d8e authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge remote-tracking branch 'remotes/origin/master' into

collectedExceptions

Conflicts:
	src/test/java/teetime/stage/InstanceOfFilterTest.java
parents 6dbf89d8 071bf905
No related branches found
No related tags found
No related merge requests found
Showing
with 158 additions and 131 deletions
......@@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://pmd.sourceforge.net/ruleset/2.0.0 http://pmd.sourceforge.net/ruleset_2_0_0.xsd">
<description>This ruleset checks my code for bad stuff</description>
<exclude-pattern>.*/target/.*</exclude-pattern>
<!-- warning: if you reference a pmd rulesets xml file more than once, only
the first rule is applied; so check for duplicates -->
......
......@@ -25,7 +25,7 @@
<java.version>1.6</java.version>
<checkstyle.version>2.16</checkstyle.version>
<findbugs.version>3.0.1</findbugs.version>
<findbugs.version>3.0.2</findbugs.version>
<pmd.version>3.5</pmd.version>
<javadoc.version>2.10.3</javadoc.version>
</properties>
......@@ -190,6 +190,25 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${javadoc.version}</version>
<configuration>
<tags>
<tag>
<name>stage.input</name>
<placement>t</placement>
<head>Stage input:</head>
</tag>
<tag>
<name>stage.output</name>
<placement>t</placement>
<head>Stage output:</head>
</tag>
<tag>
<name>stage.sketch</name>
<placement>t</placement>
<head>Stage sketch:</head>
</tag>
</tags>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
......@@ -203,7 +222,7 @@
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.5</version>
<version>1.6.6</version>
<extensions>true</extensions>
<configuration>
<serverId>teetime-deployment</serverId>
......@@ -358,7 +377,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>2.8</version>
<version>2.8.1</version>
</plugin>
</plugins>
</build>
......@@ -414,6 +433,23 @@
<version>${javadoc.version}</version>
<configuration>
<destDir>${javadocOutputDir}</destDir>
<tags>
<tag>
<name>stage.input</name>
<placement>t</placement>
<head>Stage input:</head>
</tag>
<tag>
<name>stage.output</name>
<placement>t</placement>
<head>Stage output:</head>
</tag>
<tag>
<name>stage.sketch</name>
<placement>t</placement>
<head>Stage sketch:</head>
</tag>
</tags>
</configuration>
</plugin>
</plugins>
......
......@@ -19,6 +19,7 @@ import java.util.HashSet;
import java.util.Set;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
/**
* Searches for threadable stages
......@@ -44,4 +45,10 @@ class A1ThreadableStageCollector implements ITraverserVisitor {
return VisitorBehavior.CONTINUE;
}
@Override
public void visit(final DummyPipe pipe, final AbstractPort<?> port) {
// TODO Auto-generated method stub
}
}
......@@ -21,6 +21,7 @@ import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.ObjectIntMap;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
/**
......@@ -90,5 +91,11 @@ public class A2InvalidThreadAssignmentCheck {
return VisitorBehavior.STOP;
}
@Override
public void visit(final DummyPipe pipe, final AbstractPort<?> port) {
// TODO Auto-generated method stub
}
}
}
......@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.InstantiationPipe;
......@@ -90,4 +91,12 @@ class A3PipeInstantiation implements ITraverserVisitor {
}
}
@Override
public void visit(final DummyPipe pipe, final AbstractPort<?> port) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Unconnected port " + port + " in stage " + port.getOwningStage().getId());
}
}
}
......@@ -32,30 +32,6 @@ public abstract class AbstractCompositeStage {
*/
private static final int DEFAULT_CAPACITY = 4;
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
protected final void addThreadableStage(final Stage stage) {
this.addThreadableStage(stage, stage.getId());
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
protected void addThreadableStage(final Stage stage, final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(stage);
Thread newThread = new TeeTimeThread(runnable, threadName);
stage.setOwningThread(newThread);
}
/**
* Connects two ports with a pipe with a default capacity of currently {@value #DEFAULT_CAPACITY}.
*
......@@ -85,7 +61,7 @@ public abstract class AbstractCompositeStage {
protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
if (sourcePort.getOwningStage().getInputPorts().size() == 0) {
if (sourcePort.getOwningStage().getOwningThread() == null) {
addThreadableStage(sourcePort.getOwningStage(), sourcePort.getOwningStage().getId());
sourcePort.getOwningStage().declareActive();
}
}
......
......@@ -15,8 +15,10 @@
*/
package teetime.framework;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import teetime.framework.pipe.IPipe;
......@@ -27,6 +29,7 @@ import teetime.util.framework.port.PortRemovedListener;
public abstract class AbstractStage extends Stage {
private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();
private final Set<Class<? extends ISignal>> triggeredSignalTypes = new HashSet<Class<? extends ISignal>>();
private final PortList<InputPort<?>> inputPorts = new PortList<InputPort<?>>();
......@@ -54,7 +57,21 @@ public abstract class AbstractStage extends Stage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) {
Class<? extends ISignal> signalClass = signal.getClass();
Set<InputPort<?>> signalReceivedInputPorts;
if (signalMap.containsKey(signalClass)) {
signalReceivedInputPorts = signalMap.get(signalClass);
} else {
signalReceivedInputPorts = new HashSet<InputPort<?>>();
signalMap.put(signalClass, signalReceivedInputPorts);
}
if (!signalReceivedInputPorts.add(inputPort)) {
this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
return;
}
if (signal.mayBeTriggered(signalReceivedInputPorts, getInputPorts())) {
try {
signal.trigger(this);
} catch (Exception e) {
......
......@@ -31,6 +31,7 @@ public abstract class Configuration extends AbstractCompositeStage {
private final AbstractExceptionListenerFactory<?> factory;
private final ConfigurationContext context;
private boolean initialized;
private boolean executed;
private Stage startStage;
......@@ -43,11 +44,19 @@ public abstract class Configuration extends AbstractCompositeStage {
this.context = new ConfigurationContext(this);
}
boolean isExecuted() {
boolean isInitialized() {
return initialized;
}
void setInitialized(final boolean executed) {
this.initialized = executed;
}
public boolean isExecuted() {
return executed;
}
void setExecuted(final boolean executed) {
public void setExecuted(final boolean executed) {
this.executed = executed;
}
......@@ -55,10 +64,14 @@ public abstract class Configuration extends AbstractCompositeStage {
return factory;
}
@Override
protected void addThreadableStage(final Stage stage, final String threadName) {
startStage = stage; // memorize an arbitrary stage as starting point for traversing
super.addThreadableStage(stage, threadName);
/**
* Register pipes if your configuration only relies on custom pipes and therefore {@link #connectPorts(OutputPort, InputPort)} is never called.
*
* @param pipe
* A custom pipe instance
*/
protected void registerCustomPipe(final AbstractPipe<?> pipe) {
startStage = pipe.getSourcePort().getOwningStage(); // memorize an arbitrary stage as starting point for traversing
}
@Override
......
......@@ -19,7 +19,7 @@ import java.util.Set;
/**
* Represents a context that is used by a configuration and composite stages to connect ports, for example.
* Stages can be added by executing {@link #addThreadableStage(Stage)}.
* Stages can be added by executing {@link #declareActive(Stage)}.
*
* @since 2.0
*/
......
......@@ -64,10 +64,10 @@ public final class Execution<T extends Configuration> {
public Execution(final T configuration, final boolean validationEnabled) {
this.configuration = configuration;
this.configurationContext = configuration.getContext();
if (configuration.isExecuted()) {
if (configuration.isInitialized()) {
throw new IllegalStateException("Configuration was already executed");
}
configuration.setExecuted(true);
configuration.setInitialized(true);
if (validationEnabled) {
validateStages();
}
......@@ -142,6 +142,10 @@ public final class Execution<T extends Configuration> {
* @since 2.0
*/
public void executeNonBlocking() {
if (configuration.isExecuted()) {
throw new IllegalStateException("Any configuration instance may only be executed once.");
}
configuration.setExecuted(true);
configurationContext.executeConfiguration();
}
......
......@@ -16,6 +16,7 @@
package teetime.framework;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
public interface ITraverserVisitor {
......@@ -23,4 +24,6 @@ public interface ITraverserVisitor {
VisitorBehavior visit(AbstractPort<?> port);
void visit(DummyPipe pipe, AbstractPort<?> port);
}
......@@ -16,6 +16,7 @@
package teetime.framework;
import teetime.framework.Traverser.VisitorBehavior;
import teetime.framework.pipe.DummyPipe;
public class IntraStageCollector implements ITraverserVisitor {
......@@ -40,4 +41,10 @@ public class IntraStageCollector implements ITraverserVisitor {
return VisitorBehavior.CONTINUE;
}
@Override
public void visit(final DummyPipe pipe, final AbstractPort<?> port) {
// TODO Auto-generated method stub
}
}
......@@ -50,6 +50,8 @@ public abstract class Stage {
/** The owning thread of this stage if this stage is directly executed by a {@link AbstractRunnableStage}, <code>null</code> otherwise. */
private Thread owningThread;
private boolean isActive;
private ConfigurationContext owningContext;
ConfigurationContext getOwningContext() {
......@@ -181,4 +183,37 @@ public abstract class Stage {
protected abstract void removeDynamicPort(InputPort<?> inputPort);
public boolean isActive() {
return isActive;
}
void setActive(final boolean isActive) {
this.isActive = isActive;
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
*/
public void declareActive() {
declareActive(getId());
}
/**
* Execute this method, to add a stage to the configuration, which should be executed in a own thread.
*
* @param stage
* A arbitrary stage, which will be added to the configuration and executed in a thread.
* @param threadName
* A string which can be used for debugging.
*/
public void declareActive(final String threadName) {
AbstractRunnableStage runnable = AbstractRunnableStage.create(this);
Thread newThread = new TeeTimeThread(runnable, threadName);
this.setOwningThread(newThread);
this.setActive(true);
}
}
......@@ -61,7 +61,7 @@ class ThreadService extends AbstractService<ThreadService> {
}
void startStageAtRuntime(final Stage newStage) {
configuration.addThreadableStage(newStage);
newStage.declareActive();
Set<Stage> newThreadableStages = initialize(newStage);
startThreads(newThreadableStages);
......
......@@ -82,6 +82,7 @@ public class Traverser {
private void visitAndTraverse(final AbstractPort<?> port, final Direction direction) {
if (port.getPipe() instanceof DummyPipe) {
traverserVisitor.visit((DummyPipe) port.getPipe(), port);
return;
}
VisitorBehavior behavior = traverserVisitor.visit(port);
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
public final class CouldNotFindPipeImplException extends RuntimeException {
private static final long serialVersionUID = 5242260988104493402L;
public CouldNotFindPipeImplException(final String key) {
super("Could not find any pipe implementation that conforms to the key: " + key);
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://christianwulf.github.io/teetime)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.AbstractInterThreadPipe;
import teetime.util.ConstructorClosure;
final class RelayTestPipe<T> extends AbstractInterThreadPipe<T> {
private int numInputObjects;
private final ConstructorClosure<T> inputObjectCreator;
public RelayTestPipe(final int numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
super(null, null, Integer.MAX_VALUE);
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
@Override
public boolean add(final Object element) {
return false;
}
@Override
public boolean addNonBlocking(final Object element) {
return add(element);
}
@Override
public T removeLast() {
if (this.numInputObjects == 0) {
return null;
} else {
this.numInputObjects--;
return this.inputObjectCreator.create();
}
}
@Override
public boolean isEmpty() {
return (this.numInputObjects == 0);
}
@Override
public int size() {
return this.numInputObjects;
}
}
......@@ -30,7 +30,7 @@ public final class StartingSignal implements ISignal {
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return true;
return receivedInputPorts.size() == 1;
}
}
......@@ -30,7 +30,7 @@ public final class TerminatingSignal implements ISignal {
@Override
public boolean mayBeTriggered(final Set<InputPort<?>> receivedInputPorts, final List<InputPort<?>> allInputPorts) {
return receivedInputPorts.size() == allInputPorts.size();
return receivedInputPorts.size() >= allInputPorts.size();
}
}
......@@ -91,7 +91,7 @@ public final class StageTester {
connectPorts(producer.getOutputPort(), inputHolder.getPort());
}
addThreadableStage(stage);
stage.declareActive();
for (OutputHolder<?> outputHolder : outputHolders) {
final CollectorSink<Object> sink = new CollectorSink<Object>(outputHolder.getOutputElements());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment