Skip to content
Snippets Groups Projects
Commit 22b05b27 authored by Christian Wulf's avatar Christian Wulf
Browse files

added further pipe factories

parent e72b0b33
No related branches found
No related tags found
No related merge requests found
Showing
with 172 additions and 26 deletions
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public interface IPipe<T> {
......@@ -25,4 +26,6 @@ public interface IPipe<T> {
void setSignal(Signal signal);
void connectPorts(OutputPort<T> sourcePort, InputPort<T> targetPort);
}
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public interface IPipeFactory {
<T> IPipe<T> create(int capacity);
ThreadCommunication getThreadCommunication();
PipeOrdering getOrdering();
boolean isGrowable();
}
......@@ -18,10 +18,16 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> {
this.elements = new CircularArray<T>(initialCapacity);
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new OrderedGrowableArrayPipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
/**
......@@ -10,4 +13,19 @@ public class OrderedGrowableArrayPipeFactory implements IPipeFactory {
return new OrderedGrowableArrayPipe<T>();
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return true;
}
}
......@@ -17,10 +17,16 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
this.elements = new LinkedList<T>();
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new OrderedGrowablePipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
......@@ -48,4 +54,5 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> {
public int size() {
return this.elements.size();
}
}
......@@ -8,16 +8,22 @@ public class Pipe<T> extends IntraThreadPipe<T> {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new Pipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#add(T)
*/
@Override
......@@ -29,7 +35,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#removeLast()
*/
@Override
......@@ -41,7 +47,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#isEmpty()
*/
@Override
......@@ -51,7 +57,7 @@ public class Pipe<T> extends IntraThreadPipe<T> {
/*
* (non-Javadoc)
*
*
* @see teetime.examples.throughput.methodcall.IPipe#readLast()
*/
@Override
......@@ -67,4 +73,5 @@ public class Pipe<T> extends IntraThreadPipe<T> {
public int size() {
return this.elements.size();
}
}
......@@ -9,7 +9,7 @@ public class PipeFactory {
INTER, INTRA
}
public enum Ordering {
public enum PipeOrdering {
/**
* FIFO
*/
......@@ -25,27 +25,28 @@ public class PipeFactory {
/**
* Creates a new FIFO-ordered, growable pipe with an initial capacity of 1. <br>
* <i>This method is suitable for most programmers.</i>
* <i>This method is suitable for most situations.</i>
*
* @param tc
* @return
*/
public <T> IPipe<T> create(final ThreadCommunication tc) {
return this.create(tc, Ordering.QUEUE_BASED, true, 1);
return this.create(tc, PipeOrdering.QUEUE_BASED, true, 1);
}
public <T> IPipe<T> create(final ThreadCommunication tc, final Ordering ordering, final boolean growable, final int capacity) {
public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
String key = this.buildKey(tc, ordering, growable);
IPipeFactory pipeClass = this.pipeFactories.get(key);
return pipeClass.create(capacity);
}
private String buildKey(final ThreadCommunication tc, final Ordering ordering, final boolean growable) {
private String buildKey(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable) {
return tc.toString() + ordering.toString() + growable;
}
public void register(final IPipeFactory pipeFactory, final ThreadCommunication tc, final Ordering ordering, final boolean growable) {
String key = this.buildKey(tc, ordering, growable);
public void register(final IPipeFactory pipeFactory) {
String key = this.buildKey(pipeFactory.getThreadCommunication(), pipeFactory.getOrdering(), pipeFactory.isGrowable());
this.pipeFactories.put(key, pipeFactory);
}
}
......@@ -7,10 +7,16 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> {
private T element;
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new SingleElementPipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public class SingleElementPipeFactory implements IPipeFactory {
/**
......@@ -10,4 +13,16 @@ public class SingleElementPipeFactory implements IPipeFactory {
return new SingleElementPipe<T>();
}
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
public PipeOrdering getOrdering() {
return PipeOrdering.ARBITRARY;
}
public boolean isGrowable() {
return false;
}
}
......@@ -24,16 +24,18 @@ public class SpScPipe<T> extends AbstractPipe<T> {
this.queue = QueueFactory.newQueue(concurrentQueueSpec);
}
@Deprecated
public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe<T> pipe = new SpScPipe<T>(capacity);
return pipe.connect(sourcePort, targetPort);
pipe.connectPorts(sourcePort, targetPort);
return pipe;
}
public SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
targetPort.setPipe(this);
sourcePort.setPipe(this);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
return this;
}
@Override
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public class SpScPipeFactory implements IPipeFactory {
@Override
......@@ -7,4 +10,18 @@ public class SpScPipeFactory implements IPipeFactory {
return new SpScPipe<T>(capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTER;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return false;
}
}
......@@ -17,10 +17,16 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> {
this.elements = (T[]) new Object[this.MIN_CAPACITY];
}
@Deprecated
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new UnorderedGrowablePipe<T>();
sourcePort.setPipe(pipe);
targetPort.setPipe(pipe);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
sourcePort.setPipe(this);
targetPort.setPipe(this);
sourcePort.setCachedTargetStage(targetPort.getOwningStage());
}
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
public class UnorderedGrowablePipeFactory implements IPipeFactory {
/**
* Hint: The capacity for this pipe implementation is ignored
*/
@Override
public <T> IPipe<T> create(final int capacity) {
return new UnorderedGrowablePipe<T>();
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.STACK_BASED;
}
@Override
public boolean isGrowable() {
return true;
}
}
......@@ -22,8 +22,15 @@ import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipeFactory;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
......@@ -38,6 +45,16 @@ import kieker.common.record.IMonitoringRecord;
public class RecordReaderConfiguration extends Configuration {
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private final PipeFactory pipeFactory;
public RecordReaderConfiguration() {
// BETTER instantiate one single pipe factory for all analyzes and register all available pipe implementations once
this.pipeFactory = new PipeFactory();
this.pipeFactory.register(new SingleElementPipeFactory());
this.pipeFactory.register(new OrderedGrowableArrayPipeFactory());
this.pipeFactory.register(new UnorderedGrowablePipeFactory());
this.pipeFactory.register(new SpScPipeFactory());
}
public void buildConfiguration() {
StageWithPort producerPipeline = this.buildProducerPipeline();
......@@ -54,7 +71,8 @@ public class RecordReaderConfiguration extends Configuration {
pipeline.setFirstStage(dir2RecordsFilter);
pipeline.setLastStage(collector);
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
IPipe<IMonitoringRecord> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1);
pipe.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs"));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment