From 35174f2d2d5e3dbc06a5560a28ba3ad89e276865 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Fri, 29 Aug 2014 16:34:16 +0200 Subject: [PATCH] Introduced HeadStage --- .../framework/core/AbstractStage.java | 9 -- .../framework/core/Analysis.java | 6 +- .../framework/core/Configuration.java | 12 +- .../framework/core/ConsumerStage.java | 7 - .../framework/core/HeadPipeline.java | 14 ++ .../framework/core/HeadStage.java | 8 + .../framework/core/OutputPort.java | 18 --- .../framework/core/Pipeline.java | 142 +++--------------- .../framework/core/ProducerStage.java | 17 ++- .../framework/core/RunnableStage.java | 6 +- .../framework/core/StageWithPort.java | 8 - .../framework/core/pipe/AbstractPipe.java | 7 + .../core/pipe/OrderedGrowableArrayPipe.java | 7 - .../core/pipe/OrderedGrowablePipe.java | 7 - .../framework/core/pipe/Pipe.java | 7 - .../core/pipe/SingleElementPipe.java | 7 - .../framework/core/pipe/SpScPipe.java | 7 - .../core/pipe/UnorderedGrowablePipe.java | 1 - .../methodcallWithPorts/stage/Clock.java | 2 +- .../stage/ObjectProducer.java | 11 +- .../methodcallWithPorts/stage/Relay.java | 3 +- .../stage/basic/Delay.java | 8 +- .../stage/basic/merger/Merger.java | 9 -- .../explorviz/KiekerRecordTcpReader.java | 2 +- .../stage/io/TCPReader.java | 2 +- .../stage/kieker/Dir2RecordsFilter.java | 19 ++- .../stage/kieker/DirWithBin2RecordFilter.java | 1 - .../stage/kieker/DirWithDat2RecordFilter.java | 1 - .../stage/kieker/TCPReaderSink.java | 2 +- .../fileToRecord/DatFile2RecordFilter.java | 2 +- src/main/java/util/KiekerLoadDriver.java | 10 +- .../MethodCallThroughputAnalysis9.java | 4 +- .../MethodCallThroughputAnalysis10.java | 4 +- .../MethodCallThroughputAnalysis11.java | 4 +- .../MethodCallThroughputAnalysis14.java | 4 +- .../MethodCallThroughputAnalysis15.java | 6 +- .../MethodCallThroughputAnalysis16.java | 16 +- .../MethodCallThroughputAnalysis17.java | 6 +- .../MethodCallThroughputAnalysis18.java | 14 +- .../MethodCallThroughputAnalysis19.java | 10 +- .../kiekerdays/TcpTraceLoggingExplorviz.java | 4 +- .../kiekerdays/TcpTraceReconstruction.java | 10 +- .../kiekerdays/TcpTraceReduction.java | 16 +- .../RecordReaderConfiguration.java | 4 +- .../TcpTraceLoggingExtAnalysis.java | 10 +- .../TcpTraceReconstructionAnalysis.java | 23 +-- .../TraceReconstructionAnalysis.java | 4 +- ...ctionAnalysisWithThreadsConfiguration.java | 32 ++-- .../TcpTraceReductionAnalysisWithThreads.java | 33 ++-- submodules/JCTools | 2 +- 50 files changed, 195 insertions(+), 373 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index ab8a4be9..3eff9cfc 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -87,15 +87,6 @@ public abstract class AbstractStage implements StageWithPort { this.parentStage = parentStage; } - @Override - public boolean isReschedulable() { - return this.reschedulable; - } - - public void setReschedulable(final boolean reschedulable) { - this.reschedulable = reschedulable; - } - @Override public String getId() { return this.id; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java index 234a1c59..ee22541c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Analysis.java @@ -21,17 +21,17 @@ public class Analysis { } public void init() { - for (StageWithPort stage : this.configuration.getConsumerStages()) { + for (HeadStage stage : this.configuration.getConsumerStages()) { Thread thread = new Thread(new RunnableStage(stage)); this.consumerThreads.add(thread); } - for (StageWithPort stage : this.configuration.getFiniteProducerStages()) { + for (HeadStage stage : this.configuration.getFiniteProducerStages()) { Thread thread = new Thread(new RunnableStage(stage)); this.finiteProducerThreads.add(thread); } - for (StageWithPort stage : this.configuration.getInfiniteProducerStages()) { + for (HeadStage stage : this.configuration.getInfiniteProducerStages()) { Thread thread = new Thread(new RunnableStage(stage)); this.infiniteProducerThreads.add(thread); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java index fcac06f4..97667505 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Configuration.java @@ -5,19 +5,19 @@ import java.util.List; public class Configuration { - private final List<StageWithPort> consumerStages = new LinkedList<StageWithPort>(); - private final List<StageWithPort> finiteProducerStages = new LinkedList<StageWithPort>(); - private final List<StageWithPort> infiniteProducerStages = new LinkedList<StageWithPort>(); + private final List<HeadStage> consumerStages = new LinkedList<HeadStage>(); + private final List<HeadStage> finiteProducerStages = new LinkedList<HeadStage>(); + private final List<HeadStage> infiniteProducerStages = new LinkedList<HeadStage>(); - public List<StageWithPort> getConsumerStages() { + public List<HeadStage> getConsumerStages() { return this.consumerStages; } - public List<StageWithPort> getFiniteProducerStages() { + public List<HeadStage> getFiniteProducerStages() { return this.finiteProducerStages; } - public List<StageWithPort> getInfiniteProducerStages() { + public List<HeadStage> getInfiniteProducerStages() { return this.infiniteProducerStages; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java index f64c50f3..36c4893b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java @@ -12,9 +12,6 @@ public abstract class ConsumerStage<I> extends AbstractStage { public void executeWithPorts() { I element = this.getInputPort().receive(); - boolean isReschedulable = this.determineReschedulability(); - this.setReschedulable(isReschedulable); - this.execute(element); } @@ -23,10 +20,6 @@ public abstract class ConsumerStage<I> extends AbstractStage { // do nothing } - protected boolean determineReschedulability() { - return this.getInputPort().getPipe().size() > 0; - } - protected abstract void execute(I element); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java new file mode 100644 index 00000000..f4e3e867 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadPipeline.java @@ -0,0 +1,14 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +public class HeadPipeline<FirstStage extends HeadStage, LastStage extends StageWithPort> extends Pipeline<FirstStage, LastStage> implements HeadStage { + + @Override + public boolean shouldBeTerminated() { + return this.firstStage.shouldBeTerminated(); + } + + @Override + public void terminate() { + this.firstStage.terminate(); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java new file mode 100644 index 00000000..2c80f08c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/HeadStage.java @@ -0,0 +1,8 @@ +package teetime.variant.methodcallWithPorts.framework.core; + +public interface HeadStage extends StageWithPort { + + boolean shouldBeTerminated(); + + void terminate(); +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 3ce89029..5d06dd47 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -4,15 +4,6 @@ import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; public final class OutputPort<T> extends AbstractPort<T> { - /** - * Performance cache: Avoids the following method chain - * - * <pre> - * this.getPipe().getTargetPort().getOwningStage() - * </pre> - */ - // private StageWithPort cachedTargetStage; - OutputPort() { super(); } @@ -26,15 +17,6 @@ public final class OutputPort<T> extends AbstractPort<T> { return this.pipe.add(element); } - // public StageWithPort getCachedTargetStage() { - // return this.cachedTargetStage; - // } - - @Deprecated - public void setCachedTargetStage(final StageWithPort cachedTargetStage) { - // this.cachedTargetStage = cachedTargetStage; - } - public void sendSignal(final Signal signal) { this.pipe.setSignal(signal); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 018f39e0..6d063ea8 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -1,150 +1,54 @@ package teetime.variant.methodcallWithPorts.framework.core; -import java.util.Arrays; -import java.util.LinkedList; import java.util.List; -import java.util.UUID; import teetime.variant.methodcallWithPorts.framework.core.signal.Signal; -import teetime.variant.methodcallWithPorts.framework.core.signal.StartingSignal; import teetime.variant.methodcallWithPorts.framework.core.validation.InvalidPortConnection; -import kieker.common.logging.Log; -import kieker.common.logging.LogFactory; - -/** - * - * @author Christian Wulf - * - * @param <FirstStage> - * @param <LastStage> - */ -// BETTER remove the pipeline since it does not add any new functionality public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort { - private final String id; - /** - * A unique logger instance per stage instance - */ - protected Log logger; + protected FirstStage firstStage; + protected LastStage lastStage; - private FirstStage firstStage; - private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>(); - private LastStage lastStage; - - private StageWithPort parentStage; - - // private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>(); - - public Pipeline() { - this(UUID.randomUUID().toString()); - } - - public Pipeline(final String id) { - this.id = id; // the id should only be represented by a UUID, not additionally by the class name - this.logger = LogFactory.getLog(this.id); - } - - @Override - public String getId() { - return this.id; - } - - public void setFirstStage(final FirstStage stage) { - this.firstStage = stage; + public FirstStage getFirstStage() { + return this.firstStage; } - public void addIntermediateStages(final StageWithPort... stages) { - this.intermediateStages.addAll(Arrays.asList(stages)); + public void setFirstStage(final FirstStage firstStage) { + this.firstStage = firstStage; } - public void addIntermediateStage(final StageWithPort stage) { - this.intermediateStages.add(stage); + public LastStage getLastStage() { + return this.lastStage; } - public void setLastStage(final LastStage stage) { - this.lastStage = stage; + public void setLastStage(final LastStage lastStage) { + this.lastStage = lastStage; } @Override - public void executeWithPorts() { - StageWithPort headStage = this.firstStage; - - // do { - headStage.executeWithPorts(); - // } while (headStage.isReschedulable()); - - // headStage.sendFinishedSignalToAllSuccessorStages(); - - // this.updateRescheduable(headStage); - - // this.setReschedulable(headStage.isReschedulable()); + public String getId() { + return this.firstStage.getId(); } - // private final void updateRescheduable(final StageWithPort<?, ?> stage) { - // StageWithPort<?, ?> currentStage = stage; - // do { - // this.firstStageIndex++; - // // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? - // // if (currentStage == null) { // loop reaches the last stage - // if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage - // this.setReschedulable(false); - // this.cleanUp(); - // return; - // } - // currentStage = this.stages[this.firstStageIndex]; - // currentStage.onIsPipelineHead(); - // } while (!currentStage.isReschedulable()); - // - // this.setReschedulable(true); - // } - @Override - public void onIsPipelineHead() { - // do nothing - } - - @Deprecated - public void onStarting() { - int size = 1 + this.intermediateStages.size() + 1; - StageWithPort[] stages = new StageWithPort[size]; - stages[0] = this.firstStage; - for (int i = 0; i < this.intermediateStages.size(); i++) { - StageWithPort stage = this.intermediateStages.get(i); - stages[1 + i] = stage; - } - stages[stages.length - 1] = this.lastStage; - - // for (int i = 0; i < this.stages.length; i++) { - // StageWithPort<?, ?> stage = this.stages[i]; - // stage.setParentStage(this, i); - // stage.setListener(this); - // } - - // for (int i = 0; i < this.stages.length - 1; i++) { - // StageWithPort stage = this.stages[i]; - // stage.setSuccessor(this.stages[i + 1]); - // } - // this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); - - for (StageWithPort stage : stages) { - stage.onSignal(new StartingSignal(), null); - } + public void executeWithPorts() { + this.firstStage.executeWithPorts(); } @Override public StageWithPort getParentStage() { - return this.parentStage; + return this.firstStage.getParentStage(); } @Override public void setParentStage(final StageWithPort parentStage, final int index) { - this.parentStage = parentStage; + this.firstStage.setParentStage(parentStage, index); } @Override - public boolean isReschedulable() { - return this.firstStage.isReschedulable(); + public void onIsPipelineHead() { + this.firstStage.onIsPipelineHead(); } @Override @@ -152,17 +56,9 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW this.firstStage.onSignal(signal, inputPort); } - public FirstStage getFirstStage() { - return this.firstStage; - } - - public LastStage getLastStage() { - return this.lastStage; - } - @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - // do nothing + this.lastStage.validateOutputPorts(invalidPortConnections); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java index 5a949215..37f95fac 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java @@ -13,18 +13,15 @@ package teetime.variant.methodcallWithPorts.framework.core; * the type of the default output port * */ -public abstract class ProducerStage<O> extends AbstractStage { +public abstract class ProducerStage<O> extends AbstractStage implements HeadStage { protected final OutputPort<O> outputPort = this.createOutputPort(); + private boolean shouldTerminate; public final OutputPort<O> getOutputPort() { return this.outputPort; } - public ProducerStage() { - this.setReschedulable(true); - } - @Override public void executeWithPorts() { this.execute(); @@ -35,6 +32,16 @@ public abstract class ProducerStage<O> extends AbstractStage { // do nothing } + @Override + public void terminate() { + this.shouldTerminate = true; + } + + @Override + public boolean shouldBeTerminated() { + return this.shouldTerminate; + } + protected abstract void execute(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index 3c61059f..8298ebae 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -10,11 +10,11 @@ import teetime.variant.methodcallWithPorts.framework.core.validation.AnalysisNot public class RunnableStage implements Runnable { - private final StageWithPort stage; + private final HeadStage stage; private final Logger logger; private boolean validationEnabled; - public RunnableStage(final StageWithPort stage) { + public RunnableStage(final HeadStage stage) { this.stage = stage; this.logger = LoggerFactory.getLogger(stage.getClass()); } @@ -37,7 +37,7 @@ public class RunnableStage implements Runnable { do { this.stage.executeWithPorts(); - } while (this.stage.isReschedulable()); + } while (!this.stage.shouldBeTerminated()); TerminatingSignal terminatingSignal = new TerminatingSignal(); this.stage.onSignal(terminatingSignal, null); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java index 9753bcdf..ffd5d060 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -15,14 +15,6 @@ public interface StageWithPort { void setParentStage(StageWithPort parentStage, int index); - // void setListener(OnDisableListener listener); - - /** - * @return <code>true</code> iff this stage makes progress when it is re-executed by the scheduler, otherwise <code>false</code>.<br> - * For example, many stages are re-schedulable if at least one of their input ports are not empty. - */ - boolean isReschedulable(); - // BETTER remove this method since it will be replaced by onTerminating() void onIsPipelineHead(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index e914afef..5d7f3f51 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -1,6 +1,7 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; public abstract class AbstractPipe<T> implements IPipe<T> { @@ -27,4 +28,10 @@ public abstract class AbstractPipe<T> implements IPipe<T> { this.cachedTargetStage = targetPort.getOwningStage(); } + @Override + public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { + sourcePort.setPipe(this); + targetPort.setPipe(this); + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java index 270556dc..7a6b4aa0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowableArrayPipe.java @@ -24,13 +24,6 @@ public class OrderedGrowableArrayPipe<T> extends IntraThreadPipe<T> { pipe.connectPorts(sourcePort, targetPort); } - @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - sourcePort.setPipe(this); - targetPort.setPipe(this); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - } - @Override public boolean add(final T element) { this.elements.put(this.tail++, element); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java index 4c22e36c..8bf61cec 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/OrderedGrowablePipe.java @@ -23,13 +23,6 @@ public class OrderedGrowablePipe<T> extends IntraThreadPipe<T> { pipe.connectPorts(sourcePort, targetPort); } - @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - sourcePort.setPipe(this); - targetPort.setPipe(this); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - } - @Override public boolean add(final T element) { return this.elements.offer(element); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java index eefd95f7..2dd45c93 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/Pipe.java @@ -14,13 +14,6 @@ public class Pipe<T> extends IntraThreadPipe<T> { pipe.connectPorts(sourcePort, targetPort); } - @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - sourcePort.setPipe(this); - targetPort.setPipe(this); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - } - /* * (non-Javadoc) * diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index aebe4b58..96eeba86 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -13,13 +13,6 @@ public class SingleElementPipe<T> extends IntraThreadPipe<T> { pipe.connectPorts(sourcePort, targetPort); } - @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - sourcePort.setPipe(this); - targetPort.setPipe(this); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - } - @Override public boolean add(final T element) { this.element = element; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 22b5c6a4..3e6f4694 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -31,13 +31,6 @@ public class SpScPipe<T> extends AbstractPipe<T> { return pipe; } - @Override - public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { - targetPort.setPipe(this); - sourcePort.setPipe(this); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - } - @Override public boolean add(final T element) { // BETTER introduce a QueueIsFullStrategy diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java index 71cc7e87..3befc87d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/UnorderedGrowablePipe.java @@ -27,7 +27,6 @@ public class UnorderedGrowablePipe<T> extends IntraThreadPipe<T> { public void connectPorts(final OutputPort<T> sourcePort, final InputPort<T> targetPort) { sourcePort.setPipe(this); targetPort.setPipe(this); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index 1520b609..e140fed5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -26,7 +26,7 @@ public class Clock extends ProducerStage<Long> { try { Thread.sleep(delayInMs); } catch (InterruptedException e) { - this.setReschedulable(false); + this.terminate(); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java index 028d8709..599d9fb4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java @@ -20,7 +20,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ObjectProducer<T> extends ProducerStage<T> { @@ -60,13 +60,12 @@ public class ObjectProducer<T> extends ProducerStage<T> { newObject = this.inputObjectCreator.create(); this.numInputObjects--; - if (this.numInputObjects == 0) { - this.setReschedulable(false); - // this.getOutputPort().pipe.close(); - } - // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); this.send(this.outputPort, newObject); + + if (this.numInputObjects == 0) { + this.terminate(); + } } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index d0e281fd..d31041fd 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -16,8 +16,7 @@ public class Relay<T> extends ProducerStage<T> { T element = this.inputPort.receive(); if (null == element) { if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { - this.setReschedulable(false); - assert 0 == this.inputPort.getPipe().size(); + this.terminate(); } Thread.yield(); return; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java index 5b234be8..55ee676c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java @@ -24,15 +24,13 @@ public class Delay<T> extends AbstractStage { T element = this.inputPort.receive(); this.send(this.outputPort, element); } - - // this.setReschedulable(this.getInputPort().pipe.size() > 0); - this.setReschedulable(false); - // System.out.println("delay: " + this.getInputPort().pipe.size()); } @Override public void onIsPipelineHead() { - this.setReschedulable(true); + while (!this.inputPort.getPipe().isEmpty()) { + this.executeWithPorts(); + } } public InputPort<T> getInputPort() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java index 7f469d4f..bf90f0a5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java @@ -48,15 +48,6 @@ public class Merger<T> extends AbstractStage { } this.send(this.outputPort, token); - - boolean isReschedulable = false; - for (InputPort<?> inputPort : this.getInputPorts()) { - if (!inputPort.getPipe().isEmpty()) { - isReschedulable = true; - break; - } - } - this.setReschedulable(isReschedulable); } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java index a49946d3..ecbef3d5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/explorviz/KiekerRecordTcpReader.java @@ -87,7 +87,7 @@ public class KiekerRecordTcpReader extends ProducerStage<IMonitoringRecord> { } } - this.setReschedulable(false); + this.terminate(); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index decfc7f3..3e09b862 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -165,7 +165,7 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } } - this.setReschedulable(false); + this.terminate(); this.tcpStringReader.terminate(); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java index d0735a73..d6759a26 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java @@ -20,6 +20,10 @@ import java.io.File; import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.stage.FileExtensionSwitch; import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger; @@ -36,11 +40,12 @@ import kieker.common.util.filesystem.FSUtil; /** * @author Christian Wulf - * + * * @since 1.10 */ public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { + private final PipeFactory pipeFactory = new PipeFactory(); private ClassNameRegistryRepository classNameRegistryRepository; /** @@ -68,8 +73,11 @@ public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, final OutputPort<File> zipFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.ZIP_FILE_EXTENSION); // connect ports by pipes - SingleElementPipe.connect(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort()); - SingleElementPipe.connect(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort()); + IPipe<File> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); + pipe.connectPorts(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort()); + + pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); + pipe.connectPorts(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort()); SingleElementPipe.connect(normalFileOutputPort, datFile2RecordFilter.getInputPort()); SingleElementPipe.connect(binFileOutputPort, binaryFile2RecordFilter.getInputPort()); @@ -81,11 +89,6 @@ public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, // prepare pipeline this.setFirstStage(classNameRegistryCreationFilter); - this.addIntermediateStage(directory2FilesFilter); - this.addIntermediateStage(fileExtensionSwitch); - this.addIntermediateStage(datFile2RecordFilter); - this.addIntermediateStage(binaryFile2RecordFilter); - this.addIntermediateStage(zipFile2RecordFilter); this.setLastStage(recordMerger); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java index c3058434..86e015aa 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java @@ -24,7 +24,6 @@ public class DirWithBin2RecordFilter extends Pipeline<ClassNameRegistryCreationF final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository); this.setFirstStage(classNameRegistryCreationFilter); - this.addIntermediateStage(directory2FilesFilter); this.setLastStage(binaryFile2RecordFilter); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java index cfa54ecf..d636a07d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java @@ -24,7 +24,6 @@ public class DirWithDat2RecordFilter extends Pipeline<ClassNameRegistryCreationF final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository); this.setFirstStage(classNameRegistryCreationFilter); - this.addIntermediateStage(directory2FilesFilter); this.setLastStage(datFile2RecordFilter); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java index af0c71fa..4bd7ef0a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java @@ -144,7 +144,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } } - this.setReschedulable(false); + this.terminate(); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java index 749548bb..704ceffa 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java @@ -29,7 +29,7 @@ import kieker.common.record.IMonitoringRecord; /** * @author Christian Wulf - * + * * @since 1.10 */ public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> { diff --git a/src/main/java/util/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java index e30aa067..659a9cf2 100644 --- a/src/main/java/util/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -14,9 +14,9 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -36,17 +36,17 @@ public class KiekerLoadDriver { private long[] timings; public KiekerLoadDriver(final File directory) { - StageWithPort producerPipeline = this.buildProducerPipeline(directory); + HeadStage producerPipeline = this.buildProducerPipeline(directory); this.runnableStage = new RunnableStage(producerPipeline); } - private StageWithPort buildProducerPipeline(final File directory) { + private HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); + final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); pipeline.setFirstStage(dir2RecordsFilter); pipeline.setLastStage(collector); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java index 6b2a9378..bd2ae16e 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -20,7 +20,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe; @@ -66,7 +66,7 @@ public class MethodCallThroughputAnalysis9 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java index 19dc2d6e..bd08ab4a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java @@ -20,7 +20,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis10 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java index 2d57a02c..ce10958f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -20,7 +20,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; @@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis11 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); // pipeline.addIntermediateStage(relayFake); pipeline.addIntermediateStage(startTimestampFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java index 8f50cbe1..1cd3f963 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -20,7 +20,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; @@ -68,7 +68,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java index 4fb5db96..702c3cc4 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -20,7 +20,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; @@ -71,7 +71,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis { this.clock.setInitialDelayInMs(100); this.clock.setIntervalDelayInMs(100); - final Pipeline<Clock, Sink<Long>> pipeline = new Pipeline<Clock, Sink<Long>>(); + final HeadPipeline<Clock, Sink<Long>> pipeline = new HeadPipeline<Clock, Sink<Long>>(); pipeline.setFirstStage(this.clock); pipeline.setLastStage(new Sink<Long>()); @@ -95,7 +95,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); - final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java index f204497b..ba7cd435 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java @@ -22,7 +22,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -59,7 +59,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis { @Override public void init() { super.init(); - Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); this.producerThread = new Thread(new RunnableStage(producerPipeline)); @@ -70,17 +70,17 @@ public class MethodCallThroughputAnalysis16 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); + HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); this.workerThreads[i] = new Thread(new RunnableStage(workerPipeline)); } } - private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, + private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.setLastStage(distributor); @@ -93,8 +93,8 @@ public class MethodCallThroughputAnalysis16 extends Analysis { * @param numNoopFilters * @since 1.10 */ - private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline( - final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage, + private HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline( + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") @@ -107,7 +107,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index 9a4c84ae..a5ac5b39 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -22,7 +22,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; @@ -113,7 +113,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { Sink<TimestampObject> sink = new Sink<TimestampObject>(); Sink<Void> endStage = new Sink<Void>(); - final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); // pipeline.setFirstStage(sink); // pipeline.setFirstStage(endStage); @@ -147,7 +147,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java index cb95597b..5307b5bf 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java @@ -22,7 +22,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -60,7 +60,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis { @Override public void init() { super.init(); - Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); this.producerThread = new Thread(new RunnableStage(producerPipeline)); @@ -76,12 +76,12 @@ public class MethodCallThroughputAnalysis18 extends Analysis { } } - private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, + private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.setLastStage(distributor); @@ -94,8 +94,8 @@ public class MethodCallThroughputAnalysis18 extends Analysis { * @param numNoopFilters * @since 1.10 */ - private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline( - final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage, + private HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline( + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") @@ -108,7 +108,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java index f31ad97e..9d16a88b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java @@ -22,7 +22,7 @@ import java.util.List; import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -59,7 +59,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis { @Override public void init() { super.init(); - Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); this.producerThread = new Thread(new RunnableStage(producerPipeline)); @@ -76,12 +76,12 @@ public class MethodCallThroughputAnalysis19 extends Analysis { } - private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, + private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); + final HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); pipeline.setFirstStage(objectProducer); pipeline.setLastStage(distributor); @@ -102,7 +102,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis { final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); + final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java index 4e141ed0..91a6a5ae 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLoggingExplorviz.java @@ -1,7 +1,7 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; @@ -41,7 +41,7 @@ public class TcpTraceLoggingExplorviz extends Analysis { SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); // create and configure pipeline - Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); + HeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<KiekerRecordTcpReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(endStage); return tcpReader; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java index b6c76cd8..538c20c1 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReconstruction.java @@ -7,7 +7,7 @@ import java.util.List; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; @@ -41,7 +41,7 @@ public class TcpTraceReconstruction extends Analysis { @Override public void init() { super.init(); - Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -53,14 +53,14 @@ public class TcpTraceReconstruction extends Analysis { } } - private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>(); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; @@ -83,7 +83,7 @@ public class TcpTraceReconstruction extends Analysis { SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), endStage.getInputPort()); // create and configure pipeline - Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); + HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(traceReconstructionFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java index e01c1b52..a38bf7d6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceReduction.java @@ -9,7 +9,7 @@ import java.util.TreeMap; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; @@ -49,10 +49,10 @@ public class TcpTraceReduction extends Analysis { @Override public void init() { super.init(); - Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); + HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); this.clockThread = new Thread(new RunnableStage(clockStage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -64,20 +64,20 @@ public class TcpTraceReduction extends Analysis { } } - private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>(); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -86,7 +86,7 @@ public class TcpTraceReduction extends Analysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); + HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; @@ -113,7 +113,7 @@ public class TcpTraceReduction extends Analysis { SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); + HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); pipeline.addIntermediateStage(instanceOfFilter); pipeline.addIntermediateStage(traceReconstructionFilter); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java index a3ccee70..6b530cc0 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java @@ -20,7 +20,7 @@ import java.util.LinkedList; import java.util.List; import teetime.variant.methodcallWithPorts.framework.core.Configuration; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; @@ -58,7 +58,7 @@ public class RecordReaderConfiguration extends Configuration { Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); + final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); pipeline.setFirstStage(dir2RecordsFilter); pipeline.setLastStage(collector); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java index dd178e77..2d459fd6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -3,7 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.traceReading; import java.util.List; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; @@ -25,7 +25,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { private Counter<IMonitoringRecord> recordCounter; private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage; - private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clockStage = new Clock(); clockStage.setInitialDelayInMs(intervalDelayInMs); clockStage.setIntervalDelayInMs(intervalDelayInMs); @@ -34,7 +34,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); + HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clockStage); pipeline.setLastStage(distributor); return pipeline; @@ -54,7 +54,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>(); + HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.addIntermediateStage(this.recordCounter); // pipeline.addIntermediateStage(this.recordThroughputStage); @@ -66,7 +66,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { public void init() { super.init(); - Pipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); + HeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockPipeline)); StageWithPort tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 58c3fc74..6ba339ac 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -6,9 +6,8 @@ import java.util.List; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; @@ -45,17 +44,17 @@ public class TcpTraceReconstructionAnalysis extends Analysis { @Override public void init() { super.init(); - Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); - StageWithPort pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); + HeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage()); this.workerThread = new Thread(new RunnableStage(pipeline)); } - private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setIntervalDelayInMs(intervalDelayInMs); Distributor<Long> distributor = new Distributor<Long>(); @@ -63,13 +62,13 @@ public class TcpTraceReconstructionAnalysis extends Analysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); + HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; } - private StageWithPort buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { + private HeadPipeline<TCPReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages TCPReader tcpReader = new TCPReader(); this.recordCounter = new Counter<IMonitoringRecord>(); @@ -96,14 +95,8 @@ public class TcpTraceReconstructionAnalysis extends Analysis { SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new Pipeline<TCPReader, Sink<TraceEventRecords>>(); + HeadPipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TCPReader, Sink<TraceEventRecords>>(); pipeline.setFirstStage(tcpReader); - pipeline.addIntermediateStage(this.recordCounter); - pipeline.addIntermediateStage(instanceOfFilter); - // pipeline.addIntermediateStage(this.recordThroughputFilter); - pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(this.traceThroughputFilter); - pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 5b9bce4c..9ff72902 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -7,7 +7,7 @@ import java.util.List; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; @@ -104,7 +104,7 @@ public class TraceReconstructionAnalysis extends Analysis { dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir); // create and configure pipeline - Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>(); + HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>(); pipeline.setFirstStage(dir2RecordsFilter); pipeline.addIntermediateStage(this.recordCounter); pipeline.addIntermediateStage(cache); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 9d17509d..650b54c5 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -9,7 +9,7 @@ import java.util.List; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.methodcallWithPorts.framework.core.Configuration; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -72,36 +72,37 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf } public void buildConfiguration() { - final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.getFiniteProducerStages().add(tcpPipeline); - final Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + final HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.getInfiniteProducerStages().add(clockStage); - final Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); + final HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000); this.getInfiniteProducerStages().add(clock2Stage); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); for (int i = 0; i < this.numWorkerThreads; i++) { - StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage()); + HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), + clock2Stage.getLastStage()); this.getConsumerStages().add(pipeline); } } - private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -110,7 +111,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); + HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; @@ -146,7 +147,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf } } - private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, + private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); @@ -182,16 +183,9 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Conf SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort()); // create and configure pipeline - Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>("Worker pipeline"); + HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>( + "Worker pipeline"); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(recordCounter); - pipeline.addIntermediateStage(recordThroughputFilter); - pipeline.addIntermediateStage(traceMetadataCounter); - pipeline.addIntermediateStage(instanceOfFilter); - // pipeline.addIntermediateStage(this.recordThroughputFilter); - pipeline.addIntermediateStage(traceReconstructionFilter); - // pipeline.addIntermediateStage(traceThroughputFilter); - pipeline.addIntermediateStage(traceCounter); // pipeline.addIntermediateStage(sysout); pipeline.setLastStage(endStage); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index 3f1d0ae1..3f6564ae 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -11,7 +11,7 @@ import java.util.TreeMap; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; -import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; @@ -55,38 +55,38 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { @Override public void init() { super.init(); - Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); + HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000); + HeadPipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); this.workerThreads = new Thread[this.numWorkerThreads]; for (int i = 0; i < this.workerThreads.length; i++) { - StageWithPort pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } - private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { TCPReader tcpReader = new TCPReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>(); + HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; } - private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { + private HeadPipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); clock.setInitialDelayInMs(intervalDelayInMs); clock.setIntervalDelayInMs(intervalDelayInMs); @@ -95,7 +95,7 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>(); + HeadPipeline<Clock, Distributor<Long>> pipeline = new HeadPipeline<Clock, Distributor<Long>>(); pipeline.setFirstStage(clock); pipeline.setLastStage(distributor); return pipeline; @@ -155,9 +155,9 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { } } - private Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, - final Pipeline<Clock, Distributor<Long>> clockStage, - final Pipeline<Clock, Distributor<Long>> clock2Stage) { + private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, + final HeadPipeline<Clock, Distributor<Long>> clockStage, + final HeadPipeline<Clock, Distributor<Long>> clock2Stage) { // create stages Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>(); Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create(); @@ -190,15 +190,8 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis { SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); + HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(recordCounter); - pipeline.addIntermediateStage(traceMetadataCounter); - pipeline.addIntermediateStage(instanceOfFilter); - pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(traceReductionFilter); - pipeline.addIntermediateStage(traceCounter); - pipeline.addIntermediateStage(traceThroughputFilter); pipeline.setLastStage(endStage); return pipeline; } diff --git a/submodules/JCTools b/submodules/JCTools index 75998aa2..88e1e25f 160000 --- a/submodules/JCTools +++ b/submodules/JCTools @@ -1 +1 @@ -Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e +Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10 -- GitLab