Skip to content
Snippets Groups Projects
Commit 8ffe138c authored by Christian Wulf's avatar Christian Wulf
Browse files

multiple ports abstraction

parent 0491b4ca
No related branches found
No related tags found
No related merge requests found
Showing
with 193 additions and 276 deletions
...@@ -2,13 +2,15 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre ...@@ -2,13 +2,15 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class SysOutFilter<T> extends ConsumerStage<T, T> { public class SysOutFilter<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private final IPipe<IMonitoringRecord> pipe; private final IPipe<IMonitoringRecord> pipe;
...@@ -17,13 +19,13 @@ public class SysOutFilter<T> extends ConsumerStage<T, T> { ...@@ -17,13 +19,13 @@ public class SysOutFilter<T> extends ConsumerStage<T, T> {
} }
@Override @Override
protected void execute5(final T element) { protected void execute(final T element) {
Long timestamp = this.triggerInputPort.receive(); Long timestamp = this.triggerInputPort.receive();
if (timestamp != null) { if (timestamp != null) {
// this.logger.info("pipe.size: " + this.pipe.size()); // this.logger.info("pipe.size: " + this.pipe.size());
System.out.println("pipe.size: " + this.pipe.size()); System.out.println("pipe.size: " + this.pipe.size());
} }
this.send(element); this.send(this.outputPort, element);
} }
public InputPort<Long> getTriggerInputPort() { public InputPort<Long> getTriggerInputPort() {
......
package teetime.variant.methodcallWithPorts.framework.core; package teetime.variant.methodcallWithPorts.framework.core;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import teetime.util.list.CommittableQueue;
import kieker.common.logging.Log; import kieker.common.logging.Log;
import kieker.common.logging.LogFactory; import kieker.common.logging.LogFactory;
public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { public abstract class AbstractStage implements StageWithPort {
private final String id; private final String id;
/** /**
...@@ -15,51 +15,35 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -15,51 +15,35 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/ */
protected final Log logger; // BETTER use SLF4J as interface and logback as impl protected final Log logger; // BETTER use SLF4J as interface and logback as impl
private final InputPort<I> inputPort = new InputPort<I>(this); private StageWithPort parentStage;
private final OutputPort<O> outputPort = new OutputPort<O>();
private StageWithPort<?, ?> parentStage;
private boolean reschedulable; private boolean reschedulable;
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
/** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */
protected InputPort<?>[] cachedInputPorts;
/** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
protected OutputPort<?>[] cachedOutputPorts;
public AbstractStage() { public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")"); this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")");
} }
@Override
public InputPort<I> getInputPort() {
return this.inputPort;
}
@Override
public OutputPort<O> getOutputPort() {
return this.outputPort;
}
protected void execute4(final CommittableQueue<I> elements) {
throw new IllegalStateException(); // default implementation
}
protected abstract void execute5(I element);
/** /**
* Sends the given <code>element</code> using the default output port * Sends the given <code>element</code> using the default output port
* *
* @param element * @param element
* @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy) * @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy)
*/ */
protected final boolean send(final O element) { protected final <O> boolean send(final OutputPort<O> outputPort, final O element) {
return this.send(this.getOutputPort(), element);
}
protected final boolean send(final OutputPort<O> outputPort, final O element) {
if (!outputPort.send(element)) { if (!outputPort.send(element)) {
return false; return false;
} }
// StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage(); StageWithPort next = outputPort.getCachedTargetStage();
StageWithPort<?, ?> next = outputPort.getCachedTargetStage();
do { do {
next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
...@@ -68,29 +52,32 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -68,29 +52,32 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return true; return true;
} }
// public void disable() {
// this.schedulingInformation.setActive(false);
// this.fireOnDisable();
// }
// private void fireOnDisable() {
// if (this.listener != null) {
// this.listener.onDisable(this, this.index);
// }
// }
@Override @Override
public void onStart() { public void onStart() {
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
}
protected void onFinished() {
// empty default implementation // empty default implementation
this.onIsPipelineHead();
}
protected InputPort<?>[] getInputPorts() {
return this.cachedInputPorts;
}
protected OutputPort<?>[] getOutputPorts() {
return this.cachedOutputPorts;
} }
@Override @Override
public StageWithPort<?, ?> getParentStage() { public StageWithPort getParentStage() {
return this.parentStage; return this.parentStage;
} }
@Override @Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { public void setParentStage(final StageWithPort parentStage, final int index) {
this.parentStage = parentStage; this.parentStage = parentStage;
} }
...@@ -124,12 +111,21 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -124,12 +111,21 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
break; break;
} }
this.outputPort.sendSignal(signal); for (OutputPort<?> outputPort : this.outputPortList) {
outputPort.sendSignal(signal);
}
} }
protected void onFinished() { protected <T> InputPort<T> createInputPort() {
// empty default implementation InputPort<T> inputPort = new InputPort<T>(this);
this.onIsPipelineHead(); this.inputPortList.add(inputPort);
return inputPort;
}
protected <T> OutputPort<T> createOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>();
this.outputPortList.add(outputPort);
return outputPort;
} }
@Override @Override
......
package teetime.variant.methodcallWithPorts.framework.core; package teetime.variant.methodcallWithPorts.framework.core;
public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { public abstract class ConsumerStage<I> extends AbstractStage {
protected final InputPort<I> inputPort = this.createInputPort();
public final InputPort<I> getInputPort() {
return this.inputPort;
}
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
// if (this.logger.isDebugEnabled()) { I element = this.inputPort.receive();
// this.logger.debug("Executing stage...");
// }
I element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0); boolean isReschedulable = this.determineReschedulability();
this.setReschedulable(isReschedulable);
this.execute5(element); this.execute(element);
} }
@Override @Override
...@@ -20,4 +23,15 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { ...@@ -20,4 +23,15 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
// do nothing // do nothing
} }
/**
*
* @return <code>true</code> iff this stage makes progress when it is re-executed by the scheduler, otherwise <code>false</code>.<br>
* For example, many stages are re-schedulable if at least one of their input ports are not empty.
*/
protected boolean determineReschedulability() {
return this.inputPort.getPipe().size() > 0;
}
protected abstract void execute(I element);
} }
...@@ -4,10 +4,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; ...@@ -4,10 +4,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class InputPort<T> { public class InputPort<T> {
private final StageWithPort<?, ?> owningStage; private final StageWithPort owningStage;
private IPipe<T> pipe; private IPipe<T> pipe;
public InputPort(final StageWithPort<?, ?> owningStage) { InputPort(final StageWithPort owningStage) {
super(); super();
this.owningStage = owningStage; this.owningStage = owningStage;
} }
...@@ -36,7 +36,7 @@ public class InputPort<T> { ...@@ -36,7 +36,7 @@ public class InputPort<T> {
pipe.setTargetPort(this); pipe.setTargetPort(this);
} }
public StageWithPort<?, ?> getOwningStage() { public StageWithPort getOwningStage() {
return this.owningStage; return this.owningStage;
} }
......
...@@ -12,7 +12,11 @@ public class OutputPort<T> { ...@@ -12,7 +12,11 @@ public class OutputPort<T> {
* this.getPipe().getTargetPort().getOwningStage() * this.getPipe().getTargetPort().getOwningStage()
* </pre> * </pre>
*/ */
private StageWithPort<?, ?> cachedTargetStage; private StageWithPort cachedTargetStage;
OutputPort() {
super();
}
/** /**
* *
...@@ -31,11 +35,11 @@ public class OutputPort<T> { ...@@ -31,11 +35,11 @@ public class OutputPort<T> {
this.pipe = pipe; this.pipe = pipe;
} }
public StageWithPort<?, ?> getCachedTargetStage() { public StageWithPort getCachedTargetStage() {
return this.cachedTargetStage; return this.cachedTargetStage;
} }
public void setCachedTargetStage(final StageWithPort<?, ?> cachedTargetStage) { public void setCachedTargetStage(final StageWithPort cachedTargetStage) {
this.cachedTargetStage = cachedTargetStage; this.cachedTargetStage = cachedTargetStage;
} }
......
...@@ -9,7 +9,7 @@ import kieker.common.logging.Log; ...@@ -9,7 +9,7 @@ import kieker.common.logging.Log;
import kieker.common.logging.LogFactory; import kieker.common.logging.LogFactory;
// BETTER remove the pipeline since it does not add any new functionality // BETTER remove the pipeline since it does not add any new functionality
public class Pipeline<I, O> implements StageWithPort<I, O> { public class Pipeline<I, O> implements StageWithPort {
private final String id; private final String id;
/** /**
...@@ -17,17 +17,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -17,17 +17,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
*/ */
protected Log logger; protected Log logger;
private StageWithPort<I, ?> firstStage; private StageWithPort firstStage;
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); private InputPort<I> firstStageInputPort;
private StageWithPort<?, O> lastStage; private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private StageWithPort lastStage;
private OutputPort<O> lastStageOutputPort;
// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to // BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
// multiple input ports? // multiple input ports?
private StageWithPort<?, ?>[] stages; private StageWithPort[] stages;
private StageWithPort<?, ?> parentStage; private StageWithPort parentStage;
// private int startIndex; // private int startIndex;
private boolean reschedulable;
private int firstStageIndex; private int firstStageIndex;
// private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>(); // private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>();
...@@ -46,25 +47,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -46,25 +47,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
return this.id; return this.id;
} }
public void setFirstStage(final StageWithPort<I, ?> stage) { public void setFirstStage(final StageWithPort stage, final InputPort<I> firstStageInputPort) {
this.firstStage = stage; this.firstStage = stage;
this.firstStageInputPort = firstStageInputPort;
} }
public void addIntermediateStages(final StageWithPort<?, ?>... stages) { public void addIntermediateStages(final StageWithPort... stages) {
this.intermediateStages.addAll(Arrays.asList(stages)); this.intermediateStages.addAll(Arrays.asList(stages));
} }
public void addIntermediateStage(final StageWithPort<?, ?> stage) { public void addIntermediateStage(final StageWithPort stage) {
this.intermediateStages.add(stage); this.intermediateStages.add(stage);
} }
public void setLastStage(final StageWithPort<?, O> stage) { public void setLastStage(final StageWithPort stage, final OutputPort<O> lastStageOutputPort) {
this.lastStage = stage; this.lastStage = stage;
this.lastStageOutputPort = lastStageOutputPort;
} }
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex]; StageWithPort headStage = this.stages[this.firstStageIndex];
// do { // do {
headStage.executeWithPorts(); headStage.executeWithPorts();
...@@ -106,7 +109,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -106,7 +109,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.stages = new StageWithPort[size]; this.stages = new StageWithPort[size];
this.stages[0] = this.firstStage; this.stages[0] = this.firstStage;
for (int i = 0; i < this.intermediateStages.size(); i++) { for (int i = 0; i < this.intermediateStages.size(); i++) {
StageWithPort<?, ?> stage = this.intermediateStages.get(i); StageWithPort stage = this.intermediateStages.get(i);
this.stages[1 + i] = stage; this.stages[1 + i] = stage;
} }
this.stages[this.stages.length - 1] = this.lastStage; this.stages[this.stages.length - 1] = this.lastStage;
...@@ -123,18 +126,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -123,18 +126,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// } // }
// this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); // this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
for (StageWithPort<?, ?> stage : this.stages) { for (StageWithPort stage : this.stages) {
stage.onStart(); stage.onStart();
} }
} }
@Override @Override
public StageWithPort<?, ?> getParentStage() { public StageWithPort getParentStage() {
return this.parentStage; return this.parentStage;
} }
@Override @Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { public void setParentStage(final StageWithPort parentStage, final int index) {
this.parentStage = parentStage; this.parentStage = parentStage;
} }
...@@ -148,28 +151,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -148,28 +151,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// this.reschedulable = reschedulable; // this.reschedulable = reschedulable;
// } // }
@Override
public InputPort<I> getInputPort() { public InputPort<I> getInputPort() {
return this.firstStage.getInputPort(); // CACHE pipeline's input port return this.firstStageInputPort;
} }
@Override
public OutputPort<O> getOutputPort() { public OutputPort<O> getOutputPort() {
return this.lastStage.getOutputPort(); // CACHE pipeline's output port return this.lastStageOutputPort;
}
// TODO remove since it does not increase performances
private void cleanUp() {
// for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i];
// // stage.setParentStage(null, i);
// // stage.setListener(null);
// // stage.setSuccessor(null);
// }
this.firstStage = null;
this.intermediateStages.clear();
this.lastStage = null;
} }
@Override @Override
......
package teetime.variant.methodcallWithPorts.framework.core; package teetime.variant.methodcallWithPorts.framework.core;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { public abstract class ProducerStage<O> extends AbstractStage {
protected final OutputPort<O> outputPort = this.createOutputPort();
public final OutputPort<O> getOutputPort() {
return this.outputPort;
}
public ProducerStage() { public ProducerStage() {
this.setReschedulable(true); this.setReschedulable(true);
...@@ -8,15 +14,7 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { ...@@ -8,15 +14,7 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
// if (this.logger.isDebugEnabled()) { this.execute();
// this.logger.debug("Executing stage...");
// }
this.execute5(null);
// if (!this.getOutputPort().pipe.isEmpty()) {
// super.executeWithPorts();
// }
} }
@Override @Override
...@@ -24,4 +22,6 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> { ...@@ -24,4 +22,6 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
// do nothing // do nothing
} }
protected abstract void execute();
} }
...@@ -5,10 +5,10 @@ import kieker.common.logging.LogFactory; ...@@ -5,10 +5,10 @@ import kieker.common.logging.LogFactory;
public class RunnableStage<I> implements Runnable { public class RunnableStage<I> implements Runnable {
private final StageWithPort<I, ?> stage; private final ConsumerStage<I> stage;
private final Log logger; private final Log logger;
public RunnableStage(final StageWithPort<I, ?> stage) { public RunnableStage(final ConsumerStage<I> stage) {
this.stage = stage; this.stage = stage;
this.logger = LogFactory.getLog(stage.getClass()); this.logger = LogFactory.getLog(stage.getClass());
} }
......
package teetime.variant.methodcallWithPorts.framework.core; package teetime.variant.methodcallWithPorts.framework.core;
public interface StageWithPort<I, O> { public interface StageWithPort {
String getId(); String getId();
InputPort<I> getInputPort();
OutputPort<O> getOutputPort();
void executeWithPorts(); void executeWithPorts();
StageWithPort<?, ?> getParentStage(); StageWithPort getParentStage();
void setParentStage(StageWithPort<?, ?> parentStage, int index); void setParentStage(StageWithPort parentStage, int index);
// void setListener(OnDisableListener listener); // void setListener(OnDisableListener listener);
......
...@@ -6,13 +6,16 @@ import java.util.concurrent.TimeUnit; ...@@ -6,13 +6,16 @@ import java.util.concurrent.TimeUnit;
import teetime.util.StopWatch; import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Cache<T> extends ConsumerStage<T, T> { public class Cache<T> extends ConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
private final List<T> cachedObjects = new LinkedList<T>(); private final List<T> cachedObjects = new LinkedList<T>();
@Override @Override
protected void execute5(final T element) { protected void execute(final T element) {
this.cachedObjects.add(element); this.cachedObjects.add(element);
} }
...@@ -22,11 +25,15 @@ public class Cache<T> extends ConsumerStage<T, T> { ...@@ -22,11 +25,15 @@ public class Cache<T> extends ConsumerStage<T, T> {
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
for (T cachedElement : this.cachedObjects) { for (T cachedElement : this.cachedObjects) {
this.send(cachedElement); this.send(this.outputPort, cachedElement);
} }
stopWatch.end(); stopWatch.end();
this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead(); super.onIsPipelineHead();
} }
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
} }
package teetime.variant.methodcallWithPorts.stage; package teetime.variant.methodcallWithPorts.stage;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
public class Clock extends ProducerStage<Void, Long> { public class Clock extends ProducerStage<Long> {
private boolean initialDelayExceeded = false; private boolean initialDelayExceeded = false;
...@@ -11,13 +10,7 @@ public class Clock extends ProducerStage<Void, Long> { ...@@ -11,13 +10,7 @@ public class Clock extends ProducerStage<Void, Long> {
private long intervalDelayInMs; private long intervalDelayInMs;
@Override @Override
protected void execute4(final CommittableQueue<Void> elements) { protected void execute() {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final Void element) {
if (!this.initialDelayExceeded) { if (!this.initialDelayExceeded) {
this.initialDelayExceeded = true; this.initialDelayExceeded = true;
this.sleep(this.initialDelayInMs); this.sleep(this.initialDelayInMs);
...@@ -26,7 +19,7 @@ public class Clock extends ProducerStage<Void, Long> { ...@@ -26,7 +19,7 @@ public class Clock extends ProducerStage<Void, Long> {
} }
// this.logger.debug("Emitting timestamp"); // this.logger.debug("Emitting timestamp");
this.send(this.getCurrentTimeInNs()); this.send(this.outputPort, this.getCurrentTimeInNs());
} }
private void sleep(final long delayInMs) { private void sleep(final long delayInMs) {
......
...@@ -17,7 +17,6 @@ package teetime.variant.methodcallWithPorts.stage; ...@@ -17,7 +17,6 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.List; import java.util.List;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/** /**
...@@ -25,7 +24,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; ...@@ -25,7 +24,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
* *
* @since 1.10 * @since 1.10
*/ */
public class CollectorSink<T> extends ConsumerStage<T, Void> { public class CollectorSink<T> extends ConsumerStage<T> {
private final List<T> elements; private final List<T> elements;
private final int threshold; private final int threshold;
...@@ -45,13 +44,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> { ...@@ -45,13 +44,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> {
} }
@Override @Override
protected void execute4(final CommittableQueue<T> elements) { protected void execute(final T element) {
T element = elements.removeFromHead();
this.execute5(element);
}
@Override
protected void execute5(final T element) {
this.elements.add(element); this.elements.add(element);
if ((this.elements.size() % this.threshold) == 0) { if ((this.elements.size() % this.threshold) == 0) {
......
package teetime.variant.methodcallWithPorts.stage; package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Counter<T> extends ConsumerStage<T, T> { public class Counter<T> extends ConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
private int numElementsPassed; private int numElementsPassed;
@Override @Override
protected void execute5(final T element) { protected void execute(final T element) {
this.numElementsPassed++; this.numElementsPassed++;
// this.logger.debug("count: " + this.numElementsPassed); // this.logger.debug("count: " + this.numElementsPassed);
this.send(element); this.send(this.outputPort, element);
} }
// BETTER find a solution w/o any thread-safe code in this stage // BETTER find a solution w/o any thread-safe code in this stage
...@@ -18,4 +21,7 @@ public class Counter<T> extends ConsumerStage<T, T> { ...@@ -18,4 +21,7 @@ public class Counter<T> extends ConsumerStage<T, T> {
return this.numElementsPassed; return this.numElementsPassed;
} }
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
} }
...@@ -5,10 +5,12 @@ import java.util.List; ...@@ -5,10 +5,12 @@ import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private long numPassedElements; private long numPassedElements;
private long lastTimestampInNs; private long lastTimestampInNs;
...@@ -16,13 +18,14 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { ...@@ -16,13 +18,14 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
private final List<Long> delays = new LinkedList<Long>(); private final List<Long> delays = new LinkedList<Long>();
@Override @Override
protected void execute5(final T element) { protected void execute(final T element) {
Long timestampInNs = this.triggerInputPort.receive(); Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) { if (timestampInNs != null) {
this.computeElementDelay(System.nanoTime()); this.computeElementDelay(System.nanoTime());
} }
this.numPassedElements++; this.numPassedElements++;
this.send(element); this.send(this.outputPort, element);
} }
@Override @Override
...@@ -55,4 +58,8 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> { ...@@ -55,4 +58,8 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
return this.triggerInputPort; return this.triggerInputPort;
} }
public OutputPort<T> getOutputPort() {
return outputPort;
}
} }
...@@ -6,10 +6,12 @@ import java.util.concurrent.TimeUnit; ...@@ -6,10 +6,12 @@ import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private long numPassedElements; private long numPassedElements;
private long lastTimestampInNs; private long lastTimestampInNs;
...@@ -17,14 +19,14 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { ...@@ -17,14 +19,14 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private final List<Long> throughputs = new LinkedList<Long>(); private final List<Long> throughputs = new LinkedList<Long>();
@Override @Override
protected void execute5(final T element) { protected void execute(final T element) {
Long timestampInNs = this.triggerInputPort.receive(); Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) { if (timestampInNs != null) {
this.computeElementThroughput(System.nanoTime()); this.computeElementThroughput(System.nanoTime());
} }
this.numPassedElements++; this.numPassedElements++;
this.send(element); this.send(this.outputPort, element);
} }
@Override @Override
...@@ -72,4 +74,8 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { ...@@ -72,4 +74,8 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
return this.triggerInputPort; return this.triggerInputPort;
} }
public OutputPort<T> getOutputPort() {
return outputPort;
}
} }
package teetime.variant.methodcallWithPorts.stage;
import java.util.UUID;
import teetime.util.ConstructorClosure;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public class EndStage<T> implements StageWithPort<T, T> {
private final InputPort<T> inputPort = new InputPort<T>(this);
// public int count;
public ConstructorClosure<?> closure;
// public List<Object> list = new LinkedList<Object>();
private final String id;
public EndStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
}
@Override
public void onIsPipelineHead() {
// do nothing
}
@Override
public StageWithPort<?, ?> getParentStage() {
return null;
}
@Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
// do nothing
}
@Override
public boolean isReschedulable() {
return false;
}
@Override
public InputPort<T> getInputPort() {
return this.inputPort;
}
@Override
public OutputPort<T> getOutputPort() {
return null;
}
@Override
public void executeWithPorts() {
this.getInputPort().receive(); // just consume
// do nothing
// this.count++;
// Object r = this.closure.execute(null);
// this.list.add(r);
}
@Override
public void onStart() {
// do nothing
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
// do nothing
}
@Override
public String getId() {
return this.id;
}
}
...@@ -5,20 +5,16 @@ import java.util.HashMap; ...@@ -5,20 +5,16 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import com.google.common.io.Files; import com.google.common.io.Files;
public class FileExtensionSwitch extends ConsumerStage<File, File> { public class FileExtensionSwitch extends ConsumerStage<File> {
// BETTER do not extends from AbstractStage since it provides another unused output port
private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>(); private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
@Override @Override
protected void execute5(final File file) { protected void execute(final File file) {
String fileExtension = Files.getFileExtension(file.getAbsolutePath()); String fileExtension = Files.getFileExtension(file.getAbsolutePath());
this.logger.debug("fileExtension: " + fileExtension); this.logger.debug("fileExtension: " + fileExtension);
OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
...@@ -27,29 +23,11 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> { ...@@ -27,29 +23,11 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> {
} }
} }
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<File> op : this.fileExtensions.values()) {
op.sendSignal(signal);
}
}
public OutputPort<File> addFileExtension(String fileExtension) { public OutputPort<File> addFileExtension(String fileExtension) {
if (fileExtension.startsWith(".")) { if (fileExtension.startsWith(".")) {
fileExtension = fileExtension.substring(1); fileExtension = fileExtension.substring(1);
} }
OutputPort<File> outputPort = new OutputPort<File>(); OutputPort<File> outputPort = this.createOutputPort();
this.fileExtensions.put(fileExtension, outputPort); this.fileExtensions.put(fileExtension, outputPort);
this.logger.debug("SUCCESS: Registered output port for '" + fileExtension + "'"); this.logger.debug("SUCCESS: Registered output port for '" + fileExtension + "'");
return outputPort; return outputPort;
......
package teetime.variant.methodcallWithPorts.stage; package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> { public class InstanceCounter<T, C extends T> extends ConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
private final Class<C> type; private final Class<C> type;
private int counter; private int counter;
...@@ -12,16 +15,20 @@ public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> { ...@@ -12,16 +15,20 @@ public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> {
} }
@Override @Override
protected void execute5(final T element) { protected void execute(final T element) {
if (this.type.isInstance(element)) { if (this.type.isInstance(element)) {
this.counter++; this.counter++;
} }
this.send(element); this.send(this.outputPort, element);
} }
public int getCounter() { public int getCounter() {
return this.counter; return this.counter;
} }
public OutputPort<T> getOutputPort() {
return outputPort;
}
} }
package teetime.variant.methodcallWithPorts.stage; package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/** /**
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf * @author Jan Waller, Nils Christian Ehmke, Christian Wulf
* *
*/ */
public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { public class InstanceOfFilter<I, O> extends ConsumerStage<I> {
private final OutputPort<O> outputPort = this.createOutputPort();
private Class<O> type; private Class<O> type;
...@@ -16,12 +19,12 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { ...@@ -16,12 +19,12 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected void execute5(final I element) { protected void execute(final I element) {
if (this.type.isInstance(element)) { if (this.type.isInstance(element)) {
this.send((O) element); this.send(this.outputPort, (O) element);
} else { // swallow up the element } else { // swallow up the element
if (this.logger.isDebugEnabled()) { if (this.logger.isDebugEnabled()) {
this.logger.debug("element is not an instance of " + this.type.getName() + ", but of " + element.getClass()); this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass());
} }
} }
} }
...@@ -34,4 +37,8 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> { ...@@ -34,4 +37,8 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> {
this.type = type; this.type = type;
} }
public OutputPort<O> getOutputPort() {
return this.outputPort;
}
} }
...@@ -15,31 +15,25 @@ ...@@ -15,31 +15,25 @@
***************************************************************************/ ***************************************************************************/
package teetime.variant.methodcallWithPorts.stage; package teetime.variant.methodcallWithPorts.stage;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class NoopFilter<T> extends ConsumerStage<T, T> { public class NoopFilter<T> extends ConsumerStage<T> {
// @Override private final OutputPort<T> outputPort = this.createOutputPort();
// public void execute3() {
// T element = this.getInputPort().receive();
// // this.getOutputPort().send(element);
// }
@Override @Override
protected void execute4(final CommittableQueue<T> elements) { protected void execute(final T element) {
T element = elements.removeFromHead(); this.send(this.outputPort, element);
this.execute5(element);
} }
@Override public OutputPort<T> getOutputPort() {
protected void execute5(final T element) { return this.outputPort;
this.send(element); // "send" calls the next stage and so on
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment