diff --git a/pom.xml b/pom.xml index 1a5601f8cfe879ec95ce5836fd63ced928d0cc92..e0c85b350b7de292f8c0a152dccec525a09d86a5 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ <build> <plugins> - <!-- we want JDK 1.6 source and binary compatiblility --> + <!-- we want JDK 1.6 source and binary compatibility --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 49a4cbde7e21603e4db51350061db98bf2307d36..e3834bcd206507c8b2917b81d2ca16c94604598d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -44,16 +44,19 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { protected abstract void execute5(I element); /** - * Sends the <code>element</code> using the default output port + * Sends the given <code>element</code> using the default output port * * @param element + * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy) */ - protected final void send(final O element) { - this.send(this.getOutputPort(), element); + protected final boolean send(final O element) { + return this.send(this.getOutputPort(), element); } - protected final void send(final OutputPort<O> outputPort, final O element) { - outputPort.send(element); + protected final boolean send(final OutputPort<O> outputPort, final O element) { + if (!outputPort.send(element)) { + return false; + } // StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage(); StageWithPort<?, ?> next = outputPort.getCachedTargetStage(); @@ -61,6 +64,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { do { next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead } while (next.isReschedulable()); + + return true; } // public void disable() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index e95b94fac2f0df6ed4711438687e6434a660d63d..6d5e197f2d91a669b6213ff7d6432b77cb615a27 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -14,8 +14,13 @@ public class OutputPort<T> { */ private StageWithPort<?, ?> cachedTargetStage; - public void send(final T element) { - this.pipe.add(element); + /** + * + * @param element + * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) + */ + public boolean send(final T element) { + return this.pipe.add(element); } public IPipe<T> getPipe() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 4b94be69c33616979c6dc78e6a8d1a482deb6dc9..e09d4fe8bc24a58ce361979785c0ac53536fe1c4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -5,7 +5,7 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal; public interface IPipe<T> { - void add(T element); + boolean add(T element); T removeLast(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index fc3d224a0ad8a1654171b7cf974f4d4a74e2677b..d289f4c805209f3e14868750a179f3b3dffdcf0d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -26,8 +26,9 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { } @Override - public void add(final T element) { + public boolean add(final T element) { this.elements.put(this.tail++, element); + return true; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index eb92d9433e9745a1d29bb46788c0d29e497079d8..a69db3732b8367ac9e8f58b0a22ec804521dfdd1 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -25,8 +25,8 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { } @Override - public void add(final T element) { - this.elements.offer(element); + public boolean add(final T element) { + return this.elements.offer(element); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index eda193f371596c4b95515dd9b70277a3fd28ccf5..8d8053f603df20c8503cd192d103b89ee3798742 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -21,9 +21,10 @@ public class Pipe<T> extends IntraThreadPipe<T> { * @see teetime.examples.throughput.methodcall.IPipe#add(T) */ @Override - public void add(final T element) { + public boolean add(final T element) { this.elements.addToTailUncommitted(element); this.elements.commit(); + return true; } /* diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index c3581c1a6369bdf9841b8545c55f1f67b01fe3d7..c5e02be9c013da476c9d4e5cc9749365a896a878 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -15,8 +15,9 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> { } @Override - public void add(final T element) { + public boolean add(final T element) { this.element = element; + return true; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 836af47bd8c3462054d0429ae86e66e76815632c..65143e7f420dce06077fd21557e929ff31a74772 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -15,8 +15,9 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal; public class SpScPipe<T> extends AbstractPipe<T> { private final Queue<T> queue; - private int maxSize; private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); + // statistics + private int numWaits; public SpScPipe(final int capacity) { ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT); @@ -32,9 +33,16 @@ public class SpScPipe<T> extends AbstractPipe<T> { } @Override - public void add(final T element) { - this.queue.offer(element); - this.maxSize = Math.max(this.queue.size(), this.maxSize); + public boolean add(final T element) { + // this.maxSize = Math.max(this.queue.size(), this.maxSize); + + // BETTER introduce a QueueIsFullStrategy + while (!this.queue.offer(element)) { + this.numWaits++; + Thread.yield(); + } + + return true; } @Override @@ -58,8 +66,8 @@ public class SpScPipe<T> extends AbstractPipe<T> { } // BETTER find a solution w/o any thread-safe code in this stage - public synchronized int getMaxSize() { - return this.maxSize; + public synchronized int getNumWaits() { + return this.numWaits; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 1ead00769b9f07daca2e04b70ead6da3dfbe9bb8..2b908268929cd301933b12f0a58d78eb41f483c6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -25,13 +25,14 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { } @Override - public void add(final T element) { + public boolean add(final T element) { if (this.lastFreeIndex == this.elements.length) { // if (this.lastFreeIndex == this.elements.getCapacity()) { this.elements = this.grow(); } this.elements[this.lastFreeIndex++] = element; // this.elements.put(this.lastFreeIndex++, element); + return true; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java index 13b27fa341d353492b572ed1ef0389cf6c07d86f..0a3dfb7888ff595430aab79134fbff63f100b304 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java @@ -31,6 +31,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { @Override public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); + outputPort.send(element); return true; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index a5d73561aaa78a21e85d6b34f2d732b34d35bead..99626076521e56a46247b72a5279fe7363cf46c0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -151,7 +151,7 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { final long loggingTimestamp = buffer.getLong(); final IMonitoringRecord record; try { // NOCS (Nested try-catch) - // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); + // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); this.send(record); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index 575a35495997ded35a7f2b55dd86fb6c2a1970a3..6328a5d70102374f7a978fca2b2e9ba30296d2fc 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -114,11 +114,11 @@ public class TcpTraceReconstruction extends Analysis { @Override public void onTerminate() { - int maxSize = 0; + int maxNumWaits = 0; for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) { - maxSize = Math.max(maxSize, pipe.getMaxSize()); + maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } - System.out.println("max size of TcpRelayPipes: " + maxSize); + System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); super.onTerminate(); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index 1665575968a91e346467d2858508b1f1367a5854..d73c8032a9f6562ed3b06cd01a6f5385eae6e479 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -148,11 +148,11 @@ public class TcpTraceReduction extends Analysis { @Override public void onTerminate() { - int maxSize = 0; + int maxNumWaits = 0; for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) { - maxSize = Math.max(maxSize, pipe.getMaxSize()); + maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } - System.out.println("max size of TcpRelayPipes: " + maxSize); + System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); super.onTerminate(); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index bf0f636f81ca72aac98f332ab6cdf3d38cb78365..db8071072992cf1d5dc65321b139b2663181368f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -99,11 +99,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { analysis.onTerminate(); } - int maxSize = 0; + int maxNumWaits = 0; for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) { - maxSize = Math.max(maxSize, pipe.getMaxSize()); + maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } - System.out.println("Max size of tcp-relay pipe: " + maxSize); + System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java index 61907ac6ec191a7521e1359b5bc848e8ed0ecc93..d5bd3c9c9e4560814ef987dcf6bf9cdd7ac50fbd 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/ChwWorkTcpTraceReductionAnalysisWithThreadsTest.java @@ -83,7 +83,7 @@ public class ChwWorkTcpTraceReductionAnalysisWithThreadsTest { analysis.onTerminate(); } - System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); + System.out.println("#waits of tcp-relay pipe: " + analysis.getTcpRelayPipe().getNumWaits()); // System.out.println("#traceMetadata read: " + analysis.getNumTraceMetadatas()); // System.out.println("Max #trace created: " + analysis.getMaxElementsCreated()); System.out.println("TraceThroughputs: " + analysis.getTraceThroughputs());