Skip to content
Snippets Groups Projects
Commit b8558b2d authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into benchmark

Conflicts:
	src/main/java/teetime/framework/RunnableConsumerStage.java
	src/site/markdown/wiki
parents 1cb0af33 2d7466b4
No related branches found
No related tags found
No related merge requests found
Showing
with 226 additions and 90 deletions
#FindBugs User Preferences #FindBugs User Preferences
#Wed Feb 04 15:13:53 CET 2015 #Thu Feb 12 18:09:41 CET 2015
detector_threshold=3 detector_threshold=3
effort=max effort=max
excludefilter0=.fbExcludeFilterFile|true excludefilter0=.fbExcludeFilterFile|true
......
...@@ -112,6 +112,7 @@ ...@@ -112,6 +112,7 @@
<groupId>org.jctools</groupId> <groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId> <artifactId>jctools-core</artifactId>
<version>1.0</version> <version>1.0</version>
<!-- <version>1.1-SNAPSHOT</version> -->
</dependency> </dependency>
</dependencies> </dependencies>
......
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.m2e.Maven2LaunchConfigurationType">
<booleanAttribute key="M2_DEBUG_OUTPUT" value="false"/>
<stringAttribute key="M2_GOALS" value="install"/>
<booleanAttribute key="M2_NON_RECURSIVE" value="false"/>
<booleanAttribute key="M2_OFFLINE" value="false"/>
<stringAttribute key="M2_PROFILES" value=""/>
<listAttribute key="M2_PROPERTIES"/>
<stringAttribute key="M2_RUNTIME" value="EMBEDDED"/>
<booleanAttribute key="M2_SKIP_TESTS" value="true"/>
<intAttribute key="M2_THREADS" value="1"/>
<booleanAttribute key="M2_UPDATE_SNAPSHOTS" value="false"/>
<stringAttribute key="M2_USER_SETTINGS" value=""/>
<booleanAttribute key="M2_WORKSPACE_RESOLUTION" value="false"/>
<stringAttribute key="org.eclipse.jdt.launching.WORKING_DIRECTORY" value="I:/Repositories/teetime"/>
</launchConfiguration>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.m2e.Maven2LaunchConfigurationType">
<booleanAttribute key="M2_DEBUG_OUTPUT" value="false"/>
<stringAttribute key="M2_GOALS" value="clean package"/>
<booleanAttribute key="M2_NON_RECURSIVE" value="false"/>
<booleanAttribute key="M2_OFFLINE" value="false"/>
<stringAttribute key="M2_PROFILES" value=""/>
<listAttribute key="M2_PROPERTIES"/>
<stringAttribute key="M2_RUNTIME" value="EMBEDDED"/>
<booleanAttribute key="M2_SKIP_TESTS" value="true"/>
<intAttribute key="M2_THREADS" value="1"/>
<booleanAttribute key="M2_UPDATE_SNAPSHOTS" value="false"/>
<stringAttribute key="M2_USER_SETTINGS" value=""/>
<booleanAttribute key="M2_WORKSPACE_RESOLUTION" value="false"/>
<stringAttribute key="org.eclipse.jdt.launching.WORKING_DIRECTORY" value="I:/Repositories/teetime"/>
</launchConfiguration>
...@@ -32,13 +32,14 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -32,13 +32,14 @@ abstract class AbstractRunnableStage implements Runnable {
@Override @Override
public final void run() { public final void run() {
this.logger.debug("Executing runnable stage..."); this.logger.debug("Executing runnable stage...");
final Stage stage = this.stage;
try { try {
beforeStageExecution(); beforeStageExecution();
do { do {
executeStage(); executeStage();
} while (!this.stage.shouldBeTerminated()); } while (!stage.shouldBeTerminated());
afterStageExecution(); afterStageExecution();
...@@ -50,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -50,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable {
throw e; throw e;
} }
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); this.logger.debug("Finished runnable stage. (" + stage.getId() + ")");
} }
protected abstract void beforeStageExecution(); protected abstract void beforeStageExecution();
......
...@@ -67,7 +67,7 @@ public abstract class AbstractStage extends Stage { ...@@ -67,7 +67,7 @@ public abstract class AbstractStage extends Stage {
if (!this.signalAlreadyReceived(signal, inputPort)) { if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this); signal.trigger(this);
for (OutputPort<?> outputPort : this.outputPortList) { for (OutputPort<?> outputPort : outputPortList) {
outputPort.sendSignal(signal); outputPort.sendSignal(signal);
} }
} }
...@@ -87,10 +87,14 @@ public abstract class AbstractStage extends Stage { ...@@ -87,10 +87,14 @@ public abstract class AbstractStage extends Stage {
*/ */
protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) { protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) {
if (this.triggeredSignals.contains(signal)) { if (this.triggeredSignals.contains(signal)) {
this.logger.trace("Got signal: " + signal + " again from input port: " + inputPort); if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " again from input port: " + inputPort);
}
return true; return true;
} else { } else {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " from input port: " + inputPort);
}
this.triggeredSignals.add(signal); this.triggeredSignals.add(signal);
return false; return false;
} }
......
...@@ -59,6 +59,8 @@ public final class Analysis implements UncaughtExceptionHandler { ...@@ -59,6 +59,8 @@ public final class Analysis implements UncaughtExceptionHandler {
this(configuration, false); this(configuration, false);
} }
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
// TODO remove @SuppressWarnings if init is no longer deprecated
public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) { public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) {
this.configuration = configuration; this.configuration = configuration;
if (validationEnabled) { if (validationEnabled) {
......
...@@ -25,18 +25,16 @@ public final class OutputPort<T> extends AbstractPort<T> { ...@@ -25,18 +25,16 @@ public final class OutputPort<T> extends AbstractPort<T> {
/** /**
* @param element * @param element
* to be sent * to be sent; May not be <code>null</code>.
*/ */
public void send(final T element) { public void send(final T element) {
if (this.pipe.add(element)) { this.pipe.add(element);
this.pipe.reportNewElement();
}
} }
/** /**
* *
* @param signal * @param signal
* to be sent * to be sent; May not be <code>null</code>.
*/ */
public void sendSignal(final ISignal signal) { public void sendSignal(final ISignal signal) {
this.pipe.sendSignal(signal); this.pipe.sendSignal(signal);
......
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.Arrays;
import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.IdleStrategy;
import teetime.framework.idle.YieldStrategy; import teetime.framework.idle.YieldStrategy;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
...@@ -25,6 +23,8 @@ import teetime.framework.signal.ISignal; ...@@ -25,6 +23,8 @@ import teetime.framework.signal.ISignal;
final class RunnableConsumerStage extends AbstractRunnableStage { final class RunnableConsumerStage extends AbstractRunnableStage {
private final IdleStrategy idleStrategy; private final IdleStrategy idleStrategy;
// cache the input ports here since getInputPorts() always returns a new copy
private final InputPort<?>[] inputPorts;
/** /**
* Creates a new instance with the {@link YieldStrategy} as default idle strategy. * Creates a new instance with the {@link YieldStrategy} as default idle strategy.
...@@ -39,11 +39,13 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -39,11 +39,13 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
super(stage); super(stage);
this.idleStrategy = idleStrategy; this.idleStrategy = idleStrategy;
this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
} }
@Override @Override
protected void beforeStageExecution() { protected void beforeStageExecution() {
logger.trace("ENTRY beforeStageExecution"); logger.trace("ENTRY beforeStageExecution");
final Stage stage = this.stage;
do { do {
checkforSignals(); checkforSignals();
...@@ -76,14 +78,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -76,14 +78,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
private void checkforSignals() { private void checkforSignals() {
// FIXME should getInputPorts() really be defined in Stage? final Stage stage = this.stage;
InputPort<?>[] inputPorts = stage.getInputPorts();
logger.debug("Checking signals for: " + Arrays.toString(inputPorts));
for (InputPort<?> inputPort : inputPorts) { for (InputPort<?> inputPort : inputPorts) {
IPipe pipe = inputPort.getPipe(); final IPipe pipe = inputPort.getPipe();
if (pipe instanceof AbstractInterThreadPipe) { // TODO: is this needed? if (pipe instanceof AbstractInterThreadPipe) { // TODO: is this needed?
AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe; final AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe;
ISignal signal = intraThreadPipe.getSignal(); final ISignal signal = intraThreadPipe.getSignal();
if (null != signal) { if (null != signal) {
stage.onSignal(signal, inputPort); stage.onSignal(signal, inputPort);
} }
......
...@@ -53,7 +53,9 @@ public final class PipeFactoryLoader { ...@@ -53,7 +53,9 @@ public final class PipeFactoryLoader {
pipeFactories.add(pipeFactory); pipeFactories.add(pipeFactory);
} }
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
LOGGER.warn("Could not find class: " + line, e); // NOMPD (PMD.GuardLogStatement) if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Could not find class: " + line, e);
}
} catch (InstantiationException e) { } catch (InstantiationException e) {
LOGGER.warn("Could not instantiate pipe factory", e); LOGGER.warn("Could not instantiate pipe factory", e);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
......
...@@ -101,8 +101,10 @@ public final class PipeFactoryRegistry { ...@@ -101,8 +101,10 @@ public final class PipeFactoryRegistry {
public void register(final IPipeFactory pipeFactory) { public void register(final IPipeFactory pipeFactory) {
final String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable()); final String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
this.pipeFactories.put(key, pipeFactory); this.pipeFactories.put(key, pipeFactory);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName()); LOGGER.info("Registered pipe factory: " + pipeFactory.getClass().getCanonicalName());
} }
}
private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) { private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
return tc.toString() + ordering.toString() + growable; return tc.toString() + ordering.toString() + growable;
......
...@@ -36,6 +36,7 @@ public final class SingleElementPipe extends AbstractIntraThreadPipe { ...@@ -36,6 +36,7 @@ public final class SingleElementPipe extends AbstractIntraThreadPipe {
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
this.element = element; this.element = element;
this.reportNewElement();
return true; return true;
} }
......
...@@ -19,8 +19,6 @@ import java.util.Queue; ...@@ -19,8 +19,6 @@ import java.util.Queue;
import org.jctools.queues.QueueFactory; import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec; import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort; import teetime.framework.InputPort;
...@@ -36,7 +34,7 @@ public final class SpScPipe extends AbstractInterThreadPipe { ...@@ -36,7 +34,7 @@ public final class SpScPipe extends AbstractInterThreadPipe {
<T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { <T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort); super(sourcePort, targetPort);
this.queue = QueueFactory.newQueue(new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT)); this.queue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedSpsc(capacity));
} }
@Deprecated @Deprecated
...@@ -53,15 +51,7 @@ public final class SpScPipe extends AbstractInterThreadPipe { ...@@ -53,15 +51,7 @@ public final class SpScPipe extends AbstractInterThreadPipe {
this.numWaits++; this.numWaits++;
Thread.yield(); Thread.yield();
} }
// this.reportNewElement();
Thread owningThread = cachedTargetStage.getOwningThread();
if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance
synchronized (cachedTargetStage) {
cachedTargetStage.notify();
// LOGGER.trace("Notified: " + cachedTargetStage);
}
}
return true; return true;
} }
......
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.Queue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.jctools.queues.spec.Preference;
import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
private final Queue<Object> queue;
<T> UnboundedSpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
ConcurrentQueueSpec specification = new ConcurrentQueueSpec(1, 1, 0, Ordering.FIFO, Preference.THROUGHPUT);
this.queue = QueueFactory.newQueue(specification);
}
@Override
public boolean add(final Object element) {
return this.queue.offer(element);
}
@Override
public Object removeLast() {
return this.queue.poll();
}
@Override
public boolean isEmpty() {
return this.queue.isEmpty();
}
@Override
public int size() {
return this.queue.size();
}
@Override
public Object readLast() {
return this.queue.peek();
}
}
...@@ -13,24 +13,43 @@ ...@@ -13,24 +13,43 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package teetime.framework.idle; package teetime.framework.pipe;
import teetime.framework.Stage; import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class WaitStrategy implements IdleStrategy { public class UnboundedSpScPipeFactory implements IPipeFactory {
private final Stage stage; @Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 0);
}
public WaitStrategy(final Stage stage) { /**
super(); * {@inheritDoc}
this.stage = stage; *
* The capacity is ignored.
*/
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new UnboundedSpScPipe(sourcePort, targetPort);
} }
@Override @Override
public void execute() throws InterruptedException { public ThreadCommunication getThreadCommunication() {
synchronized (stage) { return ThreadCommunication.INTER;
stage.wait();
} }
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return true;
} }
} }
...@@ -20,31 +20,16 @@ import teetime.framework.OutputPort; ...@@ -20,31 +20,16 @@ import teetime.framework.OutputPort;
public final class Relay<T> extends AbstractConsumerStage<T> { public final class Relay<T> extends AbstractConsumerStage<T> {
// private final InputPort<T> inputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
// private AbstractInterThreadPipe cachedCastedInputPipe;
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
if (null == element) { if (null == element) {
// if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) {
// this.terminate();
// }
// Thread.yield();
// return;
logger.trace("relay: returnNoElement");
returnNoElement(); returnNoElement();
} }
outputPort.send(element); outputPort.send(element);
} }
// @Override
// public void onStarting() throws Exception {
// super.onStarting();
// this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe();
// }
public OutputPort<T> getOutputPort() { public OutputPort<T> getOutputPort() {
return outputPort; return outputPort;
} }
......
...@@ -2,3 +2,4 @@ teetime.framework.pipe.SingleElementPipeFactory ...@@ -2,3 +2,4 @@ teetime.framework.pipe.SingleElementPipeFactory
teetime.framework.pipe.OrderedGrowableArrayPipeFactory teetime.framework.pipe.OrderedGrowableArrayPipeFactory
teetime.framework.pipe.UnorderedGrowablePipeFactory teetime.framework.pipe.UnorderedGrowablePipeFactory
teetime.framework.pipe.SpScPipeFactory teetime.framework.pipe.SpScPipeFactory
teetime.framework.pipe.UnboundedSpScPipeFactory
...@@ -17,7 +17,6 @@ package teetime.framework; ...@@ -17,7 +17,6 @@ package teetime.framework;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.lang.Thread.State;
import java.util.Collection; import java.util.Collection;
import org.junit.Test; import org.junit.Test;
...@@ -28,44 +27,44 @@ import com.google.common.base.Joiner; ...@@ -28,44 +27,44 @@ import com.google.common.base.Joiner;
public class RunnableConsumerStageTest { public class RunnableConsumerStageTest {
@Test // @Test
public void testWaitingInfinitely() throws Exception { // public void testWaitingInfinitely() throws Exception {
WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42);
//
final Analysis analysis = new Analysis(waitStrategyConfiguration); // final Analysis analysis = new Analysis(waitStrategyConfiguration);
Thread thread = new Thread(new Runnable() { // Thread thread = new Thread(new Runnable() {
@Override // @Override
public void run() { // public void run() {
start(analysis); // FIXME react on exceptions // start(analysis); // FIXME react on exceptions
} // }
}); // });
thread.start(); // thread.start();
//
Thread.sleep(200); // Thread.sleep(200);
//
assertEquals(State.WAITING, thread.getState()); // assertEquals(State.WAITING, thread.getState());
assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); // assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size());
} // }
//
@Test // @Test
public void testWaitingFinitely() throws Exception { // public void testWaitingFinitely() throws Exception {
WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); // WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42);
//
final Analysis analysis = new Analysis(waitStrategyConfiguration); // final Analysis analysis = new Analysis(waitStrategyConfiguration);
Thread thread = new Thread(new Runnable() { // Thread thread = new Thread(new Runnable() {
@Override // @Override
public void run() { // public void run() {
start(analysis); // FIXME react on exceptions // start(analysis); // FIXME react on exceptions
} // }
}); // });
thread.start(); // thread.start();
//
Thread.sleep(400); // Thread.sleep(400);
//
assertEquals(State.TERMINATED, thread.getState()); // assertEquals(State.TERMINATED, thread.getState());
assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); // assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0));
assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); // assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size());
} // }
@Test @Test
public void testYieldRun() throws Exception { public void testYieldRun() throws Exception {
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
*/ */
package teetime.framework; package teetime.framework;
import teetime.framework.idle.WaitStrategy;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
...@@ -69,7 +68,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration { ...@@ -69,7 +68,7 @@ class WaitStrategyConfiguration extends AnalysisConfiguration {
Relay<Object> relay = new Relay<Object>(); Relay<Object> relay = new Relay<Object>();
CollectorSink<Object> collectorSink = new CollectorSink<Object>(); CollectorSink<Object> collectorSink = new CollectorSink<Object>();
relay.setIdleStrategy(new WaitStrategy(relay)); // relay.setIdleStrategy(new WaitStrategy(relay));
interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort()); interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort());
intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort());
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment