diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index e09d4fe8bc24a58ce361979785c0ac53536fe1c4..e7d500b577a59362ee18f0704c032361d2ffe474 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -1,6 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Signal; public interface IPipe<T> { @@ -25,4 +26,6 @@ public interface IPipe<T> { void setSignal(Signal signal); + void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java index 8e55a09441a205da9179e3713d4c0b83cb55c0a4..b0b1b7a8055aa131a4fde12361458ef7e893cd79 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipeFactory.java @@ -1,7 +1,16 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; + public interface IPipeFactory { <T> IPipe<T> create(int capacity); + ThreadCommunication getThreadCommunication(); + + PipeOrdering getOrdering(); + + boolean isGrowable(); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index d289f4c805209f3e14868750a179f3b3dffdcf0d..270556dc16a2f816963d8d78510a833dad94c4f1 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -18,10 +18,16 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { this.elements = new CircularArray<T>(initialCapacity); } + @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new OrderedGrowableArrayPipe<T>(); - sourcePort.setPipe(pipe); - targetPort.setPipe(pipe); + pipe.connectPorts(sourcePort, targetPort); + } + + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + sourcePort.setPipe(this); + targetPort.setPipe(this); sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java index 80445efedce4738bc37c9a1d05a43b29b4ca1509..b7d10d2e4222a84539348d330a62123c60905521 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipeFactory.java @@ -1,5 +1,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; + public class OrderedGrowableArrayPipeFactory implements IPipeFactory { /** @@ -10,4 +13,19 @@ public class OrderedGrowableArrayPipeFactory implements IPipeFactory { return new OrderedGrowableArrayPipe<T>(); } + @Override + public ThreadCommunication getThreadCommunication() { + return ThreadCommunication.INTRA; + } + + @Override + public PipeOrdering getOrdering() { + return PipeOrdering.QUEUE_BASED; + } + + @Override + public boolean isGrowable() { + return true; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index a69db3732b8367ac9e8f58b0a22ec804521dfdd1..4c22e36cf97f8a4b6d0c9cace4bcebfecd6675b6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -17,10 +17,16 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { this.elements = new LinkedList<T>(); } + @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new OrderedGrowablePipe<T>(); - sourcePort.setPipe(pipe); - targetPort.setPipe(pipe); + pipe.connectPorts(sourcePort, targetPort); + } + + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + sourcePort.setPipe(this); + targetPort.setPipe(this); sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @@ -48,4 +54,5 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { public int size() { return this.elements.size(); } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index 8d8053f603df20c8503cd192d103b89ee3798742..eefd95f7462f9af1e96ed007d4d83f751d29b5e4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -8,16 +8,22 @@ public class Pipe<T> extends IntraThreadPipe<T> { private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4); + @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new Pipe<T>(); - sourcePort.setPipe(pipe); - targetPort.setPipe(pipe); + pipe.connectPorts(sourcePort, targetPort); + } + + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + sourcePort.setPipe(this); + targetPort.setPipe(this); sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#add(T) */ @Override @@ -29,7 +35,7 @@ public class Pipe<T> extends IntraThreadPipe<T> { /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#removeLast() */ @Override @@ -41,7 +47,7 @@ public class Pipe<T> extends IntraThreadPipe<T> { /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#isEmpty() */ @Override @@ -51,7 +57,7 @@ public class Pipe<T> extends IntraThreadPipe<T> { /* * (non-Javadoc) - * + * * @see teetime.examples.throughput.methodcall.IPipe#readLast() */ @Override @@ -67,4 +73,5 @@ public class Pipe<T> extends IntraThreadPipe<T> { public int size() { return this.elements.size(); } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java index 5534a69dc2a30ae110ad28e593f97b7d7e8ec84e..1bc509be8538d78d6791fe11ffef08a3c5525ec2 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java @@ -9,7 +9,7 @@ public class PipeFactory { INTER, INTRA } - public enum Ordering { + public enum PipeOrdering { /** * FIFO */ @@ -25,27 +25,28 @@ public class PipeFactory { /** * Creates a new FIFO-ordered, growable pipe with an initial capacity of 1. <br> - * <i>This method is suitable for most programmers.</i> + * <i>This method is suitable for most situations.</i> * * @param tc * @return */ public <T> IPipe<T> create(final ThreadCommunication tc) { - return this.create(tc, Ordering.QUEUE_BASED, true, 1); + return this.create(tc, PipeOrdering.QUEUE_BASED, true, 1); } - public <T> IPipe<T> create(final ThreadCommunication tc, final Ordering ordering, final boolean growable, final int capacity) { + public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) { String key = this.buildKey(tc, ordering, growable); IPipeFactory pipeClass = this.pipeFactories.get(key); return pipeClass.create(capacity); } - private String buildKey(final ThreadCommunication tc, final Ordering ordering, final boolean growable) { + private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) { return tc.toString() + ordering.toString() + growable; } - public void register(final IPipeFactory pipeFactory, final ThreadCommunication tc, final Ordering ordering, final boolean growable) { - String key = this.buildKey(tc, ordering, growable); + public void register(final IPipeFactory pipeFactory) { + String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable()); this.pipeFactories.put(key, pipeFactory); } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index c5e02be9c013da476c9d4e5cc9749365a896a878..aebe4b589decebb337e83567caf4740f9f30c174 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -7,10 +7,16 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> { private T element; + @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new SingleElementPipe<T>(); - sourcePort.setPipe(pipe); - targetPort.setPipe(pipe); + pipe.connectPorts(sourcePort, targetPort); + } + + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + sourcePort.setPipe(this); + targetPort.setPipe(this); sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java index a6f83738f318afa60565c9d7d5d1aacc8f5b3ebd..f557d3e7bb5a9c92bfe940e71291090586f4aebb 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipeFactory.java @@ -1,5 +1,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; + public class SingleElementPipeFactory implements IPipeFactory { /** @@ -10,4 +13,16 @@ public class SingleElementPipeFactory implements IPipeFactory { return new SingleElementPipe<T>(); } + public ThreadCommunication getThreadCommunication() { + return ThreadCommunication.INTRA; + } + + public PipeOrdering getOrdering() { + return PipeOrdering.ARBITRARY; + } + + public boolean isGrowable() { + return false; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 1c8994ccabc76c6cab9212f73cc5a4004a89303a..ef95689501c1e1608ebd95444d40ea2efd3410de 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -24,16 +24,18 @@ public class SpScPipe<T> extends AbstractPipe<T> { this.queue = QueueFactory.newQueue(concurrentQueueSpec); } + @Deprecated public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) { SpScPipe<T> pipe = new SpScPipe<T>(capacity); - return pipe.connect(sourcePort, targetPort); + pipe.connectPorts(sourcePort, targetPort); + return pipe; } - public SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { targetPort.setPipe(this); sourcePort.setPipe(this); sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - return this; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java index 745823c1576dbbff09e2c37adfa456abeb7a21e2..25d681d248b63f7a377b4bdfa3d31a5aa86d434a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipeFactory.java @@ -1,5 +1,8 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; + public class SpScPipeFactory implements IPipeFactory { @Override @@ -7,4 +10,18 @@ public class SpScPipeFactory implements IPipeFactory { return new SpScPipe<T>(capacity); } + @Override + public ThreadCommunication getThreadCommunication() { + return ThreadCommunication.INTER; + } + + @Override + public PipeOrdering getOrdering() { + return PipeOrdering.QUEUE_BASED; + } + + @Override + public boolean isGrowable() { + return false; + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 2b908268929cd301933b12f0a58d78eb41f483c6..71cc7e87fb9b436ebb46fc7b2276408d9638d2c6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -17,10 +17,16 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { this.elements = (T[]) new Object[this.MIN_CAPACITY]; } + @Deprecated public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { IPipe<T> pipe = new UnorderedGrowablePipe<T>(); - sourcePort.setPipe(pipe); - targetPort.setPipe(pipe); + pipe.connectPorts(sourcePort, targetPort); + } + + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + sourcePort.setPipe(this); + targetPort.setPipe(this); sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..362559e9e556de851e606f9d0f4bb08f97c3deb7 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipeFactory.java @@ -0,0 +1,31 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; + +public class UnorderedGrowablePipeFactory implements IPipeFactory { + + /** + * Hint: The capacity for this pipe implementation is ignored + */ + @Override + public <T> IPipe<T> create(final int capacity) { + return new UnorderedGrowablePipe<T>(); + } + + @Override + public ThreadCommunication getThreadCommunication() { + return ThreadCommunication.INTRA; + } + + @Override + public PipeOrdering getOrdering() { + return PipeOrdering.STACK_BASED; + } + + @Override + public boolean isGrowable() { + return true; + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java index f1adf9271d72ccfe53323e8bc6bdb65adb43c612..149f97cb893bced1b5dee2eff3ed12b02769a2da 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java @@ -22,8 +22,15 @@ import java.util.List; import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipeFactory; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; @@ -38,6 +45,16 @@ import kieker.common.record.IMonitoringRecord; public class RecordReaderConfiguration extends Configuration { private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); + private final PipeFactory pipeFactory; + + public RecordReaderConfiguration() { + // BETTER instantiate one single pipe factory for all analyzes and register all available pipe implementations once + this.pipeFactory = new PipeFactory(); + this.pipeFactory.register(new SingleElementPipeFactory()); + this.pipeFactory.register(new OrderedGrowableArrayPipeFactory()); + this.pipeFactory.register(new UnorderedGrowablePipeFactory()); + this.pipeFactory.register(new SpScPipeFactory()); + } public void buildConfiguration() { StageWithPort producerPipeline = this.buildProducerPipeline(); @@ -54,7 +71,8 @@ public class RecordReaderConfiguration extends Configuration { pipeline.setFirstStage(dir2RecordsFilter); pipeline.setLastStage(collector); - SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + IPipe<IMonitoringRecord> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); + pipe.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs"));