Skip to content
Snippets Groups Projects
Commit c1c277c5 authored by Christian Wulf's avatar Christian Wulf
Browse files

removed type parameter from pipes

parent c82b9505
No related branches found
No related tags found
No related merge requests found
Showing
with 92 additions and 96 deletions
......@@ -5,16 +5,14 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import kieker.common.record.IMonitoringRecord;
public class SysOutFilter<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private final IPipe<IMonitoringRecord> pipe;
private final IPipe pipe;
public SysOutFilter(final IPipe<IMonitoringRecord> pipe) {
public SysOutFilter(final IPipe pipe) {
this.pipe = pipe;
}
......
......@@ -4,7 +4,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public abstract class AbstractPort<T> {
protected IPipe<T> pipe;
protected IPipe pipe;
/**
* The type of this port.
* <p>
......@@ -13,11 +13,11 @@ public abstract class AbstractPort<T> {
*/
protected Class<T> type;
public IPipe<T> getPipe() {
public IPipe getPipe() {
return this.pipe;
}
public void setPipe(final IPipe<T> pipe) {
public void setPipe(final IPipe pipe) {
this.pipe = pipe;
}
......
......@@ -147,7 +147,7 @@ public abstract class AbstractStage implements StageWithPort {
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
for (OutputPort<?> outputPort : this.getOutputPorts()) {
IPipe<?> pipe = outputPort.getPipe();
IPipe pipe = outputPort.getPipe();
if (null != pipe) { // if output port is connected with another one
Class<?> sourcePortType = outputPort.getType();
Class<?> targetPortType = pipe.getTargetPort().getType();
......
......@@ -12,12 +12,14 @@ public class InputPort<T> extends AbstractPort<T> {
}
public T receive() {
T element = this.pipe.removeLast();
@SuppressWarnings("unchecked")
T element = (T) this.pipe.removeLast();
return element;
}
public T read() {
T element = this.pipe.readLast();
@SuppressWarnings("unchecked")
T element = (T) this.pipe.readLast();
return element;
}
......@@ -27,7 +29,7 @@ public class InputPort<T> extends AbstractPort<T> {
* @param pipe
*/
@Override
public void setPipe(final IPipe<T> pipe) {
public void setPipe(final IPipe pipe) {
this.pipe = pipe;
pipe.setTargetPort(this);
}
......
......@@ -4,9 +4,9 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public abstract class AbstractPipe<T> implements IPipe<T> {
public abstract class AbstractPipe implements IPipe {
private InputPort<T> targetPort;
private InputPort<?> targetPort;
/**
* Performance cache: Avoids the following method chain
......@@ -18,18 +18,18 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
protected StageWithPort cachedTargetStage;
@Override
public InputPort<T> getTargetPort() {
public InputPort<?> getTargetPort() {
return this.targetPort;
}
@Override
public void setTargetPort(final InputPort<T> targetPort) {
public void setTargetPort(final InputPort<?> targetPort) {
this.targetPort = targetPort;
this.cachedTargetStage = targetPort.getOwningStage();
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
public <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
}
......
......@@ -4,13 +4,13 @@ import teetime.util.list.CommittableResizableArrayQueue;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public final class CommittablePipe<T> extends IntraThreadPipe<T> {
public final class CommittablePipe extends IntraThreadPipe {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
private final CommittableResizableArrayQueue<Object> elements = new CommittableResizableArrayQueue<Object>(null, 4);
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new CommittablePipe<T>();
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new CommittablePipe();
pipe.connectPorts(sourcePort, targetPort);
}
......@@ -20,7 +20,7 @@ public final class CommittablePipe<T> extends IntraThreadPipe<T> {
* @see teetime.examples.throughput.methodcall.IPipe#add(T)
*/
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
this.elements.addToTailUncommitted(element);
this.elements.commit();
return true;
......@@ -32,8 +32,8 @@ public final class CommittablePipe<T> extends IntraThreadPipe<T> {
* @see teetime.examples.throughput.methodcall.IPipe#removeLast()
*/
@Override
public T removeLast() {
T element = this.elements.removeFromHeadUncommitted();
public Object removeLast() {
Object element = this.elements.removeFromHeadUncommitted();
this.elements.commit();
return element;
}
......@@ -54,11 +54,11 @@ public final class CommittablePipe<T> extends IntraThreadPipe<T> {
* @see teetime.examples.throughput.methodcall.IPipe#readLast()
*/
@Override
public T readLast() {
public Object readLast() {
return this.elements.getTail();
}
public CommittableResizableArrayQueue<T> getElements() {
public CommittableResizableArrayQueue<?> getElements() {
return this.elements;
}
......
......@@ -4,30 +4,25 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public interface IPipe<T> {
public interface IPipe {
boolean add(T element);
T removeLast();
boolean add(Object element);
boolean isEmpty();
int size();
T readLast();
Object removeLast();
// void close();
//
// boolean isClosed();
Object readLast();
InputPort<T> getTargetPort();
InputPort<?> getTargetPort();
void setTargetPort(InputPort<T> targetPort);
void setTargetPort(InputPort<?> targetPort);
void setSignal(Signal signal);
// BETTER change signature to allow {OutputPort<T>, OutputPort<A0 extends T>, OutputPort<A1 extends T>, ...}
void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort);
<T> void connectPorts(OutputPort<? extends T> sourcePort, InputPort<T> targetPort);
void reportNewElement();
......
......@@ -5,7 +5,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.Threa
public interface IPipeFactory {
<T> IPipe<T> create(int capacity);
IPipe create(int capacity);
ThreadCommunication getThreadCommunication();
......
......@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public abstract class InterThreadPipe<T> extends AbstractPipe<T> {
public abstract class InterThreadPipe extends AbstractPipe {
private final AtomicReference<Signal> signal = new AtomicReference<Signal>();
......
......@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.signal.Signal;
public abstract class IntraThreadPipe<T> extends AbstractPipe<T> {
public abstract class IntraThreadPipe extends AbstractPipe {
@Override
public void setSignal(final Signal signal) {
......
......@@ -4,9 +4,9 @@ import teetime.util.concurrent.workstealing.CircularArray;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
public final class OrderedGrowableArrayPipe extends IntraThreadPipe {
private CircularArray<T> elements;
private final CircularArray<Object> elements;
private int head;
private int tail;
......@@ -15,23 +15,23 @@ public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
}
public OrderedGrowableArrayPipe(final int initialCapacity) {
this.elements = new CircularArray<T>(initialCapacity);
this.elements = new CircularArray<Object>(initialCapacity);
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new OrderedGrowableArrayPipe<T>();
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new OrderedGrowableArrayPipe();
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
this.elements.put(this.tail++, element);
return true;
}
@Override
public T removeLast() {
public Object removeLast() {
if (this.head < this.tail) {
return this.elements.get(this.head++);
} else {
......@@ -45,7 +45,7 @@ public final class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
}
@Override
public T readLast() {
public Object readLast() {
return this.elements.get(this.head);
}
......
......@@ -9,8 +9,8 @@ public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
* Hint: The capacity for this pipe implementation is ignored
*/
@Override
public <T> IPipe<T> create(final int capacity) {
return new OrderedGrowableArrayPipe<T>();
public IPipe create(final int capacity) {
return new OrderedGrowableArrayPipe();
}
@Override
......
......@@ -5,31 +5,31 @@ import java.util.LinkedList;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
public class OrderedGrowablePipe extends IntraThreadPipe {
private LinkedList<T> elements;
private final LinkedList<Object> elements;
public OrderedGrowablePipe() {
this(100000);
}
public OrderedGrowablePipe(final int initialCapacity) {
this.elements = new LinkedList<T>();
this.elements = new LinkedList<Object>();
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new OrderedGrowablePipe<T>();
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new OrderedGrowablePipe();
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
return this.elements.offer(element);
}
@Override
public T removeLast() {
public Object removeLast() {
return this.elements.poll();
}
......@@ -39,7 +39,7 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
}
@Override
public T readLast() {
public Object readLast() {
return this.elements.peek();
}
......
......@@ -50,11 +50,11 @@ public class PipeFactory {
* @param tc
* @return
*/
public <T> IPipe<T> create(final ThreadCommunication tc) {
public IPipe create(final ThreadCommunication tc) {
return this.create(tc, PipeOrdering.QUEUE_BASED, true, 1);
}
public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
public IPipe create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
String key = this.buildKey(tc, ordering, growable);
IPipeFactory pipeClass = this.pipeFactories.get(key);
if (null == pipeClass) {
......
......@@ -2,7 +2,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.util.ConstructorClosure;
public final class RelayTestPipe<T> extends InterThreadPipe<T> {
public final class RelayTestPipe<T> extends InterThreadPipe {
private int numInputObjects;
private final ConstructorClosure<T> inputObjectCreator;
......@@ -14,7 +14,7 @@ public final class RelayTestPipe<T> extends InterThreadPipe<T> {
}
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
return false;
}
......
......@@ -3,29 +3,29 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public final class SingleElementPipe<T> extends IntraThreadPipe<T> {
public final class SingleElementPipe extends IntraThreadPipe {
private T element;
private Object element;
SingleElementPipe() {
super();
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new SingleElementPipe<T>();
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new SingleElementPipe();
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
this.element = element;
return true;
}
@Override
public T removeLast() {
T temp = this.element;
public Object removeLast() {
Object temp = this.element;
this.element = null;
return temp;
}
......@@ -36,7 +36,7 @@ public final class SingleElementPipe<T> extends IntraThreadPipe<T> {
}
@Override
public T readLast() {
public Object readLast() {
return this.element;
}
......
......@@ -9,18 +9,21 @@ public class SingleElementPipeFactory implements IPipeFactory {
* Hint: The capacity for this pipe implementation is ignored
*/
@Override
public <T> IPipe<T> create(final int capacity) {
return new SingleElementPipe<T>();
public IPipe create(final int capacity) {
return new SingleElementPipe();
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.ARBITRARY;
}
@Override
public boolean isGrowable() {
return false;
}
......
......@@ -10,9 +10,9 @@ import org.jctools.queues.spec.Preference;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public final class SpScPipe<T> extends InterThreadPipe<T> {
public final class SpScPipe extends InterThreadPipe {
private final Queue<T> queue;
private final Queue<Object> queue;
// statistics
private int numWaits;
......@@ -22,14 +22,14 @@ public final class SpScPipe<T> extends InterThreadPipe<T> {
}
@Deprecated
public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe<T> pipe = new SpScPipe<T>(capacity);
public static <T> SpScPipe connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe pipe = new SpScPipe(capacity);
pipe.connectPorts(sourcePort, targetPort);
return pipe;
}
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
// BETTER introduce a QueueIsFullStrategy
while (!this.queue.offer(element)) {
this.numWaits++;
......@@ -40,7 +40,7 @@ public final class SpScPipe<T> extends InterThreadPipe<T> {
}
@Override
public T removeLast() {
public Object removeLast() {
return this.queue.poll();
}
......@@ -55,7 +55,7 @@ public final class SpScPipe<T> extends InterThreadPipe<T> {
}
@Override
public T readLast() {
public Object readLast() {
return this.queue.peek();
}
......
......@@ -6,8 +6,8 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.Threa
public class SpScPipeFactory implements IPipeFactory {
@Override
public <T> IPipe<T> create(final int capacity) {
return new SpScPipe<T>(capacity);
public IPipe create(final int capacity) {
return new SpScPipe(capacity);
}
@Override
......
......@@ -3,28 +3,27 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
public final class UnorderedGrowablePipe extends IntraThreadPipe {
private final int MIN_CAPACITY;
private T[] elements;
private Object[] elements;
// private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2);
private int lastFreeIndex;
@SuppressWarnings("unchecked")
UnorderedGrowablePipe() {
this.MIN_CAPACITY = 4;
this.elements = (T[]) new Object[this.MIN_CAPACITY];
this.elements = new Object[this.MIN_CAPACITY];
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new UnorderedGrowablePipe<T>();
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
IPipe pipe = new UnorderedGrowablePipe();
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final T element) {
public boolean add(final Object element) {
if (this.lastFreeIndex == this.elements.length) {
// if (this.lastFreeIndex == this.elements.getCapacity()) {
this.elements = this.grow();
......@@ -35,11 +34,11 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
}
@Override
public T removeLast() {
public Object removeLast() {
// if (this.lastFreeIndex == 0) {
// return null;
// }
T element = this.elements[--this.lastFreeIndex];
Object element = this.elements[--this.lastFreeIndex];
this.elements[this.lastFreeIndex] = null;
// T element = this.elements.get(--this.lastFreeIndex);
return element;
......@@ -51,7 +50,7 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
}
@Override
public T readLast() {
public Object readLast() {
return this.elements[this.lastFreeIndex - 1];
// return this.elements.get(this.lastFreeIndex - 1);
}
......@@ -61,7 +60,7 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
return this.lastFreeIndex;
}
private T[] grow() {
private Object[] grow() {
int newSize = this.elements.length * 2;
// System.out.println("growing to " + newSize);
return this.newArray(newSize);
......@@ -73,9 +72,8 @@ public final class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
// return this.newArray(newSize);
// }
private T[] newArray(final int newSize) {
@SuppressWarnings("unchecked")
T[] newElements = (T[]) new Object[newSize];
private Object[] newArray(final int newSize) {
Object[] newElements = new Object[newSize];
System.arraycopy(this.elements, 0, newElements, 0, this.elements.length);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment