diff --git a/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java index 9a98b94f05c522441cfd0527fd5d789ff393d05e..7b97e682bf260cd0ffd8d585c8602aa7f947ba37 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java @@ -5,16 +5,14 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; -import kieker.common.record.IMonitoringRecord; - public class SysOutFilter<T> extends ConsumerStage<T> { private final InputPort<Long> triggerInputPort = this.createInputPort(); private final OutputPort<T> outputPort = this.createOutputPort(); - private final IPipe<IMonitoringRecord> pipe; + private final IPipe pipe; - public SysOutFilter(final IPipe<IMonitoringRecord> pipe) { + public SysOutFilter(final IPipe pipe) { this.pipe = pipe; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java index ce3563c62e9ca1124a1f698262e99ffafa5221ca..53aed08c7ab95c848e4721fe116a0ced45cb61af 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractPort.java @@ -4,7 +4,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public abstract class AbstractPort<T> { - protected IPipe<T> pipe; + protected IPipe pipe; /** * The type of this port. * <p> @@ -13,11 +13,11 @@ public abstract class AbstractPort<T> { */ protected Class<T> type; - public IPipe<T> getPipe() { + public IPipe getPipe() { return this.pipe; } - public void setPipe(final IPipe<T> pipe) { + public void setPipe(final IPipe pipe) { this.pipe = pipe; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 84828cf0256a228e0372318e98028fe081417f9b..79c143abfeb328b6924aa63ab692e859953b8368 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -147,7 +147,7 @@ public abstract class AbstractStage implements StageWithPort { @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { for (OutputPort<?> outputPort : this.getOutputPorts()) { - IPipe<?> pipe = outputPort.getPipe(); + IPipe pipe = outputPort.getPipe(); if (null != pipe) { // if output port is connected with another one Class<?> sourcePortType = outputPort.getType(); Class<?> targetPortType = pipe.getTargetPort().getType(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java index 42b188dcf6ec3c17433a4bab28d91551400e8e5b..02d74d49eee3273d64fb02b541d3afbc5049eacd 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java @@ -12,22 +12,24 @@ public class InputPort<T> extends AbstractPort<T> { } public T receive() { - T element = this.pipe.removeLast(); + @SuppressWarnings("unchecked") + T element = (T) this.pipe.removeLast(); return element; } public T read() { - T element = this.pipe.readLast(); + @SuppressWarnings("unchecked") + T element = (T) this.pipe.readLast(); return element; } /** * Connects this input port with the given <code>pipe</code> bi-directionally - * + * * @param pipe */ @Override - public void setPipe(final IPipe<T> pipe) { + public void setPipe(final IPipe pipe) { this.pipe = pipe; pipe.setTargetPort(this); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index 5d7f3f511e6e3c69e5132028771fc5362d650564..e222c2570ba6b57814e6a4be83e014fcbc4f0e5d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -4,9 +4,9 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; -public abstract class AbstractPipe<T> implements IPipe<T> { +public abstract class AbstractPipe implements IPipe { - private InputPort<T> targetPort; + private InputPort<?> targetPort; /** * Performance cache: Avoids the following method chain @@ -18,18 +18,18 @@ public abstract class AbstractPipe<T> implements IPipe<T> { protected StageWithPort cachedTargetStage; @Override - public InputPort<T> getTargetPort() { + public InputPort<?> getTargetPort() { return this.targetPort; } @Override - public void setTargetPort(final InputPort<T> targetPort) { + public void setTargetPort(final InputPort<?> targetPort) { this.targetPort = targetPort; this.cachedTargetStage = targetPort.getOwningStage(); } @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { sourcePort.setPipe(this); targetPort.setPipe(this); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java index 5490feb101c239ba6ae526b9bfe338ceb862bf4b..9cf15055b71549c7f98a18c8cc9123128519b33f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CommittablePipe.java @@ -4,13 +4,13 @@ import teetime.util.list.CommittableResizableArrayQueue; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public final class CommittablePipe<T> extends IntraThreadPipe<T> { +public final class CommittablePipe extends IntraThreadPipe { - private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); + private final CommittableResizableArrayQueue<Object> elements = new CommittableResizableArrayQueue<Object>(null, 4); @Deprecated - public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new CommittablePipe<T>(); + public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + IPipe pipe = new CommittablePipe(); pipe.connectPorts(sourcePort, targetPort); } @@ -20,7 +20,7 @@ public final class CommittablePipe<T> extends IntraThreadPipe<T> { * @see teetime.examples.throughput.methodcall.IPipe#add(T) */ @Override - public boolean add(final T element) { + public boolean add(final Object element) { this.elements.addToTailUncommitted(element); this.elements.commit(); return true; @@ -32,8 +32,8 @@ public final class CommittablePipe<T> extends IntraThreadPipe<T> { * @see teetime.examples.throughput.methodcall.IPipe#removeLast() */ @Override - public T removeLast() { - T element = this.elements.removeFromHeadUncommitted(); + public Object removeLast() { + Object element = this.elements.removeFromHeadUncommitted(); this.elements.commit(); return element; } @@ -54,11 +54,11 @@ public final class CommittablePipe<T> extends IntraThreadPipe<T> { * @see teetime.examples.throughput.methodcall.IPipe#readLast() */ @Override - public T readLast() { + public Object readLast() { return this.elements.getTail(); } - public CommittableResizableArrayQueue<T> getElements() { + public CommittableResizableArrayQueue<?> getElements() { return this.elements; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 7362f6fdaf7638a3317e0e0859d774931e94571d..28c43332d65e180ffd41c6a4703366918c7e9442 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -4,30 +4,25 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; -public interface IPipe<T> { +public interface IPipe { - boolean add(T element); - - T removeLast(); + boolean add(Object element); boolean isEmpty(); int size(); - T readLast(); + Object removeLast(); - // void close(); - // - // boolean isClosed(); + Object readLast(); - InputPort<T> getTargetPort(); + InputPort<?> getTargetPort(); - void setTargetPort(InputPort<T> targetPort); + void setTargetPort(InputPort<?> targetPort); void setSignal(Signal signal); - // BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...} - void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort); + <T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort); void reportNewElement(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java index b0b1b7a8055aa131a4fde12361458ef7e893cd79..382fe5f2e038f948af3a56d58836f74215d3dd14 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java @@ -5,7 +5,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.Threa public interface IPipeFactory { - <T> IPipe<T> create(int capacity); + IPipe create(int capacity); ThreadCommunication getThreadCommunication(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java index abc3ff3b4d40b13c77ef927812afa59f716ab5ab..94c562ebbea24afecfc863ebf60cdf6abbbc1b8d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/InterThreadPipe.java @@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; -public abstract class InterThreadPipe<T> extends AbstractPipe<T> { +public abstract class InterThreadPipe extends AbstractPipe { private final AtomicReference<Signal> signal = new AtomicReference<Signal>(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java index 1e78c8b418e623adbbb0450d33341b3d0b89d903..874663cc575d115f42b065c58146e775f9054d80 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IntraThreadPipe.java @@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; -public abstract class IntraThreadPipe<T> extends AbstractPipe<T> { +public abstract class IntraThreadPipe extends AbstractPipe { @Override public void setSignal(final Signal signal) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index 86f2b99102a205c3fecdd43bd9ffe2f9766388f4..f1944529e0089eb9c54f7413c475864af7ca3885 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -4,9 +4,9 @@ import teetime.util.concurrent.workstealing.CircularArray; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { +public final class OrderedGrowableArrayPipe extends IntraThreadPipe { - private CircularArray<T> elements; + private final CircularArray<Object> elements; private int head; private int tail; @@ -15,23 +15,23 @@ public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { } public OrderedGrowableArrayPipe(final int initialCapacity) { - this.elements = new CircularArray<T>(initialCapacity); + this.elements = new CircularArray<Object>(initialCapacity); } @Deprecated - public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new OrderedGrowableArrayPipe<T>(); + public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + IPipe pipe = new OrderedGrowableArrayPipe(); pipe.connectPorts(sourcePort, targetPort); } @Override - public boolean add(final T element) { + public boolean add(final Object element) { this.elements.put(this.tail++, element); return true; } @Override - public T removeLast() { + public Object removeLast() { if (this.head < this.tail) { return this.elements.get(this.head++); } else { @@ -45,7 +45,7 @@ public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { } @Override - public T readLast() { + public Object readLast() { return this.elements.get(this.head); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java index b7d10d2e4222a84539348d330a62123c60905521..86a00ca10b19943c70750f150c2eaf802681b739 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java @@ -9,8 +9,8 @@ public class OrderedGrowableArrayPipeFactory implements IPipeFactory { * Hint: The capacity for this pipe implementation is ignored */ @Override - public <T> IPipe<T> create(final int capacity) { - return new OrderedGrowableArrayPipe<T>(); + public IPipe create(final int capacity) { + return new OrderedGrowableArrayPipe(); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index 8bf61cecfd561019d6a67939c4f7f6f134f96486..d63a5f86ff0864b2a76557e1900ff3d2b4f7d8b8 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -5,31 +5,31 @@ import java.util.LinkedList; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { +public class OrderedGrowablePipe extends IntraThreadPipe { - private LinkedList<T> elements; + private final LinkedList<Object> elements; public OrderedGrowablePipe() { this(100000); } public OrderedGrowablePipe(final int initialCapacity) { - this.elements = new LinkedList<T>(); + this.elements = new LinkedList<Object>(); } @Deprecated - public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new OrderedGrowablePipe<T>(); + public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + IPipe pipe = new OrderedGrowablePipe(); pipe.connectPorts(sourcePort, targetPort); } @Override - public boolean add(final T element) { + public boolean add(final Object element) { return this.elements.offer(element); } @Override - public T removeLast() { + public Object removeLast() { return this.elements.poll(); } @@ -39,7 +39,7 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { } @Override - public T readLast() { + public Object readLast() { return this.elements.peek(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java index 5a8177a745b283d5bcbca8b0073fb3c63fe7f506..d876a385f21e5457531d030546054935959f3d19 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java @@ -50,11 +50,11 @@ public class PipeFactory { * @param tc * @return */ - public <T> IPipe<T> create(final ThreadCommunication tc) { + public IPipe create(final ThreadCommunication tc) { return this.create(tc, PipeOrdering.QUEUE_BASED, true, 1); } - public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) { + public IPipe create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) { String key = this.buildKey(tc, ordering, growable); IPipeFactory pipeClass = this.pipeFactories.get(key); if (null == pipeClass) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java index d7ab5e83c715369c5e36cc7854cb7360ae589131..bbd11753ade86bfe33c0841f9814359a3b67768b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/RelayTestPipe.java @@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.util.ConstructorClosure; -public final class RelayTestPipe<T> extends InterThreadPipe<T> { +public final class RelayTestPipe<T> extends InterThreadPipe { private int numInputObjects; private final ConstructorClosure<T> inputObjectCreator; @@ -14,7 +14,7 @@ public final class RelayTestPipe<T> extends InterThreadPipe<T> { } @Override - public boolean add(final T element) { + public boolean add(final Object element) { return false; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index 1d48809f6574a3ea7ea696065159cb7e7804c993..73cba7744747b47df7f65e9c79195eb34cd4b052 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -3,29 +3,29 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public final class SingleElementPipe<T> extends IntraThreadPipe<T> { +public final class SingleElementPipe extends IntraThreadPipe { - private T element; + private Object element; SingleElementPipe() { super(); } @Deprecated - public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new SingleElementPipe<T>(); + public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + IPipe pipe = new SingleElementPipe(); pipe.connectPorts(sourcePort, targetPort); } @Override - public boolean add(final T element) { + public boolean add(final Object element) { this.element = element; return true; } @Override - public T removeLast() { - T temp = this.element; + public Object removeLast() { + Object temp = this.element; this.element = null; return temp; } @@ -36,7 +36,7 @@ public final class SingleElementPipe<T> extends IntraThreadPipe<T> { } @Override - public T readLast() { + public Object readLast() { return this.element; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java index f557d3e7bb5a9c92bfe940e71291090586f4aebb..fcbde3f4e79ef253ac99fe2fb8d43d75693d08a5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java @@ -9,18 +9,21 @@ public class SingleElementPipeFactory implements IPipeFactory { * Hint: The capacity for this pipe implementation is ignored */ @Override - public <T> IPipe<T> create(final int capacity) { - return new SingleElementPipe<T>(); + public IPipe create(final int capacity) { + return new SingleElementPipe(); } + @Override public ThreadCommunication getThreadCommunication() { return ThreadCommunication.INTRA; } + @Override public PipeOrdering getOrdering() { return PipeOrdering.ARBITRARY; } + @Override public boolean isGrowable() { return false; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 4999edef4501b2265ab347217ee4d83d74583364..4ac039e4d5e542ca383322486074c264204ed3c7 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -10,9 +10,9 @@ import org.jctools.queues.spec.Preference; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public final class SpScPipe<T> extends InterThreadPipe<T> { +public final class SpScPipe extends InterThreadPipe { - private final Queue<T> queue; + private final Queue<Object> queue; // statistics private int numWaits; @@ -22,14 +22,14 @@ public final class SpScPipe<T> extends InterThreadPipe<T> { } @Deprecated - public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) { - SpScPipe<T> pipe = new SpScPipe<T>(capacity); + public static <T> SpScPipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + SpScPipe pipe = new SpScPipe(capacity); pipe.connectPorts(sourcePort, targetPort); return pipe; } @Override - public boolean add(final T element) { + public boolean add(final Object element) { // BETTER introduce a QueueIsFullStrategy while (!this.queue.offer(element)) { this.numWaits++; @@ -40,7 +40,7 @@ public final class SpScPipe<T> extends InterThreadPipe<T> { } @Override - public T removeLast() { + public Object removeLast() { return this.queue.poll(); } @@ -55,7 +55,7 @@ public final class SpScPipe<T> extends InterThreadPipe<T> { } @Override - public T readLast() { + public Object readLast() { return this.queue.peek(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java index 25d681d248b63f7a377b4bdfa3d31a5aa86d434a..d81ade9dce0de2bbeffdecb307088b6d5befef8b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java @@ -6,8 +6,8 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.Threa public class SpScPipeFactory implements IPipeFactory { @Override - public <T> IPipe<T> create(final int capacity) { - return new SpScPipe<T>(capacity); + public IPipe create(final int capacity) { + return new SpScPipe(capacity); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 357762b45c5f3f3bafbd47e8bd1ac43218f2395e..fbbd079ca4039c073df17b8cd04d37522da10a6d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -3,28 +3,27 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { +public final class UnorderedGrowablePipe extends IntraThreadPipe { private final int MIN_CAPACITY; - private T[] elements; + private Object[] elements; // private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2); private int lastFreeIndex; - @SuppressWarnings("unchecked") UnorderedGrowablePipe() { this.MIN_CAPACITY = 4; - this.elements = (T[]) new Object[this.MIN_CAPACITY]; + this.elements = new Object[this.MIN_CAPACITY]; } @Deprecated - public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - IPipe<T> pipe = new UnorderedGrowablePipe<T>(); + public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + IPipe pipe = new UnorderedGrowablePipe(); pipe.connectPorts(sourcePort, targetPort); } @Override - public boolean add(final T element) { + public boolean add(final Object element) { if (this.lastFreeIndex == this.elements.length) { // if (this.lastFreeIndex == this.elements.getCapacity()) { this.elements = this.grow(); @@ -35,11 +34,11 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { } @Override - public T removeLast() { + public Object removeLast() { // if (this.lastFreeIndex == 0) { // return null; // } - T element = this.elements[--this.lastFreeIndex]; + Object element = this.elements[--this.lastFreeIndex]; this.elements[this.lastFreeIndex] = null; // T element = this.elements.get(--this.lastFreeIndex); return element; @@ -51,7 +50,7 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { } @Override - public T readLast() { + public Object readLast() { return this.elements[this.lastFreeIndex - 1]; // return this.elements.get(this.lastFreeIndex - 1); } @@ -61,7 +60,7 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { return this.lastFreeIndex; } - private T[] grow() { + private Object[] grow() { int newSize = this.elements.length * 2; // System.out.println("growing to " + newSize); return this.newArray(newSize); @@ -73,9 +72,8 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { // return this.newArray(newSize); // } - private T[] newArray(final int newSize) { - @SuppressWarnings("unchecked") - T[] newElements = (T[]) new Object[newSize]; + private Object[] newArray(final int newSize) { + Object[] newElements = new Object[newSize]; System.arraycopy(this.elements, 0, newElements, 0, this.elements.length); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java index 362559e9e556de851e606f9d0f4bb08f97c3deb7..c6d1b87f10dae68bdf9162628cb4e8458b7f3259 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java @@ -9,8 +9,8 @@ public class UnorderedGrowablePipeFactory implements IPipeFactory { * Hint: The capacity for this pipe implementation is ignored */ @Override - public <T> IPipe<T> create(final int capacity) { - return new UnorderedGrowablePipe<T>(); + public IPipe create(final int capacity) { + return new UnorderedGrowablePipe(); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 2c525299c1e31c5d31be3c96416dc6ec7da3da83..b40d462835d133237362f5366bbb756cb685a70a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -9,7 +9,7 @@ public class Relay<T> extends ProducerStage<T> { private final InputPort<T> inputPort = this.createInputPort(); - private InterThreadPipe<T> cachedCastedInputPipe; + private InterThreadPipe cachedCastedInputPipe; @Override public void execute() { @@ -26,7 +26,7 @@ public class Relay<T> extends ProducerStage<T> { @Override public void onStarting() { - this.cachedCastedInputPipe = (InterThreadPipe<T>) this.inputPort.getPipe(); + this.cachedCastedInputPipe = (InterThreadPipe) this.inputPort.getPipe(); super.onStarting(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java index df94500cc16587271fc7b99ad030f68b3de67173..689d326559e6fa6180ebd34d5d05b5657f9f04c0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java @@ -20,7 +20,6 @@ import java.io.File; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; -import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; @@ -73,11 +72,11 @@ public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, final OutputPort<File> zipFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.ZIP_FILE_EXTENSION); // connect ports by pipes - IPipe<File> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); - pipe.connectPorts(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort()); + this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1) + .connectPorts(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort()); - pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); - pipe.connectPorts(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort()); + this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1) + .connectPorts(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort()); SingleElementPipe.connect(normalFileOutputPort, datFile2RecordFilter.getInputPort()); SingleElementPipe.connect(binFileOutputPort, binaryFile2RecordFilter.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java index 76b3f3c6309de61f9f73310a60721609d43e829e..04a3c5b4e32fdf6cd32186436d26a210aacf1b1e 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -74,7 +74,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis { pipeline.setFirstStage(objectProducer); pipeline.setLastStage(collectorSink); - IPipe<TimestampObject> pipe = this.pipeFactory.create(ThreadCommunication.INTRA); + IPipe pipe = this.pipeFactory.create(ThreadCommunication.INTRA); pipe.connectPorts(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); pipe = this.pipeFactory.create(ThreadCommunication.INTRA); pipe.connectPorts(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index 751a9f23319915e48df1bdc9d13343d2728fd88e..df4fd02977f6f0162178934d997ae9da5a2ad6b5 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -151,7 +151,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - IPipe<TimestampObject> startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); + IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); startPipe.setSignal(new TerminatingSignal()); relay.getInputPort().setPipe(startPipe); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index f6967ff1c036d2000f55f3a0c3df1f21612bbd29..016514cf0205ae130df137f6bb87b7b47fbd4374 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -31,7 +31,7 @@ public class TcpTraceReconstruction extends Analysis { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); - private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>(); + private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>(); private Thread tcpThread; private Thread[] workerThreads; @@ -75,7 +75,7 @@ public class TcpTraceReconstruction extends Analysis { Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); @@ -113,7 +113,7 @@ public class TcpTraceReconstruction extends Analysis { @Override public void onTerminate() { int maxNumWaits = 0; - for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) { + for (SpScPipe pipe : this.tcpRelayPipes) { maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index f04cdfc8ca22448f12b90587b45f47eebefdc316..82f80ee6f3f53fad5e81017a8094870f36f62d43 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -38,7 +38,7 @@ public class TcpTraceReduction extends Analysis { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator()); - private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new ArrayList<SpScPipe<IMonitoringRecord>>(); + private final List<SpScPipe> tcpRelayPipes = new ArrayList<SpScPipe>(); private Thread tcpThread; private Thread clockThread; @@ -102,7 +102,7 @@ public class TcpTraceReduction extends Analysis { Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>(); // connect stages - SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); @@ -145,7 +145,7 @@ public class TcpTraceReduction extends Analysis { @Override public void onTerminate() { int maxNumWaits = 0; - for (SpScPipe<IMonitoringRecord> pipe : this.tcpRelayPipes) { + for (SpScPipe pipe : this.tcpRelayPipes) { maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java index f7afedad2c2f32b97deae6c37398a21ac52f4fbb..e2dcad56386ae295e26b7258fafd396ee2f9a277 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java @@ -21,7 +21,6 @@ import java.util.List; import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; -import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; @@ -59,11 +58,11 @@ public class RecordReaderConfiguration extends Configuration { pipeline.setFirstStage(initialElementProducer); pipeline.setLastStage(collector); - IPipe<File> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); - pipe.connectPorts(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); + this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1) + .connectPorts(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); - IPipe<IMonitoringRecord> pipe1 = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); - pipe1.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1) + .connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java index de3ac414017f35525cd7f6f28c4248ba2474a036..ef0474b28b3a3196794104eb74b3267e93523819 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -38,8 +38,6 @@ import teetime.variant.methodcallWithPorts.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import util.StatisticsUtil; -import kieker.common.record.IMonitoringRecord; - /** * @author Christian Wulf * @@ -96,7 +94,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest { } int maxNumWaits = 0; - for (SpScPipe<IMonitoringRecord> pipe : configuration.getTcpRelayPipes()) { + for (SpScPipe pipe : configuration.getTcpRelayPipes()) { maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 800b29b228ab32db4140af38be8d7aca9a43ddfe..84a3cc49d3d1b72d36562484f7fd077cea0261ba 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -33,8 +33,6 @@ import teetime.variant.methodcallWithPorts.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import util.StatisticsUtil; -import kieker.common.record.IMonitoringRecord; - /** * @author Christian Wulf * @@ -104,7 +102,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { } int maxNumWaits = 0; - for (SpScPipe<IMonitoringRecord> pipe : configuration.getTcpRelayPipes()) { + for (SpScPipe pipe : configuration.getTcpRelayPipes()) { maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits()); } System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 650b54c557b0a6790c266882524b3c4049d78b05..a04d5e919fbf535cbd6e223107568e1d79ba1520 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -50,7 +50,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf private final StageFactory<Counter<TraceEventRecords>> traceCounterFactory; private final StageFactory<ElementThroughputMeasuringStage<TraceEventRecords>> traceThroughputFilterFactory; - private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>(); + private final List<SpScPipe> tcpRelayPipes = new LinkedList<SpScPipe>(); @SuppressWarnings({ "rawtypes", "unchecked" }) public TcpTraceReconstructionAnalysisWithThreadsConfiguration() { @@ -165,7 +165,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages - SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); + SpScPipe tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); this.tcpRelayPipes.add(tcpRelayPipe); // SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe); @@ -244,7 +244,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf return numTraceMetadatas; } - public List<SpScPipe<IMonitoringRecord>> getTcpRelayPipes() { + public List<SpScPipe> getTcpRelayPipes() { return this.tcpRelayPipes; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index 3f6564aeb15341945a9a8d41b5e1877dfe164a7b..2a0f4b07840ff9f28bae5a51fbb17dd4e93f9929 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -49,7 +49,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { private Thread clock2Thread; private Thread[] workerThreads; - private SpScPipe<IMonitoringRecord> tcpRelayPipe; + private SpScPipe tcpRelayPipe; private int numWorkerThreads; @Override @@ -155,7 +155,8 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { } } - private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, + private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline( + final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, final HeadPipeline<Clock, Distributor<Long>> clockStage, final HeadPipeline<Clock, Distributor<Long>> clock2Stage) { // create stages @@ -257,7 +258,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { return throughputs; } - public SpScPipe<IMonitoringRecord> getTcpRelayPipe() { + public SpScPipe getTcpRelayPipe() { return this.tcpRelayPipe; }