Skip to content
Snippets Groups Projects
Commit 70e0411c authored by Christian Wulf's avatar Christian Wulf
Browse files

multiple ports abstraction

parent d08c1b19
Branches
Tags
No related merge requests found
Showing
with 193 additions and 276 deletions
......@@ -2,13 +2,15 @@ package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThre
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import kieker.common.record.IMonitoringRecord;
public class SysOutFilter<T> extends ConsumerStage<T, T> {
public class SysOutFilter<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private final IPipe<IMonitoringRecord> pipe;
......@@ -17,13 +19,13 @@ public class SysOutFilter<T> extends ConsumerStage<T, T> {
}
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
Long timestamp = this.triggerInputPort.receive();
if (timestamp != null) {
// this.logger.info("pipe.size: " + this.pipe.size());
System.out.println("pipe.size: " + this.pipe.size());
}
this.send(element);
this.send(this.outputPort, element);
}
public InputPort<Long> getTriggerInputPort() {
......
package teetime.variant.methodcallWithPorts.framework.core;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import teetime.util.list.CommittableQueue;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
public abstract class AbstractStage implements StageWithPort {
private final String id;
/**
......@@ -15,51 +15,35 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/
protected final Log logger; // BETTER use SLF4J as interface and logback as impl
private final InputPort<I> inputPort = new InputPort<I>(this);
private final OutputPort<O> outputPort = new OutputPort<O>();
private StageWithPort<?, ?> parentStage;
private StageWithPort parentStage;
private boolean reschedulable;
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
/** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */
protected InputPort<?>[] cachedInputPorts;
/** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
protected OutputPort<?>[] cachedOutputPorts;
public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")");
}
@Override
public InputPort<I> getInputPort() {
return this.inputPort;
}
@Override
public OutputPort<O> getOutputPort() {
return this.outputPort;
}
protected void execute4(final CommittableQueue<I> elements) {
throw new IllegalStateException(); // default implementation
}
protected abstract void execute5(I element);
/**
* Sends the given <code>element</code> using the default output port
*
* @param element
* @return <code>true</code> iff the given element could be sent, <code>false</code> otherwise (then use a re-try strategy)
*/
protected final boolean send(final O element) {
return this.send(this.getOutputPort(), element);
}
protected final boolean send(final OutputPort<O> outputPort, final O element) {
protected final <O> boolean send(final OutputPort<O> outputPort, final O element) {
if (!outputPort.send(element)) {
return false;
}
// StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
StageWithPort<?, ?> next = outputPort.getCachedTargetStage();
StageWithPort next = outputPort.getCachedTargetStage();
do {
next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
......@@ -68,29 +52,32 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return true;
}
// public void disable() {
// this.schedulingInformation.setActive(false);
// this.fireOnDisable();
// }
// private void fireOnDisable() {
// if (this.listener != null) {
// this.listener.onDisable(this, this.index);
// }
// }
@Override
public void onStart() {
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
}
protected void onFinished() {
// empty default implementation
this.onIsPipelineHead();
}
protected InputPort<?>[] getInputPorts() {
return this.cachedInputPorts;
}
protected OutputPort<?>[] getOutputPorts() {
return this.cachedOutputPorts;
}
@Override
public StageWithPort<?, ?> getParentStage() {
public StageWithPort getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
public void setParentStage(final StageWithPort parentStage, final int index) {
this.parentStage = parentStage;
}
......@@ -124,12 +111,21 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
break;
}
this.outputPort.sendSignal(signal);
for (OutputPort<?> outputPort : this.outputPortList) {
outputPort.sendSignal(signal);
}
}
protected void onFinished() {
// empty default implementation
this.onIsPipelineHead();
protected <T> InputPort<T> createInputPort() {
InputPort<T> inputPort = new InputPort<T>(this);
this.inputPortList.add(inputPort);
return inputPort;
}
protected <T> OutputPort<T> createOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>();
this.outputPortList.add(outputPort);
return outputPort;
}
@Override
......
package teetime.variant.methodcallWithPorts.framework.core;
public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
public abstract class ConsumerStage<I> extends AbstractStage {
protected final InputPort<I> inputPort = this.createInputPort();
public final InputPort<I> getInputPort() {
return this.inputPort;
}
@Override
public void executeWithPorts() {
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
I element = this.getInputPort().receive();
I element = this.inputPort.receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
boolean isReschedulable = this.determineReschedulability();
this.setReschedulable(isReschedulable);
this.execute5(element);
this.execute(element);
}
@Override
......@@ -20,4 +23,15 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
// do nothing
}
/**
*
* @return <code>true</code> iff this stage makes progress when it is re-executed by the scheduler, otherwise <code>false</code>.<br>
* For example, many stages are re-schedulable if at least one of their input ports are not empty.
*/
protected boolean determineReschedulability() {
return this.inputPort.getPipe().size() > 0;
}
protected abstract void execute(I element);
}
......@@ -4,10 +4,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class InputPort<T> {
private final StageWithPort<?, ?> owningStage;
private final StageWithPort owningStage;
private IPipe<T> pipe;
public InputPort(final StageWithPort<?, ?> owningStage) {
InputPort(final StageWithPort owningStage) {
super();
this.owningStage = owningStage;
}
......@@ -36,7 +36,7 @@ public class InputPort<T> {
pipe.setTargetPort(this);
}
public StageWithPort<?, ?> getOwningStage() {
public StageWithPort getOwningStage() {
return this.owningStage;
}
......
......@@ -12,7 +12,11 @@ public class OutputPort<T> {
* this.getPipe().getTargetPort().getOwningStage()
* </pre>
*/
private StageWithPort<?, ?> cachedTargetStage;
private StageWithPort cachedTargetStage;
OutputPort() {
super();
}
/**
*
......@@ -31,11 +35,11 @@ public class OutputPort<T> {
this.pipe = pipe;
}
public StageWithPort<?, ?> getCachedTargetStage() {
public StageWithPort getCachedTargetStage() {
return this.cachedTargetStage;
}
public void setCachedTargetStage(final StageWithPort<?, ?> cachedTargetStage) {
public void setCachedTargetStage(final StageWithPort cachedTargetStage) {
this.cachedTargetStage = cachedTargetStage;
}
......
......@@ -9,7 +9,7 @@ import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
// BETTER remove the pipeline since it does not add any new functionality
public class Pipeline<I, O> implements StageWithPort<I, O> {
public class Pipeline<I, O> implements StageWithPort {
private final String id;
/**
......@@ -17,17 +17,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
*/
protected Log logger;
private StageWithPort<I, ?> firstStage;
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage;
private StageWithPort firstStage;
private InputPort<I> firstStageInputPort;
private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private StageWithPort lastStage;
private OutputPort<O> lastStageOutputPort;
// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
// multiple input ports?
private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage;
private StageWithPort[] stages;
private StageWithPort parentStage;
// private int startIndex;
private boolean reschedulable;
private int firstStageIndex;
// private final Set<StageWithPort<?, ?>> currentHeads = new HashSet<StageWithPort<?, ?>>();
......@@ -46,25 +47,27 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
return this.id;
}
public void setFirstStage(final StageWithPort<I, ?> stage) {
public void setFirstStage(final StageWithPort stage, final InputPort<I> firstStageInputPort) {
this.firstStage = stage;
this.firstStageInputPort = firstStageInputPort;
}
public void addIntermediateStages(final StageWithPort<?, ?>... stages) {
public void addIntermediateStages(final StageWithPort... stages) {
this.intermediateStages.addAll(Arrays.asList(stages));
}
public void addIntermediateStage(final StageWithPort<?, ?> stage) {
public void addIntermediateStage(final StageWithPort stage) {
this.intermediateStages.add(stage);
}
public void setLastStage(final StageWithPort<?, O> stage) {
public void setLastStage(final StageWithPort stage, final OutputPort<O> lastStageOutputPort) {
this.lastStage = stage;
this.lastStageOutputPort = lastStageOutputPort;
}
@Override
public void executeWithPorts() {
StageWithPort<?, ?> headStage = this.stages[this.firstStageIndex];
StageWithPort headStage = this.stages[this.firstStageIndex];
// do {
headStage.executeWithPorts();
......@@ -106,7 +109,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.stages = new StageWithPort[size];
this.stages[0] = this.firstStage;
for (int i = 0; i < this.intermediateStages.size(); i++) {
StageWithPort<?, ?> stage = this.intermediateStages.get(i);
StageWithPort stage = this.intermediateStages.get(i);
this.stages[1 + i] = stage;
}
this.stages[this.stages.length - 1] = this.lastStage;
......@@ -123,18 +126,18 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// }
// this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
for (StageWithPort<?, ?> stage : this.stages) {
for (StageWithPort stage : this.stages) {
stage.onStart();
}
}
@Override
public StageWithPort<?, ?> getParentStage() {
public StageWithPort getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
public void setParentStage(final StageWithPort parentStage, final int index) {
this.parentStage = parentStage;
}
......@@ -148,28 +151,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// this.reschedulable = reschedulable;
// }
@Override
public InputPort<I> getInputPort() {
return this.firstStage.getInputPort(); // CACHE pipeline's input port
return this.firstStageInputPort;
}
@Override
public OutputPort<O> getOutputPort() {
return this.lastStage.getOutputPort(); // CACHE pipeline's output port
}
// TODO remove since it does not increase performances
private void cleanUp() {
// for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i];
// // stage.setParentStage(null, i);
// // stage.setListener(null);
// // stage.setSuccessor(null);
// }
this.firstStage = null;
this.intermediateStages.clear();
this.lastStage = null;
return this.lastStageOutputPort;
}
@Override
......
package teetime.variant.methodcallWithPorts.framework.core;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
public abstract class ProducerStage<O> extends AbstractStage {
protected final OutputPort<O> outputPort = this.createOutputPort();
public final OutputPort<O> getOutputPort() {
return this.outputPort;
}
public ProducerStage() {
this.setReschedulable(true);
......@@ -8,15 +14,7 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
@Override
public void executeWithPorts() {
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("Executing stage...");
// }
this.execute5(null);
// if (!this.getOutputPort().pipe.isEmpty()) {
// super.executeWithPorts();
// }
this.execute();
}
@Override
......@@ -24,4 +22,6 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
// do nothing
}
protected abstract void execute();
}
......@@ -5,10 +5,10 @@ import kieker.common.logging.LogFactory;
public class RunnableStage<I> implements Runnable {
private final StageWithPort<I, ?> stage;
private final ConsumerStage<I> stage;
private final Log logger;
public RunnableStage(final StageWithPort<I, ?> stage) {
public RunnableStage(final ConsumerStage<I> stage) {
this.stage = stage;
this.logger = LogFactory.getLog(stage.getClass());
}
......
package teetime.variant.methodcallWithPorts.framework.core;
public interface StageWithPort<I, O> {
public interface StageWithPort {
String getId();
InputPort<I> getInputPort();
OutputPort<O> getOutputPort();
void executeWithPorts();
StageWithPort<?, ?> getParentStage();
StageWithPort getParentStage();
void setParentStage(StageWithPort<?, ?> parentStage, int index);
void setParentStage(StageWithPort parentStage, int index);
// void setListener(OnDisableListener listener);
......
......@@ -6,13 +6,16 @@ import java.util.concurrent.TimeUnit;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Cache<T> extends ConsumerStage<T, T> {
public class Cache<T> extends ConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
private final List<T> cachedObjects = new LinkedList<T>();
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
this.cachedObjects.add(element);
}
......@@ -22,11 +25,15 @@ public class Cache<T> extends ConsumerStage<T, T> {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (T cachedElement : this.cachedObjects) {
this.send(cachedElement);
this.send(this.outputPort, cachedElement);
}
stopWatch.end();
this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead();
}
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
}
package teetime.variant.methodcallWithPorts.stage;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
public class Clock extends ProducerStage<Void, Long> {
public class Clock extends ProducerStage<Long> {
private boolean initialDelayExceeded = false;
......@@ -11,13 +10,7 @@ public class Clock extends ProducerStage<Void, Long> {
private long intervalDelayInMs;
@Override
protected void execute4(final CommittableQueue<Void> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final Void element) {
protected void execute() {
if (!this.initialDelayExceeded) {
this.initialDelayExceeded = true;
this.sleep(this.initialDelayInMs);
......@@ -26,7 +19,7 @@ public class Clock extends ProducerStage<Void, Long> {
}
// this.logger.debug("Emitting timestamp");
this.send(this.getCurrentTimeInNs());
this.send(this.outputPort, this.getCurrentTimeInNs());
}
private void sleep(final long delayInMs) {
......
......@@ -17,7 +17,6 @@ package teetime.variant.methodcallWithPorts.stage;
import java.util.List;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
/**
......@@ -25,7 +24,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*
* @since 1.10
*/
public class CollectorSink<T> extends ConsumerStage<T, Void> {
public class CollectorSink<T> extends ConsumerStage<T> {
private final List<T> elements;
private final int threshold;
......@@ -45,13 +44,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> {
}
@Override
protected void execute4(final CommittableQueue<T> elements) {
T element = elements.removeFromHead();
this.execute5(element);
}
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
this.elements.add(element);
if ((this.elements.size() % this.threshold) == 0) {
......
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class Counter<T> extends ConsumerStage<T, T> {
public class Counter<T> extends ConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
private int numElementsPassed;
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
this.numElementsPassed++;
// this.logger.debug("count: " + this.numElementsPassed);
this.send(element);
this.send(this.outputPort, element);
}
// BETTER find a solution w/o any thread-safe code in this stage
......@@ -18,4 +21,7 @@ public class Counter<T> extends ConsumerStage<T, T> {
return this.numElementsPassed;
}
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
}
......@@ -5,10 +5,12 @@ import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
public class ElementDelayMeasuringStage<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private long numPassedElements;
private long lastTimestampInNs;
......@@ -16,13 +18,14 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
private final List<Long> delays = new LinkedList<Long>();
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementDelay(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
this.send(this.outputPort, element);
}
@Override
......@@ -55,4 +58,8 @@ public class ElementDelayMeasuringStage<T> extends ConsumerStage<T, T> {
return this.triggerInputPort;
}
public OutputPort<T> getOutputPort() {
return outputPort;
}
}
......@@ -6,10 +6,12 @@ import java.util.concurrent.TimeUnit;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T> {
private final InputPort<Long> triggerInputPort = new InputPort<Long>(this);
private final InputPort<Long> triggerInputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private long numPassedElements;
private long lastTimestampInNs;
......@@ -17,14 +19,14 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private final List<Long> throughputs = new LinkedList<Long>();
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
Long timestampInNs = this.triggerInputPort.receive();
if (timestampInNs != null) {
this.computeElementThroughput(System.nanoTime());
}
this.numPassedElements++;
this.send(element);
this.send(this.outputPort, element);
}
@Override
......@@ -72,4 +74,8 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
return this.triggerInputPort;
}
public OutputPort<T> getOutputPort() {
return outputPort;
}
}
package teetime.variant.methodcallWithPorts.stage;
import java.util.UUID;
import teetime.util.ConstructorClosure;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public class EndStage<T> implements StageWithPort<T, T> {
private final InputPort<T> inputPort = new InputPort<T>(this);
// public int count;
public ConstructorClosure<?> closure;
// public List<Object> list = new LinkedList<Object>();
private final String id;
public EndStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
}
@Override
public void onIsPipelineHead() {
// do nothing
}
@Override
public StageWithPort<?, ?> getParentStage() {
return null;
}
@Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
// do nothing
}
@Override
public boolean isReschedulable() {
return false;
}
@Override
public InputPort<T> getInputPort() {
return this.inputPort;
}
@Override
public OutputPort<T> getOutputPort() {
return null;
}
@Override
public void executeWithPorts() {
this.getInputPort().receive(); // just consume
// do nothing
// this.count++;
// Object r = this.closure.execute(null);
// this.list.add(r);
}
@Override
public void onStart() {
// do nothing
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
// do nothing
}
@Override
public String getId() {
return this.id;
}
}
......@@ -5,20 +5,16 @@ import java.util.HashMap;
import java.util.Map;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import com.google.common.io.Files;
public class FileExtensionSwitch extends ConsumerStage<File, File> {
// BETTER do not extends from AbstractStage since it provides another unused output port
public class FileExtensionSwitch extends ConsumerStage<File> {
private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
@Override
protected void execute5(final File file) {
protected void execute(final File file) {
String fileExtension = Files.getFileExtension(file.getAbsolutePath());
this.logger.debug("fileExtension: " + fileExtension);
OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
......@@ -27,29 +23,11 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> {
}
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<File> op : this.fileExtensions.values()) {
op.sendSignal(signal);
}
}
public OutputPort<File> addFileExtension(String fileExtension) {
if (fileExtension.startsWith(".")) {
fileExtension = fileExtension.substring(1);
}
OutputPort<File> outputPort = new OutputPort<File>();
OutputPort<File> outputPort = this.createOutputPort();
this.fileExtensions.put(fileExtension, outputPort);
this.logger.debug("SUCCESS: Registered output port for '" + fileExtension + "'");
return outputPort;
......
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> {
public class InstanceCounter<T, C extends T> extends ConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
private final Class<C> type;
private int counter;
......@@ -12,16 +15,20 @@ public class InstanceCounter<T, C extends T> extends ConsumerStage<T, T> {
}
@Override
protected void execute5(final T element) {
protected void execute(final T element) {
if (this.type.isInstance(element)) {
this.counter++;
}
this.send(element);
this.send(this.outputPort, element);
}
public int getCounter() {
return this.counter;
}
public OutputPort<T> getOutputPort() {
return outputPort;
}
}
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf
*
*/
public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> {
public class InstanceOfFilter<I, O> extends ConsumerStage<I> {
private final OutputPort<O> outputPort = this.createOutputPort();
private Class<O> type;
......@@ -16,12 +19,12 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> {
@SuppressWarnings("unchecked")
@Override
protected void execute5(final I element) {
protected void execute(final I element) {
if (this.type.isInstance(element)) {
this.send((O) element);
this.send(this.outputPort, (O) element);
} else { // swallow up the element
if (this.logger.isDebugEnabled()) {
this.logger.debug("element is not an instance of " + this.type.getName() + ", but of " + element.getClass());
this.logger.info("element is not an instance of " + this.type.getName() + ", but of " + element.getClass());
}
}
}
......@@ -34,4 +37,8 @@ public class InstanceOfFilter<I, O> extends ConsumerStage<I, O> {
this.type = type;
}
public OutputPort<O> getOutputPort() {
return this.outputPort;
}
}
......@@ -15,31 +15,25 @@
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class NoopFilter<T> extends ConsumerStage<T, T> {
public class NoopFilter<T> extends ConsumerStage<T> {
// @Override
// public void execute3() {
// T element = this.getInputPort().receive();
// // this.getOutputPort().send(element);
// }
private final OutputPort<T> outputPort = this.createOutputPort();
@Override
protected void execute4(final CommittableQueue<T> elements) {
T element = elements.removeFromHead();
this.execute5(element);
protected void execute(final T element) {
this.send(this.outputPort, element);
}
@Override
protected void execute5(final T element) {
this.send(element); // "send" calls the next stage and so on
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment