diff --git a/conf/logging.properties b/conf/logging.properties index fa34640968e15769d80c5dc40b51f5d226e26ec6..7e34493daab251c1f3ec04fe34770320dfa97598 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -1,12 +1,13 @@ .handlers = java.util.logging.ConsoleHandler -.level = WARNING +.level = ALL java.util.logging.ConsoleHandler.level = ALL #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n -#teetime.level = ALL +teetime.level = ALL -#teetime.variant.methodcallWithPorts.framework.core.level = ALL -#teetime.variant.methodcallWithPorts.stage.level = FINE +teetime.variant.methodcallWithPorts.framework.level = ALL +teetime.variant.methodcallWithPorts.framework.core.level = ALL +teetime.variant.methodcallWithPorts.stage.level = FINE #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 9c3b790518da813762d492e4ef13c56fbfdd61e6..e2014a6e50c44346363b8281a3e1f1824b3e1e55 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -107,6 +107,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { */ @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { + this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); + // System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort); + switch (signal) { case FINISHED: this.onFinished(); @@ -116,11 +119,12 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { break; } - this.getOutputPort().sendSignal(signal); + this.outputPort.sendSignal(signal); } protected void onFinished() { // empty default implementation + this.onIsPipelineHead(); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java index e43c0f44302f8c8d9d16ad7334866bd81ed88424..9f17e535ed1d436660afdbf2bfb10e09f6af55ae 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java @@ -1,6 +1,5 @@ package teetime.variant.methodcallWithPorts.framework.core; - public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 7500e4de2a40f69122046c664ead4c050133c6db..e95b94fac2f0df6ed4711438687e6434a660d63d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -35,7 +35,9 @@ public class OutputPort<T> { } public void sendSignal(final Signal signal) { - this.pipe.setSignal(signal); + if (this.pipe != null) { // if the output port is connected with a pipe + this.pipe.setSignal(signal); + } } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 3a90f164165a7608023202bb7f726c16d0cfd17c..02fdaa01254a3d4e2f874329731e43e8f50df5c2 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -68,25 +68,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // headStage.sendFinishedSignalToAllSuccessorStages(); // this.updateRescheduable(headStage); - } - - private final void updateRescheduable(final StageWithPort<?, ?> stage) { - StageWithPort<?, ?> currentStage = stage; - do { - this.firstStageIndex++; - // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? - // if (currentStage == null) { // loop reaches the last stage - if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage - this.setReschedulable(false); - this.cleanUp(); - return; - } - currentStage = this.stages[this.firstStageIndex]; - currentStage.onIsPipelineHead(); - } while (!currentStage.isReschedulable()); - this.setReschedulable(true); - } + // this.setReschedulable(headStage.isReschedulable()); + } + + // private final void updateRescheduable(final StageWithPort<?, ?> stage) { + // StageWithPort<?, ?> currentStage = stage; + // do { + // this.firstStageIndex++; + // // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? + // // if (currentStage == null) { // loop reaches the last stage + // if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage + // this.setReschedulable(false); + // this.cleanUp(); + // return; + // } + // currentStage = this.stages[this.firstStageIndex]; + // currentStage.onIsPipelineHead(); + // } while (!currentStage.isReschedulable()); + // + // this.setReschedulable(true); + // } @Override public void onIsPipelineHead() { @@ -133,12 +135,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public boolean isReschedulable() { - return this.reschedulable; + // return this.reschedulable; + return this.firstStage.isReschedulable(); } - public void setReschedulable(final boolean reschedulable) { - this.reschedulable = reschedulable; - } + // public void setReschedulable(final boolean reschedulable) { + // this.reschedulable = reschedulable; + // } @Override public InputPort<I> getInputPort() { @@ -166,7 +169,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { - throw new IllegalStateException("Should not be used since the signal is directly passed via the first stage's input port."); + this.firstStage.onSignal(signal, inputPort); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java index f960b56909a839084861458da15f59abb74b02fa..44646b809ffcc9ea66b8b25755300e299aab2018 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java @@ -1,6 +1,5 @@ package teetime.variant.methodcallWithPorts.framework.core; - public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { public ProducerStage() { @@ -9,6 +8,10 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { @Override public void executeWithPorts() { + // if (this.logger.isDebugEnabled()) { + // this.logger.debug("Executing stage..."); + // } + this.execute5(null); // if (!this.getOutputPort().pipe.isEmpty()) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index 62b236375f1e4a784eb3c440b8e486130cd71469..cb1e9329ef0c8c633e037e77d4c67709598cd723 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -15,6 +15,8 @@ public class RunnableStage<I> implements Runnable { @Override public void run() { + this.logger.debug("Executing runnable stage..."); + try { this.stage.onStart(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java index 997665e1c82faa7c20ec867de6adabcb3c6dbef9..59af28e52ad6164f781e0915196ad3db6d74ac08 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -6,7 +6,9 @@ public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { @Override public void setSignal(final Signal signal) { - this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort()); + if (this.getTargetPort() != null) { + this.getTargetPort().getOwningStage().onSignal(signal, this.getTargetPort()); + } } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java index 84a138e7e54ea8fd63ba8c0705c3a7740a83b9e6..5e643e68b7ff57975a2ed20ad14ddc88f2a6b76b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java @@ -49,13 +49,14 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> { @Override protected void execute5(final T element) { this.elements.add(element); + if ((this.elements.size() % THRESHOLD) == 0) { System.out.println("size: " + this.elements.size()); } - if (this.elements.size() > 90000) { - // System.out.println("size > 90000: " + this.elements.size()); - } + // if (this.elements.size() > 90000) { + // // System.out.println("size > 90000: " + this.elements.size()); + // } } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java index 663b4d4703d01c56ea4daea64dd9910061284d99..c700ebfae1cb6b008be6a7bdc3467d331a962132 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Distributor.java @@ -6,7 +6,9 @@ import java.util.List; import teetime.util.concurrent.spsc.Pow2; import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; public final class Distributor<T> extends AbstractStage<T, T> { @@ -50,6 +52,25 @@ public final class Distributor<T> extends AbstractStage<T, T> { // this.outputPortList.clear(); } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); + // System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort); + + switch (signal) { + case FINISHED: + this.onFinished(); + break; + default: + this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); + break; + } + + for (OutputPort<?> op : this.outputPorts) { + op.sendSignal(signal); + } + } + @SuppressWarnings("unchecked") @Override public void onStart() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java index 61d001cd2bf916c7803525d9df59ca38f7a99e2e..b99078f94501f9e70d6d0892b4eb5aa0fffa2996 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java @@ -53,23 +53,6 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { this.inputObjectCreator = inputObjectCreator; } - // @Override - // protected void execute3() { - // if (this.numInputObjects == 0) { - // // this.getOutputPort().send((T) END_SIGNAL); - // return; - // } - // - // try { - // final T newObject = this.inputObjectCreator.call(); - // this.numInputObjects--; - // - // // this.getOutputPort().send(newObject); - // } catch (final Exception e) { - // throw new IllegalStateException(e); - // } - // } - @Override protected void execute4(final CommittableQueue<Void> elements) { this.execute5(null); @@ -77,6 +60,8 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { @Override protected void execute5(final Void element) { + // this.logger.debug("Executing object producer..."); + T newObject = null; newObject = this.inputObjectCreator.create(); this.numInputObjects--; @@ -90,10 +75,4 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { this.send(newObject); } - // @Override - // public void onIsPipelineHead() { - // // this.getOutputPort().pipe = null; // no performance increase - // super.onIsPipelineHead(); - // } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 55f3591ea2b33a2ed4ee01ecd2cfc98f5e2186fc..98cc5b977113d20b0af0ca785e8388564c73f64c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -7,7 +7,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; public class Relay<T> extends AbstractStage<T, T> { - private SpScPipe<T> inputPipe; + private SpScPipe<T> cachedCastedInputPipe; public Relay() { this.setReschedulable(true); @@ -18,7 +18,7 @@ public class Relay<T> extends AbstractStage<T, T> { T element = this.getInputPort().receive(); if (null == element) { // if (this.getInputPort().getPipe().isClosed()) { - if (this.inputPipe.getSignal() == Signal.FINISHED) { + if (this.cachedCastedInputPipe.getSignal() == Signal.FINISHED) { this.setReschedulable(false); assert 0 == this.getInputPort().getPipe().size(); } @@ -30,7 +30,7 @@ public class Relay<T> extends AbstractStage<T, T> { @Override public void onStart() { - this.inputPipe = (SpScPipe<T>) this.getInputPort().getPipe(); + this.cachedCastedInputPipe = (SpScPipe<T>) this.getInputPort().getPipe(); super.onStart(); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java index ca60071218f9ca07fe7aa601aa12fc453c5ed80e..db3ae9e49364b1c5233d89d80d0d66048d6b0df9 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -45,8 +45,8 @@ public class MethodCallThroughputAnalysis11 extends Analysis { @Override public void init() { super.init(); - Pipeline<?, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); - this.runnable = new RunnableStage(pipeline); + Pipeline<Void, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); + this.runnable = new RunnableStage<Void>(pipeline); } /**