diff --git a/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java index 0d173381ca30c1acbde7a1bd0b7289deb98fc2aa..9a98b94f05c522441cfd0527fd5d789ff393d05e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java @@ -2,13 +2,15 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import kieker.common.record.IMonitoringRecord; -public class SysOutFilter<T> extends ConsumerStage<T, T> { +public class SysOutFilter<T> extends ConsumerStage<T> { - private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + private final InputPort<Long> triggerInputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); private final IPipe<IMonitoringRecord> pipe; @@ -17,13 +19,13 @@ public class SysOutFilter<T> extends ConsumerStage<T, T> { } @Override - protected void execute5(final T element) { + protected void execute(final T element) { Long timestamp = this.triggerInputPort.receive(); if (timestamp != null) { // this.logger.info("pipe.size: " + this.pipe.size()); System.out.println("pipe.size: " + this.pipe.size()); } - this.send(element); + this.send(this.outputPort, element); } public InputPort<Long> getTriggerInputPort() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index e3834bcd206507c8b2917b81d2ca16c94604598d..65e5f5b7656d6a931588eff63dd16c9a3f722fc0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -1,13 +1,13 @@ package teetime.variant.methodcallWithPorts.framework.core; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; -import teetime.util.list.CommittableQueue; - import kieker.common.logging.Log; import kieker.common.logging.LogFactory; -public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { +public abstract class AbstractStage implements StageWithPort { private final String id; /** @@ -15,51 +15,35 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { */ protected final Log logger; // BETTER use SLF4J as interface and logback as impl - private final InputPort<I> inputPort = new InputPort<I>(this); - private final OutputPort<O> outputPort = new OutputPort<O>(); - - private StageWithPort<?, ?> parentStage; + private StageWithPort parentStage; private boolean reschedulable; + private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); + private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); + + /** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */ + protected InputPort<?>[] cachedInputPorts; + /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ + protected OutputPort<?>[] cachedOutputPorts; + public AbstractStage() { this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")"); } - @Override - public InputPort<I> getInputPort() { - return this.inputPort; - } - - @Override - public OutputPort<O> getOutputPort() { - return this.outputPort; - } - - protected void execute4(final CommittableQueue<I> elements) { - throw new IllegalStateException(); // default implementation - } - - protected abstract void execute5(I element); - /** * Sends the given <code>element</code> using the default output port * * @param element * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy) */ - protected final boolean send(final O element) { - return this.send(this.getOutputPort(), element); - } - - protected final boolean send(final OutputPort<O> outputPort, final O element) { + protected final <O> boolean send(final OutputPort<O> outputPort, final O element) { if (!outputPort.send(element)) { return false; } - // StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage(); - StageWithPort<?, ?> next = outputPort.getCachedTargetStage(); + StageWithPort next = outputPort.getCachedTargetStage(); do { next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead @@ -68,29 +52,32 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { return true; } - // public void disable() { - // this.schedulingInformation.setActive(false); - // this.fireOnDisable(); - // } - - // private void fireOnDisable() { - // if (this.listener != null) { - // this.listener.onDisable(this, this.index); - // } - // } - @Override public void onStart() { + this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); + this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); + } + + protected void onFinished() { // empty default implementation + this.onIsPipelineHead(); + } + + protected InputPort<?>[] getInputPorts() { + return this.cachedInputPorts; + } + + protected OutputPort<?>[] getOutputPorts() { + return this.cachedOutputPorts; } @Override - public StageWithPort<?, ?> getParentStage() { + public StageWithPort getParentStage() { return this.parentStage; } @Override - public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { + public void setParentStage(final StageWithPort parentStage, final int index) { this.parentStage = parentStage; } @@ -124,12 +111,21 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { break; } - this.outputPort.sendSignal(signal); + for (OutputPort<?> outputPort : this.outputPortList) { + outputPort.sendSignal(signal); + } } - protected void onFinished() { - // empty default implementation - this.onIsPipelineHead(); + protected <T> InputPort<T> createInputPort() { + InputPort<T> inputPort = new InputPort<T>(this); + this.inputPortList.add(inputPort); + return inputPort; + } + + protected <T> OutputPort<T> createOutputPort() { + OutputPort<T> outputPort = new OutputPort<T>(); + this.outputPortList.add(outputPort); + return outputPort; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java index 9f17e535ed1d436660afdbf2bfb10e09f6af55ae..1540b1fb68436d0251ae42a871e870f4077049e1 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java @@ -1,18 +1,21 @@ package teetime.variant.methodcallWithPorts.framework.core; -public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { +public abstract class ConsumerStage<I> extends AbstractStage { + + protected final InputPort<I> inputPort = this.createInputPort(); + + public final InputPort<I> getInputPort() { + return this.inputPort; + } @Override public void executeWithPorts() { - // if (this.logger.isDebugEnabled()) { - // this.logger.debug("Executing stage..."); - // } - - I element = this.getInputPort().receive(); + I element = this.inputPort.receive(); - this.setReschedulable(this.getInputPort().getPipe().size() > 0); + boolean isReschedulable = this.determineReschedulability(); + this.setReschedulable(isReschedulable); - this.execute5(element); + this.execute(element); } @Override @@ -20,4 +23,15 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { // do nothing } + /** + * + * @return <code>true</code> iff this stage makes progress when it is re-executed by the scheduler, otherwise <code>false</code>.<br> + * For example, many stages are re-schedulable if at least one of their input ports are not empty. + */ + protected boolean determineReschedulability() { + return this.inputPort.getPipe().size() > 0; + } + + protected abstract void execute(I element); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java index 6a614b766246772ac595be4e1796afc8ddd9e160..85f6ba79bee44fea528c78543e58f676fdbe171b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java @@ -4,10 +4,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public class InputPort<T> { - private final StageWithPort<?, ?> owningStage; + private final StageWithPort owningStage; private IPipe<T> pipe; - public InputPort(final StageWithPort<?, ?> owningStage) { + InputPort(final StageWithPort owningStage) { super(); this.owningStage = owningStage; } @@ -36,7 +36,7 @@ public class InputPort<T> { pipe.setTargetPort(this); } - public StageWithPort<?, ?> getOwningStage() { + public StageWithPort getOwningStage() { return this.owningStage; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 6d5e197f2d91a669b6213ff7d6432b77cb615a27..9fff9078abfe43830445899ceaf502218ea47e4c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -12,7 +12,11 @@ public class OutputPort<T> { * this.getPipe().getTargetPort().getOwningStage() * </pre> */ - private StageWithPort<?, ?> cachedTargetStage; + private StageWithPort cachedTargetStage; + + OutputPort() { + super(); + } /** * @@ -31,11 +35,11 @@ public class OutputPort<T> { this.pipe = pipe; } - public StageWithPort<?, ?> getCachedTargetStage() { + public StageWithPort getCachedTargetStage() { return this.cachedTargetStage; } - public void setCachedTargetStage(final StageWithPort<?, ?> cachedTargetStage) { + public void setCachedTargetStage(final StageWithPort cachedTargetStage) { this.cachedTargetStage = cachedTargetStage; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 4e85387bd9b3ae724f94768ee5a6eef5d6563f6d..5b0c55060613704c740dc67a1b8c4a01756214f6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -9,7 +9,7 @@ import kieker.common.logging.Log; import kieker.common.logging.LogFactory; // BETTER remove the pipeline since it does not add any new functionality -public class Pipeline<I, O> implements StageWithPort<I, O> { +public class Pipeline<I, O> implements StageWithPort { private final String id; /** @@ -17,17 +17,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { */ protected Log logger; - private StageWithPort<I, ?> firstStage; - private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); - private StageWithPort<?, O> lastStage; + private StageWithPort firstStage; + private InputPort<I> firstStageInputPort; + private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>(); + private StageWithPort lastStage; + private OutputPort<O> lastStageOutputPort; // BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to // multiple input ports? - private StageWithPort<?, ?>[] stages; - private StageWithPort<?, ?> parentStage; + private StageWithPort[] stages; + private StageWithPort parentStage; // private int startIndex; - private boolean reschedulable; private int firstStageIndex; // private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>(); @@ -46,25 +47,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { return this.id; } - public void setFirstStage(final StageWithPort<I, ?> stage) { + public void setFirstStage(final StageWithPort stage, final InputPort<I> firstStageInputPort) { this.firstStage = stage; + this.firstStageInputPort = firstStageInputPort; } - public void addIntermediateStages(final StageWithPort<?, ?>... stages) { + public void addIntermediateStages(final StageWithPort... stages) { this.intermediateStages.addAll(Arrays.asList(stages)); } - public void addIntermediateStage(final StageWithPort<?, ?> stage) { + public void addIntermediateStage(final StageWithPort stage) { this.intermediateStages.add(stage); } - public void setLastStage(final StageWithPort<?, O> stage) { + public void setLastStage(final StageWithPort stage, final OutputPort<O> lastStageOutputPort) { this.lastStage = stage; + this.lastStageOutputPort = lastStageOutputPort; } @Override public void executeWithPorts() { - StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; + StageWithPort headStage = this.stages[this.firstStageIndex]; // do { headStage.executeWithPorts(); @@ -106,7 +109,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.stages = new StageWithPort[size]; this.stages[0] = this.firstStage; for (int i = 0; i < this.intermediateStages.size(); i++) { - StageWithPort<?, ?> stage = this.intermediateStages.get(i); + StageWithPort stage = this.intermediateStages.get(i); this.stages[1 + i] = stage; } this.stages[this.stages.length - 1] = this.lastStage; @@ -123,18 +126,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // } // this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); - for (StageWithPort<?, ?> stage : this.stages) { + for (StageWithPort stage : this.stages) { stage.onStart(); } } @Override - public StageWithPort<?, ?> getParentStage() { + public StageWithPort getParentStage() { return this.parentStage; } @Override - public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { + public void setParentStage(final StageWithPort parentStage, final int index) { this.parentStage = parentStage; } @@ -148,28 +151,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // this.reschedulable = reschedulable; // } - @Override public InputPort<I> getInputPort() { - return this.firstStage.getInputPort(); // CACHE pipeline's input port + return this.firstStageInputPort; } - @Override public OutputPort<O> getOutputPort() { - return this.lastStage.getOutputPort(); // CACHE pipeline's output port - } - - // TODO remove since it does not increase performances - private void cleanUp() { - // for (int i = 0; i < this.stages.length; i++) { - // StageWithPort<?, ?> stage = this.stages[i]; - // // stage.setParentStage(null, i); - // // stage.setListener(null); - // // stage.setSuccessor(null); - // } - - this.firstStage = null; - this.intermediateStages.clear(); - this.lastStage = null; + return this.lastStageOutputPort; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java index 44646b809ffcc9ea66b8b25755300e299aab2018..c1e7c693cb122b2d8aa5370ef965185410c1a326 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ProducerStage.java @@ -1,6 +1,12 @@ package teetime.variant.methodcallWithPorts.framework.core; -public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { +public abstract class ProducerStage<O> extends AbstractStage { + + protected final OutputPort<O> outputPort = this.createOutputPort(); + + public final OutputPort<O> getOutputPort() { + return this.outputPort; + } public ProducerStage() { this.setReschedulable(true); @@ -8,15 +14,7 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { @Override public void executeWithPorts() { - // if (this.logger.isDebugEnabled()) { - // this.logger.debug("Executing stage..."); - // } - - this.execute5(null); - - // if (!this.getOutputPort().pipe.isEmpty()) { - // super.executeWithPorts(); - // } + this.execute(); } @Override @@ -24,4 +22,6 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { // do nothing } + protected abstract void execute(); + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java index 2b0c752e9f6c0004b6a36eeb7967052515b26b34..47e14bdb8b56ade04d7bc288d31cd4b9201fbd62 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/RunnableStage.java @@ -5,10 +5,10 @@ import kieker.common.logging.LogFactory; public class RunnableStage<I> implements Runnable { - private final StageWithPort<I, ?> stage; + private final ConsumerStage<I> stage; private final Log logger; - public RunnableStage(final StageWithPort<I, ?> stage) { + public RunnableStage(final ConsumerStage<I> stage) { this.stage = stage; this.logger = LogFactory.getLog(stage.getClass()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java index 8c993238f87c32f1ae6622366ca1f9cd7bded17d..bbeb16316085dec098b2140d851c764b844bebe0 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/StageWithPort.java @@ -1,18 +1,14 @@ package teetime.variant.methodcallWithPorts.framework.core; -public interface StageWithPort<I, O> { +public interface StageWithPort { String getId(); - InputPort<I> getInputPort(); - - OutputPort<O> getOutputPort(); - void executeWithPorts(); - StageWithPort<?, ?> getParentStage(); + StageWithPort getParentStage(); - void setParentStage(StageWithPort<?, ?> parentStage, int index); + void setParentStage(StageWithPort parentStage, int index); // void setListener(OnDisableListener listener); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java index b7904b19a2ec18a569c2ebff48453f7080e9a958..ea9bb2e9660b53e7b6f17721e321c204d159f304 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java @@ -6,13 +6,16 @@ import java.util.concurrent.TimeUnit; import teetime.util.StopWatch; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class Cache<T> extends ConsumerStage<T, T> { +public class Cache<T> extends ConsumerStage<T> { + + private final OutputPort<T> outputPort = this.createOutputPort(); private final List<T> cachedObjects = new LinkedList<T>(); @Override - protected void execute5(final T element) { + protected void execute(final T element) { this.cachedObjects.add(element); } @@ -22,11 +25,15 @@ public class Cache<T> extends ConsumerStage<T, T> { StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (T cachedElement : this.cachedObjects) { - this.send(cachedElement); + this.send(this.outputPort, cachedElement); } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); super.onIsPipelineHead(); } + public OutputPort<T> getOutputPort() { + return this.outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index 51658b1efad629c9839069490f482c20fe47aa22..1520b6097a364afe9086be53e49cce37d84e12a8 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -1,9 +1,8 @@ package teetime.variant.methodcallWithPorts.stage; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; -public class Clock extends ProducerStage<Void, Long> { +public class Clock extends ProducerStage<Long> { private boolean initialDelayExceeded = false; @@ -11,13 +10,7 @@ public class Clock extends ProducerStage<Void, Long> { private long intervalDelayInMs; @Override - protected void execute4(final CommittableQueue<Void> elements) { - // TODO Auto-generated method stub - - } - - @Override - protected void execute5(final Void element) { + protected void execute() { if (!this.initialDelayExceeded) { this.initialDelayExceeded = true; this.sleep(this.initialDelayInMs); @@ -26,7 +19,7 @@ public class Clock extends ProducerStage<Void, Long> { } // this.logger.debug("Emitting timestamp"); - this.send(this.getCurrentTimeInNs()); + this.send(this.outputPort, this.getCurrentTimeInNs()); } private void sleep(final long delayInMs) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java index b172d211f53696f5c031ac4c5a02b7c8ba2bbb29..82f2ec8d81e1e226eb31bfdb3b07e5dc5a017e8c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java @@ -17,7 +17,6 @@ package teetime.variant.methodcallWithPorts.stage; import java.util.List; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; /** @@ -25,7 +24,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; * * @since 1.10 */ -public class CollectorSink<T> extends ConsumerStage<T, Void> { +public class CollectorSink<T> extends ConsumerStage<T> { private final List<T> elements; private final int threshold; @@ -45,13 +44,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> { } @Override - protected void execute4(final CommittableQueue<T> elements) { - T element = elements.removeFromHead(); - this.execute5(element); - } - - @Override - protected void execute5(final T element) { + protected void execute(final T element) { this.elements.add(element); if ((this.elements.size() % this.threshold) == 0) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java index bd11514e87d2dfadfee9c8b9f66f0596496902ae..88b446f00520cf29c130ff90a7a41caca0de31f9 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Counter.java @@ -1,16 +1,19 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class Counter<T> extends ConsumerStage<T, T> { +public class Counter<T> extends ConsumerStage<T> { + + private final OutputPort<T> outputPort = this.createOutputPort(); private int numElementsPassed; @Override - protected void execute5(final T element) { + protected void execute(final T element) { this.numElementsPassed++; // this.logger.debug("count: " + this.numElementsPassed); - this.send(element); + this.send(this.outputPort, element); } // BETTER find a solution w/o any thread-safe code in this stage @@ -18,4 +21,7 @@ public class Counter<T> extends ConsumerStage<T, T> { return this.numElementsPassed; } + public OutputPort<T> getOutputPort() { + return this.outputPort; + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java index 771cde6a08ebaac1bcd8aa43ba63e1d1b3160b47..72213b5db89716943e76fc2b2ecf64fbf5715bef 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementDelayMeasuringStage.java @@ -5,10 +5,12 @@ import java.util.List; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { +public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> { - private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + private final InputPort<Long> triggerInputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); private long numPassedElements; private long lastTimestampInNs; @@ -16,13 +18,14 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { private final List<Long> delays = new LinkedList<Long>(); @Override - protected void execute5(final T element) { + protected void execute(final T element) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { this.computeElementDelay(System.nanoTime()); } + this.numPassedElements++; - this.send(element); + this.send(this.outputPort, element); } @Override @@ -55,4 +58,8 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { return this.triggerInputPort; } + public OutputPort<T> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java index 9b742a058bc3789c7f267bc7626f23b3beb4cae7..a73ec83c1d365380ffff46734d95632f590f162f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java @@ -6,10 +6,12 @@ import java.util.concurrent.TimeUnit; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { +public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> { - private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + private final InputPort<Long> triggerInputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); private long numPassedElements; private long lastTimestampInNs; @@ -17,14 +19,14 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { private final List<Long> throughputs = new LinkedList<Long>(); @Override - protected void execute5(final T element) { + protected void execute(final T element) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { this.computeElementThroughput(System.nanoTime()); } this.numPassedElements++; - this.send(element); + this.send(this.outputPort, element); } @Override @@ -72,4 +74,8 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { return this.triggerInputPort; } + public OutputPort<T> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java deleted file mode 100644 index 87f6968bab4e4810f0c50e24fc2fcac807543de9..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java +++ /dev/null @@ -1,80 +0,0 @@ -package teetime.variant.methodcallWithPorts.stage; - -import java.util.UUID; - -import teetime.util.ConstructorClosure; -import teetime.variant.methodcallWithPorts.framework.core.InputPort; -import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; - -public class EndStage<T> implements StageWithPort<T, T> { - - private final InputPort<T> inputPort = new InputPort<T>(this); - - // public int count; - public ConstructorClosure<?> closure; - // public List<Object> list = new LinkedList<Object>(); - - private final String id; - - public EndStage() { - this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name - } - - @Override - public void onIsPipelineHead() { - // do nothing - } - - @Override - public StageWithPort<?, ?> getParentStage() { - return null; - } - - @Override - public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { - // do nothing - - } - - @Override - public boolean isReschedulable() { - return false; - } - - @Override - public InputPort<T> getInputPort() { - return this.inputPort; - } - - @Override - public OutputPort<T> getOutputPort() { - return null; - } - - @Override - public void executeWithPorts() { - this.getInputPort().receive(); // just consume - // do nothing - // this.count++; - // Object r = this.closure.execute(null); - // this.list.add(r); - } - - @Override - public void onStart() { - // do nothing - } - - @Override - public void onSignal(final Signal signal, final InputPort<?> inputPort) { - // do nothing - } - - @Override - public String getId() { - return this.id; - } - -} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java index 044a2ce0ba14b2dd0cc067fc36ab9f261864ee1b..235cd3608dd5c5cee966b45e8e554ac5c7f6cd1e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java @@ -5,20 +5,16 @@ import java.util.HashMap; import java.util.Map; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; -import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; import com.google.common.io.Files; -public class FileExtensionSwitch extends ConsumerStage<File, File> { - - // BETTER do not extends from AbstractStage since it provides another unused output port +public class FileExtensionSwitch extends ConsumerStage<File> { private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>(); @Override - protected void execute5(final File file) { + protected void execute(final File file) { String fileExtension = Files.getFileExtension(file.getAbsolutePath()); this.logger.debug("fileExtension: " + fileExtension); OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); @@ -27,29 +23,11 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> { } } - @Override - public void onSignal(final Signal signal, final InputPort<?> inputPort) { - this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); - - switch (signal) { - case FINISHED: - this.onFinished(); - break; - default: - this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); - break; - } - - for (OutputPort<File> op : this.fileExtensions.values()) { - op.sendSignal(signal); - } - } - public OutputPort<File> addFileExtension(String fileExtension) { if (fileExtension.startsWith(".")) { fileExtension = fileExtension.substring(1); } - OutputPort<File> outputPort = new OutputPort<File>(); + OutputPort<File> outputPort = this.createOutputPort(); this.fileExtensions.put(fileExtension, outputPort); this.logger.debug("SUCCESS: Registered output port for '" + fileExtension + "'"); return outputPort; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java index 0da3a138481376c5ede170d9d16eb7038e57b74c..7e5652fc89d2b84ae37f82ca4f50c1b309b4705c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceCounter.java @@ -1,8 +1,11 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> { +public class InstanceCounter<T, C extends T> extends ConsumerStage<T> { + + private final OutputPort<T> outputPort = this.createOutputPort(); private final Class<C> type; private int counter; @@ -12,16 +15,20 @@ public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> { } @Override - protected void execute5(final T element) { + protected void execute(final T element) { if (this.type.isInstance(element)) { this.counter++; } - this.send(element); + this.send(this.outputPort, element); } public int getCounter() { return this.counter; } + public OutputPort<T> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java index bf3ca12bb74b3f329c79dad6e8576e8f4719e063..79596765a99cf8f56c61f9f5490e6e5c0c9a9645 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/InstanceOfFilter.java @@ -1,12 +1,15 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Jan Waller, Nils Christian Ehmke, Christian Wulf * */ -public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { +public class InstanceOfFilter<I, O> extends ConsumerStage<I> { + + private final OutputPort<O> outputPort = this.createOutputPort(); private Class<O> type; @@ -16,12 +19,12 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { @SuppressWarnings("unchecked") @Override - protected void execute5(final I element) { + protected void execute(final I element) { if (this.type.isInstance(element)) { - this.send((O) element); + this.send(this.outputPort, (O) element); } else { // swallow up the element if (this.logger.isDebugEnabled()) { - this.logger.debug("element is not an instance of " + this.type.getName() + ", but of " + element.getClass()); + this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass()); } } } @@ -34,4 +37,8 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { this.type = type; } + public OutputPort<O> getOutputPort() { + return this.outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java index 69110cb6d6fcdc73a7eff2a9189be703dc22dddb..385e4cbfc52c0e457b08b6146a7779d898c1674d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/NoopFilter.java @@ -15,31 +15,25 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Christian Wulf * * @since 1.10 */ -public class NoopFilter<T> extends ConsumerStage<T, T> { +public class NoopFilter<T> extends ConsumerStage<T> { - // @Override - // public void execute3() { - // T element = this.getInputPort().receive(); - // // this.getOutputPort().send(element); - // } + private final OutputPort<T> outputPort = this.createOutputPort(); @Override - protected void execute4(final CommittableQueue<T> elements) { - T element = elements.removeFromHead(); - this.execute5(element); + protected void execute(final T element) { + this.send(this.outputPort, element); } - @Override - protected void execute5(final T element) { - this.send(element); // "send" calls the next stage and so on + public OutputPort<T> getOutputPort() { + return this.outputPort; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java index b99078f94501f9e70d6d0892b4eb5aa0fffa2996..028d8709239c43e1a6659465d8044af7acc0e093 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ObjectProducer.java @@ -16,7 +16,6 @@ package teetime.variant.methodcallWithPorts.stage; import teetime.util.ConstructorClosure; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; /** @@ -24,7 +23,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; * * @since 1.10 */ -public class ObjectProducer<T> extends ProducerStage<Void, T> { +public class ObjectProducer<T> extends ProducerStage<T> { private long numInputObjects; private ConstructorClosure<T> inputObjectCreator; @@ -54,12 +53,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { } @Override - protected void execute4(final CommittableQueue<Void> elements) { - this.execute5(null); - } - - @Override - protected void execute5(final Void element) { + protected void execute() { // this.logger.debug("Executing object producer..."); T newObject = null; @@ -72,7 +66,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> { } // System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects); - this.send(newObject); + this.send(this.outputPort, newObject); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 98cc5b977113d20b0af0ca785e8388564c73f64c..f8456a931803d15d7140f5d331f0856e4ad89032 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -1,11 +1,15 @@ package teetime.variant.methodcallWithPorts.stage; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Signal; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; -public class Relay<T> extends AbstractStage<T, T> { +public class Relay<T> extends AbstractStage { + + private final InputPort<T> inputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); private SpScPipe<T> cachedCastedInputPipe; @@ -15,22 +19,22 @@ public class Relay<T> extends AbstractStage<T, T> { @Override public void executeWithPorts() { - T element = this.getInputPort().receive(); + T element = this.inputPort.receive(); if (null == element) { // if (this.getInputPort().getPipe().isClosed()) { if (this.cachedCastedInputPipe.getSignal() == Signal.FINISHED) { this.setReschedulable(false); - assert 0 == this.getInputPort().getPipe().size(); + assert 0 == this.inputPort.getPipe().size(); } Thread.yield(); return; } - this.send(element); + this.send(this.outputPort, element); } @Override public void onStart() { - this.cachedCastedInputPipe = (SpScPipe<T>) this.getInputPort().getPipe(); + this.cachedCastedInputPipe = (SpScPipe<T>) this.inputPort.getPipe(); super.onStart(); } @@ -41,16 +45,11 @@ public class Relay<T> extends AbstractStage<T, T> { // } } - @Override - protected void execute4(final CommittableQueue<T> elements) { - // TODO Auto-generated method stub - + public InputPort<T> getInputPort() { + return this.inputPort; } - @Override - protected void execute5(final T element) { - // TODO Auto-generated method stub - + public OutputPort<T> getOutputPort() { + return this.outputPort; } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Sink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Sink.java deleted file mode 100644 index b4485b35fd5cda94f4aa71af8f4932897a818f2b..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Sink.java +++ /dev/null @@ -1,19 +0,0 @@ -package teetime.variant.methodcallWithPorts.stage; - -import teetime.util.list.CommittableQueue; -import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; - -public class Sink<T> extends ConsumerStage<T, T> { - - @Override - protected void execute4(final CommittableQueue<T> elements) { - // TODO Auto-generated method stub - - } - - @Override - protected void execute5(final T element) { - // do nothing - } - -} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java index a35bf75f8bcb287526871274858eff9f23d32b0a..2b10cdf378e44363780aa52790485d50ce3d1f9d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/StartTimestampFilter.java @@ -15,33 +15,26 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage; -import teetime.util.list.CommittableQueue; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Christian Wulf * * @since 1.10 */ -public class StartTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> { +public class StartTimestampFilter extends ConsumerStage<TimestampObject> { - // @Override - // public void execute3() { - // TimestampObject element = this.getInputPort().receive(); - // element.setStartTimestamp(System.nanoTime()); - // // this.getOutputPort().send(element); - // } + private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); @Override - protected void execute4(final CommittableQueue<TimestampObject> elements) { - TimestampObject element = elements.removeFromHead(); - this.execute5(element); + protected void execute(final TimestampObject element) { + element.setStartTimestamp(System.nanoTime()); + this.send(this.outputPort, element); } - @Override - protected void execute5(final TimestampObject element) { - element.setStartTimestamp(System.nanoTime()); - this.send(element); + public OutputPort<TimestampObject> getOutputPort() { + return outputPort; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java index a1601cb4c06e4aba99b07adbc0fde06ff6acfc78..220780e787dd9e03c2d4e82bc73b446fd8d24750 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/StopTimestampFilter.java @@ -15,33 +15,26 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage; -import teetime.util.list.CommittableQueue; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Christian Wulf * * @since 1.10 */ -public class StopTimestampFilter extends ConsumerStage<TimestampObject, TimestampObject> { +public class StopTimestampFilter extends ConsumerStage<TimestampObject> { - // @Override - // public void execute3() { - // TimestampObject element = this.getInputPort().receive(); - // element.setStopTimestamp(System.nanoTime()); - // // this.getOutputPort().send(element); - // } + private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); @Override - protected void execute4(final CommittableQueue<TimestampObject> elements) { - TimestampObject element = elements.removeFromHead(); - this.execute5(element); + protected void execute(final TimestampObject element) { + element.setStopTimestamp(System.nanoTime()); + this.send(this.outputPort, element); } - @Override - protected void execute5(final TimestampObject element) { - element.setStopTimestamp(System.nanoTime()); - this.send(element); + public OutputPort<TimestampObject> getOutputPort() { + return this.outputPort; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java similarity index 58% rename from src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java index 5c839a7f795e50fb362747475ee1edfdc39a8d27..5b234be8a879ef539f43f0eda71bfe7793e36eae 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Delay.java @@ -1,16 +1,14 @@ -package teetime.variant.methodcallWithPorts.stage; +package teetime.variant.methodcallWithPorts.stage.basic; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -public class Delay<T> extends AbstractStage<T, T> { +public class Delay<T> extends AbstractStage { - private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(this); - - public Delay() { - // this.setReschedulable(true); - } + private final InputPort<T> inputPort = this.createInputPort(); + private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); @Override public void executeWithPorts() { @@ -22,9 +20,9 @@ public class Delay<T> extends AbstractStage<T, T> { // System.out.println("#elements: " + this.getInputPort().pipe.size()); // TODO implement receiveAll() and sendMultiple() - while (!this.getInputPort().getPipe().isEmpty()) { - T element = this.getInputPort().receive(); - this.send(element); + while (!this.inputPort.getPipe().isEmpty()) { + T element = this.inputPort.receive(); + this.send(this.outputPort, element); } // this.setReschedulable(this.getInputPort().pipe.size() > 0); @@ -37,20 +35,16 @@ public class Delay<T> extends AbstractStage<T, T> { this.setReschedulable(true); } - @Override - protected void execute4(final CommittableQueue<T> elements) { - // TODO Auto-generated method stub - - } - - @Override - protected void execute5(final T element) { - // TODO Auto-generated method stub - + public InputPort<T> getInputPort() { + return this.inputPort; } public InputPort<Long> getTimestampTriggerInputPort() { return this.timestampTriggerInputPort; } + public OutputPort<T> getOutputPort() { + return this.outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Sink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Sink.java new file mode 100644 index 0000000000000000000000000000000000000000..b9e0bca2d124287f6c6afdc5e2123e61bc6481e2 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/Sink.java @@ -0,0 +1,14 @@ +package teetime.variant.methodcallWithPorts.stage.basic; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +public class Sink<T> extends ConsumerStage<T> { + + // PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer + + @Override + protected void execute(final T element) { + // do nothing; just consume + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java index 523b150eb37461ad17ec6e4180b8cb138e4db9f7..5ad9ab05f7d858b63239dd752404bb06ae2f86d6 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CloneStrategy.java @@ -15,8 +15,6 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage.basic.distributor; -import java.util.List; - import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** @@ -27,7 +25,7 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public final class CloneStrategy<T> implements IDistributorStrategy<T> { @Override - public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { + public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { throw new UnsupportedOperationException(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java index 0fa5efc0b43541919a75df69e4caa83e56e3853d..9445ae972f262cb23a3f7b7aaa023aeac075a239 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/CopyByReferenceStrategy.java @@ -15,8 +15,6 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage.basic.distributor; -import java.util.List; - import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** @@ -27,7 +25,7 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort; public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> { @Override - public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { + public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { for (final OutputPort<T> port : outputPorts) { port.send(element); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java index dd9418a898c79c72a669993c2427666dc69167a8..10c74c11e9d40a1ef10aa15e0a5f353ed0262343 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/Distributor.java @@ -16,13 +16,8 @@ package teetime.variant.methodcallWithPorts.stage.basic.distributor; -import java.util.ArrayList; -import java.util.List; - -import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; -import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; -import teetime.variant.methodcallWithPorts.framework.core.Signal; /** * @author Christian Wulf @@ -32,27 +27,14 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal; * @param T * the type of the input port and the output ports */ -public class Distributor<T> extends AbstractStage<T, T> { - - // TODO do not inherit from AbstractStage since it provides the default output port that is unnecessary for the distributor ConsumerStage<T, T> { - - // BETTER use an array since a list always creates a new iterator when looping - private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>(); +public class Distributor<T> extends ConsumerStage<T> { private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>(); + @SuppressWarnings("unchecked") @Override - public void executeWithPorts() { - T element = this.getInputPort().receive(); - - this.setReschedulable(this.getInputPort().getPipe().size() > 0); - - this.execute5(element); - } - - @Override - protected void execute5(final T element) { - this.strategy.distribute(this.outputPortList, element); + protected void execute(final T element) { + this.strategy.distribute((OutputPort<T>[]) this.getOutputPorts(), element); } @Override @@ -63,37 +45,8 @@ public class Distributor<T> extends AbstractStage<T, T> { // } } - @Override - public void onSignal(final Signal signal, final InputPort<?> inputPort) { - this.logger.info("Got signal: " + signal + " from input port: " + inputPort); - - switch (signal) { - case FINISHED: - this.onFinished(); - break; - default: - this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); - break; - } - - for (OutputPort<T> op : this.outputPortList) { - op.sendSignal(signal); - } - } - - @Override - public OutputPort<T> getOutputPort() { - return this.getNewOutputPort(); - } - public OutputPort<T> getNewOutputPort() { - final OutputPort<T> newOutputPort = new OutputPort<T>(); - this.outputPortList.add(newOutputPort); - return newOutputPort; - } - - public List<OutputPort<T>> getOutputPortList() { - return this.outputPortList; + return this.createOutputPort(); } public IDistributorStrategy<T> getStrategy() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java index 3906263d2e4293146b0b8bc8f0d07525d1fefd43..fae7a25bfaaaf06e75a9b4308fdc01d548c9c481 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/IDistributorStrategy.java @@ -15,8 +15,6 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage.basic.distributor; -import java.util.List; - import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** @@ -26,6 +24,6 @@ import teetime.variant.methodcallWithPorts.framework.core.OutputPort; */ public interface IDistributorStrategy<T> { - public boolean distribute(final List<OutputPort<T>> allOutputPorts, final T element); + public boolean distribute(final OutputPort<T>[] allOutputPorts, final T element); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java index 0a3dfb7888ff595430aab79134fbff63f100b304..e8b9279a6de87891250a18e4fd52794214f02376 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/distributor/RoundRobinStrategy.java @@ -15,8 +15,6 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage.basic.distributor; -import java.util.List; - import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** @@ -29,7 +27,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { private int index = 0; @Override - public boolean distribute(final List<OutputPort<T>> outputPorts, final T element) { + public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts); outputPort.send(element); @@ -37,10 +35,10 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> { return true; } - private OutputPort<T> getNextPortInRoundRobinOrder(final List<OutputPort<T>> outputPorts) { - final OutputPort<T> outputPort = outputPorts.get(this.index); + private OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) { + final OutputPort<T> outputPort = outputPorts[this.index]; - this.index = (this.index + 1) % outputPorts.size(); + this.index = (this.index + 1) % outputPorts.length; return outputPort; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java index 99b69911238dfadd2d517fe126efd2df2b2f6c65..3d70d4b9256c3387b9c5606a757d6d861f1c212d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java @@ -16,13 +16,13 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger; -import java.util.ArrayList; -import java.util.List; - -import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.Signal; +import kieker.common.record.IMonitoringRecord; + /** * * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. @@ -34,36 +34,29 @@ import teetime.variant.methodcallWithPorts.framework.core.Signal; * @param <T> * the type of the input ports and the output port */ -public class Merger<T> extends ConsumerStage<T, T> { +public class Merger<T> extends AbstractStage { - // TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger - - // BETTER use an array since a list always creates a new iterator when looping - private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>(); + private final OutputPort<T> outputPort = this.createOutputPort(); private int finishedInputPorts; private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); - public IMergerStrategy<T> getStrategy() { - return this.strategy; - } - - public void setStrategy(final IMergerStrategy<T> strategy) { - this.strategy = strategy; - } - @Override public void executeWithPorts() { - // if (this.logger.isDebugEnabled()) { - // this.logger.debug("Executing stage..."); - // } + final T token = this.strategy.getNextInput(this); + if (token == null) { + return; + } - this.execute5(null); + this.send(this.outputPort, token); boolean isReschedulable = false; - for (InputPort<T> inputPort : this.inputPortList) { - isReschedulable = isReschedulable || !inputPort.getPipe().isEmpty(); + for (InputPort<?> inputPort : this.getInputPorts()) { + if (!inputPort.getPipe().isEmpty()) { + isReschedulable = true; + break; + } } this.setReschedulable(isReschedulable); } @@ -81,39 +74,35 @@ public class Merger<T> extends ConsumerStage<T, T> { break; } - if (this.finishedInputPorts == this.inputPortList.size()) { - this.getOutputPort().sendSignal(signal); + if (this.finishedInputPorts == this.getInputPorts().length) { + this.outputPort.sendSignal(signal); } } @Override - protected void onFinished() { + public void onIsPipelineHead() { this.finishedInputPorts++; } - @Override - protected void execute5(final T element) { - final T token = this.strategy.getNextInput(this); - if (token == null) { - return; - } + public IMergerStrategy<T> getStrategy() { + return this.strategy; + } - this.send(token); + public void setStrategy(final IMergerStrategy<T> strategy) { + this.strategy = strategy; } @Override - public InputPort<T> getInputPort() { - return this.getNewInputPort(); + public InputPort<?>[] getInputPorts() { + return super.getInputPorts(); } - private InputPort<T> getNewInputPort() { - InputPort<T> inputPort = new InputPort<T>(this); - this.inputPortList.add(inputPort); - return inputPort; + public InputPort<IMonitoringRecord> getNewInputPort() { + return this.createInputPort(); } - public List<InputPort<T>> getInputPortList() { - return this.inputPortList; + public OutputPort<T> getOutputPort() { + return this.outputPort; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java index 337b7662b74b88f19c62d59c82c8a156b17e7e86..bacfaa7a0d313fee0784e8663b28800634ed12de 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/RoundRobinStrategy.java @@ -15,8 +15,6 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.stage.basic.merger; -import java.util.List; - import teetime.variant.methodcallWithPorts.framework.core.InputPort; /** @@ -30,8 +28,9 @@ public final class RoundRobinStrategy<T> implements IMergerStrategy<T> { @Override public T getNextInput(final Merger<T> merger) { - List<InputPort<T>> inputPorts = merger.getInputPortList(); - int size = inputPorts.size(); + @SuppressWarnings("unchecked") + InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts(); + int size = inputPorts.length; // check each port at most once to avoid a potentially infinite loop while (size-- > 0) { InputPort<T> inputPort = this.getNextPortInRoundRobinOrder(inputPorts); @@ -43,10 +42,10 @@ public final class RoundRobinStrategy<T> implements IMergerStrategy<T> { return null; } - private InputPort<T> getNextPortInRoundRobinOrder(final List<InputPort<T>> inputPorts) { - InputPort<T> inputPort = inputPorts.get(this.index); + private InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) { + InputPort<T> inputPort = inputPorts[this.index]; - this.index = (this.index + 1) % inputPorts.size(); + this.index = (this.index + 1) % inputPorts.length; return inputPort; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java index a5079f94a3201a1f794ab281fef8353d93491249..d8746827b4a62f42c51a8d4d2c52497f95dc9ea4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java @@ -37,7 +37,7 @@ import kieker.common.record.IMonitoringRecord; * @since 1.10 */ @Description("A reader which reads records from a database") -public class DbReader extends ProducerStage<Void, IMonitoringRecord> { +public class DbReader extends ProducerStage<IMonitoringRecord> { @Description("The classname of the driver used for the connection.") private String driverClassname = "org.apache.derby.jdbc.EmbeddedDrive"; @@ -65,7 +65,7 @@ public class DbReader extends ProducerStage<Void, IMonitoringRecord> { // } @Override - protected void execute5(final Void element) { + protected void execute() { Connection connection = null; try { connection = DriverManager.getConnection(this.connectionString); @@ -163,7 +163,7 @@ public class DbReader extends ProducerStage<Void, IMonitoringRecord> { } final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, recordValues); record.setLoggingTimestamp(records.getLong(2)); - this.send(record); + this.send(this.outputPort, record); } } finally { if (records != null) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java index 3e728acbadacb011dd4756ce721f6c81b43b00c0..1107e60541a2529fe59ac2e14c60f0beb901e38b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java @@ -22,13 +22,16 @@ import java.util.Arrays; import java.util.Comparator; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Christian Wulf * * @since 1.10 */ -public class Directory2FilesFilter extends ConsumerStage<File, File> { +public class Directory2FilesFilter extends ConsumerStage<File> { + + private final OutputPort<File> outputPort = this.createOutputPort(); private FileFilter filter; private Comparator<File> fileComparator; @@ -63,7 +66,7 @@ public class Directory2FilesFilter extends ConsumerStage<File, File> { } @Override - protected void execute5(final File inputDir) { + protected void execute(final File inputDir) { final File[] inputFiles = inputDir.listFiles(this.filter); if (inputFiles == null) { @@ -76,7 +79,7 @@ public class Directory2FilesFilter extends ConsumerStage<File, File> { } for (final File file : inputFiles) { - this.send(file); + this.send(this.outputPort, file); } } @@ -96,4 +99,8 @@ public class Directory2FilesFilter extends ConsumerStage<File, File> { this.fileComparator = fileComparator; } + public OutputPort<File> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java index 47be6efe228f1cfc8a588e62861a4c7776d109f8..67fbc6a27b572c0f5a9103a2f0ae57e6fa5314e3 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java @@ -25,18 +25,21 @@ import java.io.InputStreamReader; import teetime.variant.explicitScheduling.stage.util.TextLine; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Christian Wulf * * @since 1.10 */ -public class File2TextLinesFilter extends ConsumerStage<File, TextLine> { +public class File2TextLinesFilter extends ConsumerStage<File> { + + private final OutputPort<TextLine> outputPort = this.createOutputPort(); private String charset = "UTF-8"; @Override - protected void execute5(final File textFile) { + protected void execute(final File textFile) { BufferedReader reader = null; try { reader = new BufferedReader(new InputStreamReader(new FileInputStream(textFile), this.charset)); @@ -44,7 +47,7 @@ public class File2TextLinesFilter extends ConsumerStage<File, TextLine> { while ((line = reader.readLine()) != null) { line = line.trim(); if (line.length() != 0) { - this.send(new TextLine(textFile, line)); + this.send(this.outputPort, new TextLine(textFile, line)); } // else: ignore empty line } } catch (final FileNotFoundException e) { @@ -70,4 +73,8 @@ public class File2TextLinesFilter extends ConsumerStage<File, TextLine> { this.charset = charset; } + public OutputPort<TextLine> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java index af115a5325bd72fd07e82fd0b78bfb5bc18f1222..8cf9a9881f52aa7c2ca35394f3379d63d1fbda3a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java @@ -29,7 +29,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; * @since 1.10 */ @Description("A filter to print objects to a configured stream") -public class Printer<T> extends ConsumerStage<T, Void> { +public class Printer<T> extends ConsumerStage<T> { public static final String STREAM_STDOUT = "STDOUT"; public static final String STREAM_STDERR = "STDERR"; @@ -45,7 +45,7 @@ public class Printer<T> extends ConsumerStage<T, Void> { private boolean append = true; @Override - protected void execute5(final T object) { + protected void execute(final T object) { if (this.active) { final StringBuilder sb = new StringBuilder(128); @@ -90,11 +90,11 @@ public class Printer<T> extends ConsumerStage<T, Void> { this.initializeStream(); } - // @Override // TODO implement onStop - // public void onPipelineStops() { - // this.closeStream(); - // super.onPipelineStops(); - // } + @Override + protected void onFinished() { + this.closeStream(); + super.onFinished(); + } private void initializeStream() { if (STREAM_STDOUT.equals(this.streamName)) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index 99626076521e56a46247b72a5279fe7363cf46c0..3229fe7158bf717e2aca1dc5d45fba81a1128f37 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -46,7 +46,7 @@ import kieker.common.util.registry.Lookup; * * @since 1.10 */ -public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { +public class TCPReader extends ProducerStage<IMonitoringRecord> { private static final int MESSAGE_BUFFER_SIZE = 65535; @@ -59,13 +59,6 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { private RecordFactory recordFactory; - // @Override // implement onStop - // public void onPipelineStops() { - // super.logger.info("Shutdown of TCPReader requested."); - // // TODO actually implement terminate! - // super.onPipelineStops(); - // } - public final int getPort1() { return this.port1; } @@ -130,7 +123,7 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { } @Override - protected void execute5(final Void element) { + protected void execute() { ServerSocketChannel serversocket = null; try { serversocket = ServerSocketChannel.open(); @@ -154,7 +147,7 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); - this.send(record); + this.send(this.outputPort, record); } catch (final MonitoringRecordException ex) { super.logger.error("Failed to create record.", ex); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java index 1e713b6f2e61cdcaa77d6b6e55bd848b88054002..85941999d4d6e0c006b8792fd9b7a1a3b11ea19b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/Dir2RecordsFilter.java @@ -74,18 +74,18 @@ public class Dir2RecordsFilter extends Pipeline<File, IMonitoringRecord> { SingleElementPipe.connect(binFileOutputPort, binaryFile2RecordFilter.getInputPort()); SingleElementPipe.connect(zipFileOutputPort, zipFile2RecordFilter.getInputPort()); - SingleElementPipe.connect(datFile2RecordFilter.getOutputPort(), recordMerger.getInputPort()); - SingleElementPipe.connect(binaryFile2RecordFilter.getOutputPort(), recordMerger.getInputPort()); - SingleElementPipe.connect(zipFile2RecordFilter.getOutputPort(), recordMerger.getInputPort()); + SingleElementPipe.connect(datFile2RecordFilter.getOutputPort(), recordMerger.getNewInputPort()); + SingleElementPipe.connect(binaryFile2RecordFilter.getOutputPort(), recordMerger.getNewInputPort()); + SingleElementPipe.connect(zipFile2RecordFilter.getOutputPort(), recordMerger.getNewInputPort()); // prepare pipeline - this.setFirstStage(classNameRegistryCreationFilter); + this.setFirstStage(classNameRegistryCreationFilter, classNameRegistryCreationFilter.getInputPort()); this.addIntermediateStage(directory2FilesFilter); this.addIntermediateStage(fileExtensionSwitch); this.addIntermediateStage(datFile2RecordFilter); this.addIntermediateStage(binaryFile2RecordFilter); this.addIntermediateStage(zipFile2RecordFilter); - this.setLastStage(recordMerger); + this.setLastStage(recordMerger, recordMerger.getOutputPort()); } /** diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java index 9d63b583a5bc0352d7f10bee1da7f47d97e281f4..0ac7e9728cd5248f345592b008308592f546a2b5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithBin2RecordFilter.java @@ -21,9 +21,9 @@ public class DirWithBin2RecordFilter extends Pipeline<File, IMonitoringRecord> { final Directory2FilesFilter directory2FilesFilter = new Directory2FilesFilter(); final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(classNameRegistryRepository); - this.setFirstStage(classNameRegistryCreationFilter); + this.setFirstStage(classNameRegistryCreationFilter, classNameRegistryCreationFilter.getInputPort()); this.addIntermediateStage(directory2FilesFilter); - this.setLastStage(binaryFile2RecordFilter); + this.setLastStage(binaryFile2RecordFilter, binaryFile2RecordFilter.getOutputPort()); } public DirWithBin2RecordFilter() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java index e41429e289ff6f90fed5f65fa8046c11bdc20ca9..7a4eeab5490008ce50fdac2b70e7aac5fa98b9c1 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/DirWithDat2RecordFilter.java @@ -21,9 +21,9 @@ public class DirWithDat2RecordFilter extends Pipeline<File, IMonitoringRecord> { final Directory2FilesFilter directory2FilesFilter = new Directory2FilesFilter(); final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(classNameRegistryRepository); - this.setFirstStage(classNameRegistryCreationFilter); + this.setFirstStage(classNameRegistryCreationFilter, classNameRegistryCreationFilter.getInputPort()); this.addIntermediateStage(directory2FilesFilter); - this.setLastStage(datFile2RecordFilter); + this.setLastStage(datFile2RecordFilter, datFile2RecordFilter.getOutputPort()); } public DirWithDat2RecordFilter() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java index 3dc945c17bc492c2ba707a50db8476f7bdf218bc..1142c22bb266c2127f8555440a2ce3d55ce1df29 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java @@ -19,15 +19,17 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; /** * @author Christian Wulf * * @since 1.10 */ -public class ClassNameRegistryCreationFilter extends ConsumerStage<File, File> { +public class ClassNameRegistryCreationFilter extends ConsumerStage<File> { + + private final OutputPort<File> outputPort = this.createOutputPort(); private ClassNameRegistryRepository classNameRegistryRepository; @@ -50,7 +52,7 @@ public class ClassNameRegistryCreationFilter extends ConsumerStage<File, File> { } @Override - protected void execute5(final File inputDir) { + protected void execute(final File inputDir) { final File mappingFile = this.mappingFileParser.findMappingFile(inputDir); if (mappingFile == null) { return; @@ -59,7 +61,7 @@ public class ClassNameRegistryCreationFilter extends ConsumerStage<File, File> { try { final ClassNameRegistry classNameRegistry = this.mappingFileParser.parseFromStream(new FileInputStream(mappingFile)); this.classNameRegistryRepository.put(inputDir, classNameRegistry); - this.send(inputDir); + this.send(this.outputPort, inputDir); // final String filePrefix = this.mappingFileParser.getFilePrefixFromMappingFile(mappingFile); // context.put(this.filePrefixOutputPort, filePrefix); // TODO pass prefix @@ -76,9 +78,8 @@ public class ClassNameRegistryCreationFilter extends ConsumerStage<File, File> { this.classNameRegistryRepository = classNameRegistryRepository; } - @Override - protected void execute4(final CommittableQueue<File> elements) { - throw new IllegalStateException(); + public OutputPort<File> getOutputPort() { + return this.outputPort; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java index 000de983c26fca83f5b1a494b1ed5f5a0359f676..9119e31b564204a60259745c85de07e0999f9b16 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import kieker.common.exception.MonitoringRecordException; @@ -31,7 +32,9 @@ import kieker.common.util.filesystem.BinaryCompressionMethod; * * @since 1.10 */ -public class BinaryFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> { +public class BinaryFile2RecordFilter extends ConsumerStage<File> { + + private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort(); private static final int MB = 1024 * 1024; @@ -69,14 +72,14 @@ public class BinaryFile2RecordFilter extends ConsumerStage<File, IMonitoringReco } @Override - protected void execute5(final File binaryFile) { + protected void execute(final File binaryFile) { try { final BinaryCompressionMethod method = BinaryCompressionMethod.getByFileExtension(binaryFile.getName()); final DataInputStream in = method.getDataInputStream(binaryFile, 1 * MB); try { IMonitoringRecord record = this.recordFromBinaryFileCreator.createRecordFromBinaryFile(binaryFile, in); while (record != null) { - this.send(record); + this.send(this.outputPort, record); record = this.recordFromBinaryFileCreator.createRecordFromBinaryFile(binaryFile, in); } } catch (final MonitoringRecordException e) { @@ -97,4 +100,8 @@ public class BinaryFile2RecordFilter extends ConsumerStage<File, IMonitoringReco } } + public OutputPort<IMonitoringRecord> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java index 116e8b49c918e36419aac7e70dc74cf78bf71fc7..c61cd75457fda7bac419cea523cceac5c074425c 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java @@ -36,8 +36,8 @@ public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> { final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository); - this.setFirstStage(file2TextLinesFilter); - this.setLastStage(textLine2RecordFilter); + this.setFirstStage(file2TextLinesFilter, file2TextLinesFilter.getInputPort()); + this.setLastStage(textLine2RecordFilter, textLine2RecordFilter.getOutputPort()); // BETTER let the framework choose the optimal pipe implementation SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort()); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java index 938f87bb639cbcbe395ec2a4aec2d5f2b0c2430c..a1ecb038b33bdbaa573899746c7af0e34b0f4988 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java @@ -29,6 +29,7 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistry; import teetime.variant.methodcallWithPorts.stage.kieker.className.MappingFileParser; @@ -40,7 +41,9 @@ import kieker.common.util.filesystem.FSUtil; * * @since 1.10 */ -public class ZipFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> { +public class ZipFile2RecordFilter extends ConsumerStage<File> { + + private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort(); private final MappingFileParser mappingFileParser; @@ -52,7 +55,7 @@ public class ZipFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> } @Override - protected void execute5(final File zipFile) { + protected void execute(final File zipFile) { final InputStream mappingFileInputStream = this.findMappingFileInputStream(zipFile); if (mappingFileInputStream == null) { return; @@ -83,7 +86,7 @@ public class ZipFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> try { while (null != (zipEntry = zipInputStream.getNextEntry())) { // NOCS NOPMD final String filename = zipEntry.getName(); - // TODO + // TODO implement the zip filter } } catch (final IOException e) { // TODO Auto-generated catch block @@ -119,4 +122,8 @@ public class ZipFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> return null; } + public OutputPort<IMonitoringRecord> getOutputPort() { + return outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java index a3ddf9229c8214d6764bcf927d10de901fa57c4e..68f2cc72902d1f6c5db5a7823a1f7316f5bb8582 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java @@ -18,7 +18,6 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine; import java.util.Map; -import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import kieker.common.util.filesystem.FSUtil; @@ -28,7 +27,7 @@ import kieker.common.util.filesystem.FSUtil; * * @since 1.10 */ -public class TextLine2MappingRegistryFilter extends ConsumerStage<String, Void> { +public class TextLine2MappingRegistryFilter extends ConsumerStage<String> { private final Map<Integer, String> stringRegistry; @@ -37,12 +36,7 @@ public class TextLine2MappingRegistryFilter extends ConsumerStage<String, Void> } @Override - protected void execute4(final CommittableQueue<String> elements) { - throw new IllegalStateException(); - } - - @Override - protected void execute5(final String textLine) { + protected void execute(final String textLine) { final int split = textLine.indexOf('='); if (split == -1) { this.logger.error("Failed to find character '=' in line: {" + textLine + "}. It must consist of a ID=VALUE pair."); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java index 286ce50261a554b981b6cf40a7da70f5e8a24d54..6bc76669e79091803213242c0f851a9b8419617f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java @@ -19,10 +19,10 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine; import java.util.HashSet; import java.util.Set; -import teetime.util.list.CommittableQueue; import teetime.variant.explicitScheduling.stage.MappingException; import teetime.variant.explicitScheduling.stage.util.TextLine; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.RecordFromTextLineCreator; @@ -36,7 +36,9 @@ import kieker.common.record.IMonitoringRecord; * * @since 1.10 */ -public class TextLine2RecordFilter extends ConsumerStage<TextLine, IMonitoringRecord> { +public class TextLine2RecordFilter extends ConsumerStage<TextLine> { + + private final OutputPort<IMonitoringRecord> outputPort = this.createOutputPort(); private final Set<String> unknownTypesObserved = new HashSet<String>(); @@ -77,15 +79,10 @@ public class TextLine2RecordFilter extends ConsumerStage<TextLine, IMonitoringRe } @Override - protected void execute4(final CommittableQueue<TextLine> elements) { - throw new IllegalStateException(); - } - - @Override - protected void execute5(final TextLine textLine) { + protected void execute(final TextLine textLine) { try { final IMonitoringRecord record = this.recordFromTextLineCreator.createRecordFromLine(textLine.getTextFile(), textLine.getTextLine()); - this.send(record); + this.send(this.outputPort, record); } catch (final MonitoringRecordException e) { this.logger.error("Could not create record from text line: '" + textLine + "'", e); } catch (final IllegalRecordFormatException e) { @@ -104,4 +101,8 @@ public class TextLine2RecordFilter extends ConsumerStage<TextLine, IMonitoringRe } } + public OutputPort<IMonitoringRecord> getOutputPort() { + return this.outputPort; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index 8f0315efd6113b93da28309c454b3a8df99d327e..5d5254953a4e5e605aa6a5c6d282a7cd7898cce2 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import kieker.analysis.plugin.filter.flow.TraceEventRecords; import kieker.common.record.flow.IFlowRecord; @@ -31,7 +32,9 @@ import kieker.common.record.flow.trace.TraceMetadata; * * @since 1.10 */ -public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceEventRecords> { +public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { + + private final OutputPort<TraceEventRecords> outputPort = this.createOutputPort(); private TimeUnit timeunit; private long maxTraceDuration = Long.MAX_VALUE; @@ -46,7 +49,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE } @Override - protected void execute5(final IFlowRecord element) { + protected void execute(final IFlowRecord element) { final Long traceId = this.reconstructTrace(element); if (traceId != null) { this.putIfFinished(traceId); @@ -93,7 +96,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE // final IOutputPort<TraceReconstructionFilter, TraceEventRecords> outputPort = // (traceBuffer.isInvalid()) ? this.traceInvalidOutputPort : this.traceValidOutputPort; // context.put(outputPort, traceBuffer.toTraceEvents()); - this.send(traceBuffer.toTraceEvents()); + this.send(this.outputPort, traceBuffer.toTraceEvents()); } public TimeUnit getTimeunit() { @@ -128,6 +131,10 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord, TraceE this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp; } + public OutputPort<TraceEventRecords> getOutputPort() { + return this.outputPort; + } + // public Map<Long, TraceBuffer> getTraceId2trace() { // return TraceReconstructionFilter.traceId2trace; // } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java index f9484eb6c8e3abf4621d93047abac3e6dbdf2348..6c3baa217e8897cb02981f0d49e6b140c2bc4f22 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/traceReduction/TraceReductionFilter.java @@ -22,6 +22,7 @@ import java.util.Map.Entry; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import kieker.analysis.plugin.filter.flow.TraceEventRecords; @@ -36,9 +37,10 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; * * @since */ -public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, TraceEventRecords> { +public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { - private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + private final InputPort<Long> triggerInputPort = this.createInputPort(); + private final OutputPort<TraceEventRecords> outputPort = this.createOutputPort(); private final Map<TraceEventRecords, TraceAggregationBuffer> trace2buffer; @@ -49,7 +51,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace } @Override - protected void execute5(final TraceEventRecords traceEventRecords) { + protected void execute(final TraceEventRecords traceEventRecords) { Long timestampInNs = this.triggerInputPort.receive(); if (timestampInNs != null) { this.processTimeoutQueue(timestampInNs); @@ -77,7 +79,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace final TraceAggregationBuffer buffer = entry.getValue(); final TraceEventRecords record = buffer.getTraceEventRecords(); record.setCount(buffer.getCount()); - this.send(record); + this.send(this.outputPort, record); } this.trace2buffer.clear(); } @@ -95,7 +97,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) { final TraceEventRecords record = traceBuffer.getTraceEventRecords(); record.setCount(traceBuffer.getCount()); - this.send(record); + this.send(this.outputPort, record); } iterator.remove(); } @@ -113,4 +115,8 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords, Trace public InputPort<Long> getTriggerInputPort() { return this.triggerInputPort; } + + public OutputPort<TraceEventRecords> getOutputPort() { + return this.outputPort; + } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java index 789cf376a4d56f8318b4cdf633ca7599bf05840d..d7c6f3c41af839de8156af0aa8cbc83738fb31e1 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/stringBuffer/StringBufferFilter.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.LinkedList; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.stage.stringBuffer.handler.AbstractDataTypeHandler; import teetime.variant.methodcallWithPorts.stage.stringBuffer.util.KiekerHashMap; @@ -27,7 +28,9 @@ import teetime.variant.methodcallWithPorts.stage.stringBuffer.util.KiekerHashMap * * @since 1.10 */ -public class StringBufferFilter<T> extends ConsumerStage<T, T> { +public class StringBufferFilter<T> extends ConsumerStage<T> { + + private final OutputPort<T> outputPort = this.createOutputPort(); // BETTER use a non shared data structure to avoid synchronization between threads private KiekerHashMap kiekerHashMap = new KiekerHashMap(); @@ -35,9 +38,9 @@ public class StringBufferFilter<T> extends ConsumerStage<T, T> { private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>(); @Override - protected void execute5(final T element) { + protected void execute(final T element) { final T returnedElement = this.handle(element); - this.send(returnedElement); + this.send(this.outputPort, returnedElement); } @Override @@ -76,4 +79,8 @@ public class StringBufferFilter<T> extends ConsumerStage<T, T> { this.dataTypeHandlers = dataTypeHandlers; } + public OutputPort<T> getOutputPort() { + return outputPort; + } + } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java index 22e6ec1acc11e7f855f82e98b7d45fb841558a9b..04c2ecfd87f17b33681fd9a7b503842beacc53b2 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -65,11 +65,11 @@ public class MethodCallThroughputAnalysis9 extends Analysis { final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects); final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>(); - pipeline.setFirstStage(objectProducer); + pipeline.setFirstStage(objectProducer, null); pipeline.addIntermediateStage(startTimestampFilter); pipeline.addIntermediateStages(noopFilters); pipeline.addIntermediateStage(stopTimestampFilter); - pipeline.setLastStage(collectorSink); + pipeline.setLastStage(collectorSink, null); Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java index 9c237a313bc500201e113bbcbd890a077a019f9e..b285f8fca81ab51e9c0d9f98d540e1e146a1856b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -27,12 +27,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; -import teetime.variant.methodcallWithPorts.stage.Delay; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter; import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; +import teetime.variant.methodcallWithPorts.stage.basic.Delay; /** * @author Christian Wulf diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index ae2572f27b01a8d4bdae4b05f23013bcb0970d58..c26772746ff420cba334ba28bb04a794da56e888 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -34,9 +34,9 @@ import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.ObjectProducer; import teetime.variant.methodcallWithPorts.stage.Relay; -import teetime.variant.methodcallWithPorts.stage.Sink; import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter; import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; +import teetime.variant.methodcallWithPorts.stage.basic.Sink; import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; /**