Skip to content
Snippets Groups Projects
Commit 158da1f4 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed performance & memory issues

parent f1943544
No related branches found
No related tags found
No related merge requests found
...@@ -32,13 +32,14 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -32,13 +32,14 @@ abstract class AbstractRunnableStage implements Runnable {
@Override @Override
public final void run() { public final void run() {
this.logger.debug("Executing runnable stage..."); this.logger.debug("Executing runnable stage...");
final Stage stage = this.stage;
try { try {
beforeStageExecution(); beforeStageExecution();
do { do {
executeStage(); executeStage();
} while (!this.stage.shouldBeTerminated()); } while (!stage.shouldBeTerminated());
afterStageExecution(); afterStageExecution();
...@@ -50,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable { ...@@ -50,7 +51,7 @@ abstract class AbstractRunnableStage implements Runnable {
throw e; throw e;
} }
this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); this.logger.debug("Finished runnable stage. (" + stage.getId() + ")");
} }
protected abstract void beforeStageExecution(); protected abstract void beforeStageExecution();
......
...@@ -67,7 +67,7 @@ public abstract class AbstractStage extends Stage { ...@@ -67,7 +67,7 @@ public abstract class AbstractStage extends Stage {
if (!this.signalAlreadyReceived(signal, inputPort)) { if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this); signal.trigger(this);
for (OutputPort<?> outputPort : this.outputPortList) { for (OutputPort<?> outputPort : outputPortList) {
outputPort.sendSignal(signal); outputPort.sendSignal(signal);
} }
} }
...@@ -87,10 +87,14 @@ public abstract class AbstractStage extends Stage { ...@@ -87,10 +87,14 @@ public abstract class AbstractStage extends Stage {
*/ */
protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) { protected boolean signalAlreadyReceived(final ISignal signal, final InputPort<?> inputPort) {
if (this.triggeredSignals.contains(signal)) { if (this.triggeredSignals.contains(signal)) {
this.logger.trace("Got signal: " + signal + " again from input port: " + inputPort); if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " again from input port: " + inputPort);
}
return true; return true;
} else { } else {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); if (logger.isTraceEnabled()) {
logger.trace("Got signal: " + signal + " from input port: " + inputPort);
}
this.triggeredSignals.add(signal); this.triggeredSignals.add(signal);
return false; return false;
} }
......
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
*/ */
package teetime.framework; package teetime.framework;
import java.util.Arrays;
import teetime.framework.idle.IdleStrategy; import teetime.framework.idle.IdleStrategy;
import teetime.framework.idle.YieldStrategy; import teetime.framework.idle.YieldStrategy;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
...@@ -25,6 +23,8 @@ import teetime.framework.signal.ISignal; ...@@ -25,6 +23,8 @@ import teetime.framework.signal.ISignal;
final class RunnableConsumerStage extends AbstractRunnableStage { final class RunnableConsumerStage extends AbstractRunnableStage {
private final IdleStrategy idleStrategy; private final IdleStrategy idleStrategy;
// cache the input ports here since getInputPorts() always returns a new copy
private final InputPort<?>[] inputPorts;
/** /**
* Creates a new instance with the {@link YieldStrategy} as default idle strategy. * Creates a new instance with the {@link YieldStrategy} as default idle strategy.
...@@ -39,11 +39,13 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -39,11 +39,13 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) {
super(stage); super(stage);
this.idleStrategy = idleStrategy; this.idleStrategy = idleStrategy;
this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage?
} }
@Override @Override
protected void beforeStageExecution() { protected void beforeStageExecution() {
logger.trace("ENTRY beforeStageExecution"); logger.trace("ENTRY beforeStageExecution");
final Stage stage = this.stage;
do { do {
checkforSignals(); checkforSignals();
...@@ -76,14 +78,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage { ...@@ -76,14 +78,12 @@ final class RunnableConsumerStage extends AbstractRunnableStage {
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
private void checkforSignals() { private void checkforSignals() {
// FIXME should getInputPorts() really be defined in Stage? final Stage stage = this.stage;
InputPort<?>[] inputPorts = stage.getInputPorts();
logger.debug("Checking signals for: " + Arrays.toString(inputPorts));
for (InputPort<?> inputPort : inputPorts) { for (InputPort<?> inputPort : inputPorts) {
IPipe pipe = inputPort.getPipe(); final IPipe pipe = inputPort.getPipe();
if (pipe instanceof AbstractInterThreadPipe) { if (pipe instanceof AbstractInterThreadPipe) {
AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe; final AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe;
ISignal signal = intraThreadPipe.getSignal(); final ISignal signal = intraThreadPipe.getSignal();
if (null != signal) { if (null != signal) {
stage.onSignal(signal, inputPort); stage.onSignal(signal, inputPort);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment