Skip to content
Snippets Groups Projects
Commit 5e92cbcd authored by Christian Wulf's avatar Christian Wulf
Browse files

added concept doc "stage with trigger";

finished RecordReaderAnalysisTest
parent 08c655ce
No related branches found
No related tags found
No related merge requests found
Showing
with 88 additions and 74 deletions
.handlers = java.util.logging.ConsoleHandler .handlers = java.util.logging.ConsoleHandler
.level= ALL .level= ALL
java.util.logging.ConsoleHandler.level = ALL java.util.logging.ConsoleHandler.level = INFO
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %2$s %4$s: %5$s %n
#teetime.level = ALL #teetime.level = ALL
\ No newline at end of file
File added
File added
...@@ -15,21 +15,16 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -15,21 +15,16 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/ */
protected Log logger; protected Log logger;
private final InputPort<I> inputPort = new InputPort<I>(); private final InputPort<I> inputPort = new InputPort<I>(this);
private final OutputPort<O> outputPort = new OutputPort<O>(); private final OutputPort<O> outputPort = new OutputPort<O>();
// protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
// private final CommittableQueue<O> outputElements = null;
private StageWithPort<?, ?> parentStage; private StageWithPort<?, ?> parentStage;
private int index;
private boolean reschedulable; private boolean reschedulable;
public AbstractStage() { public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.id); this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")");
} }
@Override @Override
...@@ -74,6 +69,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -74,6 +69,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected abstract void execute5(I element); protected abstract void execute5(I element);
/**
* Sends the <code>element</code> using the default output port
*
* @param element
*/
protected final void send(final O element) { protected final void send(final O element) {
this.send(this.getOutputPort(), element); this.send(this.getOutputPort(), element);
} }
...@@ -81,19 +81,13 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -81,19 +81,13 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected final void send(final OutputPort<O> outputPort, final O element) { protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element); outputPort.send(element);
StageWithPort<O, ?> next = outputPort.getPipe().getTargetStage(); StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
// StageWithPort<?, ?> next = this.next();
do { do {
next.executeWithPorts(); next.executeWithPorts();
} while (next.isReschedulable()); } while (next.isReschedulable());
} }
// @Override
// public SchedulingInformation getSchedulingInformation() {
// return this.schedulingInformation;
// }
// public void disable() { // public void disable() {
// this.schedulingInformation.setActive(false); // this.schedulingInformation.setActive(false);
// this.fireOnDisable(); // this.fireOnDisable();
...@@ -117,7 +111,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -117,7 +111,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
@Override @Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
this.index = index;
this.parentStage = parentStage; this.parentStage = parentStage;
} }
...@@ -134,4 +127,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -134,4 +127,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return this.id; return this.id;
} }
@Override
public String toString() {
return this.getClass().getName() + ": " + this.id;
}
} }
...@@ -26,12 +26,6 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { ...@@ -26,12 +26,6 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
this.setReschedulable(this.getInputPort().getPipe().size() > 0); this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element); this.execute5(element);
// this.send(result);
// if (!this.getOutputPort().pipe.isEmpty()) {
// super.executeWithPorts();
// }
} }
@Override @Override
......
...@@ -4,8 +4,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; ...@@ -4,8 +4,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class InputPort<T> { public class InputPort<T> {
private final StageWithPort<?, ?> owningStage;
private IPipe<T> pipe; private IPipe<T> pipe;
public InputPort(final StageWithPort<?, ?> owningStage) {
super();
this.owningStage = owningStage;
}
public T receive() { public T receive() {
T element = this.pipe.removeLast(); T element = this.pipe.removeLast();
return element; return element;
...@@ -22,6 +28,11 @@ public class InputPort<T> { ...@@ -22,6 +28,11 @@ public class InputPort<T> {
public void setPipe(final IPipe<T> pipe) { public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe; this.pipe = pipe;
pipe.setTargetPort(this);
}
public StageWithPort<?, ?> getOwningStage() {
return this.owningStage;
} }
} }
...@@ -22,12 +22,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -22,12 +22,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage; private StageWithPort<?, O> lastStage;
private StageWithPort<?, ?> successor;
private StageWithPort<?, ?>[] stages; private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage; private StageWithPort<?, ?> parentStage;
private int index; // private int startIndex;
private int startIndex;
private boolean reschedulable; private boolean reschedulable;
private int firstStageIndex; private int firstStageIndex;
...@@ -49,7 +46,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -49,7 +46,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.intermediateStages.addAll(Arrays.asList(stages)); this.intermediateStages.addAll(Arrays.asList(stages));
} }
public void addIntermediateStage(final StageWithPort stage) { public void addIntermediateStage(final StageWithPort<?, ?> stage) {
this.intermediateStages.add(stage); this.intermediateStages.add(stage);
} }
...@@ -93,12 +90,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -93,12 +90,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
while (!currentStage.isReschedulable()) { while (!currentStage.isReschedulable()) {
this.firstStageIndex++; this.firstStageIndex++;
// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
currentStage = this.stages[this.firstStageIndex]; // if (currentStage == null) { // loop reaches the last stage
if (currentStage == null) { // loop reaches the last stage if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
this.setReschedulable(false); this.setReschedulable(false);
this.cleanUp(); this.cleanUp();
return; return;
} }
currentStage = this.stages[this.firstStageIndex];
currentStage.onIsPipelineHead(); currentStage.onIsPipelineHead();
// System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName()); // System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName());
} }
...@@ -166,7 +164,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -166,7 +164,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override @Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
this.index = index;
this.parentStage = parentStage; this.parentStage = parentStage;
} }
...@@ -191,12 +188,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -191,12 +188,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// TODO remove since it does not increase performances // TODO remove since it does not increase performances
private void cleanUp() { private void cleanUp() {
for (int i = 0; i < this.stages.length; i++) { // for (int i = 0; i < this.stages.length; i++) {
StageWithPort<?, ?> stage = this.stages[i]; // StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(null, i); // // stage.setParentStage(null, i);
// stage.setListener(null); // // stage.setListener(null);
// stage.setSuccessor(null); // // stage.setSuccessor(null);
} // }
this.firstStage = null; this.firstStage = null;
this.intermediateStages.clear(); this.intermediateStages.clear();
......
...@@ -2,12 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; ...@@ -2,12 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public abstract class AbstractPipe<T> implements IPipe<T> { public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private StageWithPort<T, ?> targetStage; private InputPort<T> targetPort;
@Override @Override
public boolean isClosed() { public boolean isClosed() {
...@@ -20,12 +20,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> { ...@@ -20,12 +20,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
} }
@Override @Override
public StageWithPort<T, ?> getTargetStage() { public InputPort<T> getTargetPort() {
return this.targetStage; return this.targetPort;
} }
public void setTargetStage(StageWithPort<T, ?> targetStage) { @Override
this.targetStage = targetStage; public void setTargetPort(final InputPort<T> targetPort) {
this.targetPort = targetPort;
} }
} }
package teetime.variant.methodcallWithPorts.framework.core.pipe; package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public interface IPipe<T> { public interface IPipe<T> {
public abstract void add(T element); void add(T element);
public abstract T removeLast(); T removeLast();
public abstract boolean isEmpty(); boolean isEmpty();
public abstract int size(); int size();
public abstract T readLast(); T readLast();
public abstract void close(); void close();
public abstract boolean isClosed(); boolean isClosed();
public abstract StageWithPort<T, ?> getTargetStage(); InputPort<T> getTargetPort();
void setTargetPort(InputPort<T> targetPort);
} }
...@@ -41,16 +41,4 @@ public class SingleElementPipe<T> extends AbstractPipe<T> { ...@@ -41,16 +41,4 @@ public class SingleElementPipe<T> extends AbstractPipe<T> {
return (this.element == null) ? 0 : 1; return (this.element == null) ? 0 : 1;
} }
// @Override
// public void close() {
//
//
// }
//
// @Override
// public boolean isClosed() {
//
// return false;
// }
} }
...@@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; ...@@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*/ */
public class CollectorSink<T> extends ConsumerStage<T, Object> { public class CollectorSink<T> extends ConsumerStage<T, Object> {
private static final int THRESHOLD = 10000; private static final int THRESHOLD = 100; // TODO make configurable or use an sysout stage instead
private final List<T> elements; private final List<T> elements;
......
...@@ -4,9 +4,9 @@ import teetime.util.list.CommittableQueue; ...@@ -4,9 +4,9 @@ import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class Delay<I> extends AbstractStage<I, I> { public class Delay<T> extends AbstractStage<T, T> {
private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(); private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(this);
public Delay() { public Delay() {
// this.setReschedulable(true); // this.setReschedulable(true);
...@@ -23,7 +23,7 @@ public class Delay<I> extends AbstractStage<I, I> { ...@@ -23,7 +23,7 @@ public class Delay<I> extends AbstractStage<I, I> {
// System.out.println("#elements: " + this.getInputPort().pipe.size()); // System.out.println("#elements: " + this.getInputPort().pipe.size());
// TODO implement receiveAll() and sendMultiple() // TODO implement receiveAll() and sendMultiple()
while (!this.getInputPort().getPipe().isEmpty()) { while (!this.getInputPort().getPipe().isEmpty()) {
I element = this.getInputPort().receive(); T element = this.getInputPort().receive();
this.send(element); this.send(element);
} }
...@@ -38,13 +38,13 @@ public class Delay<I> extends AbstractStage<I, I> { ...@@ -38,13 +38,13 @@ public class Delay<I> extends AbstractStage<I, I> {
} }
@Override @Override
protected void execute4(final CommittableQueue<I> elements) { protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
@Override @Override
protected void execute5(final I element) { protected void execute5(final T element) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
......
...@@ -19,11 +19,12 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger; ...@@ -19,11 +19,12 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.InputPort;
/** /**
*
* This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.
* *
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -32,11 +33,11 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort; ...@@ -32,11 +33,11 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort;
* @param <T> * @param <T>
* the type of the input ports and the output port * the type of the input ports and the output port
*/ */
@Description("This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.")
public class Merger<T> extends ConsumerStage<T, T> { public class Merger<T> extends ConsumerStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger // TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
// BETTER use an array since a list always creates a new iterator when looping
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>(); private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
...@@ -49,6 +50,19 @@ public class Merger<T> extends ConsumerStage<T, T> { ...@@ -49,6 +50,19 @@ public class Merger<T> extends ConsumerStage<T, T> {
this.strategy = strategy; this.strategy = strategy;
} }
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
this.execute5(null);
boolean isReschedulable = false;
for (InputPort<T> inputPort : this.inputPortList) {
isReschedulable = isReschedulable || !inputPort.getPipe().isEmpty();
}
this.setReschedulable(isReschedulable);
}
@Override @Override
protected void execute5(final T element) { protected void execute5(final T element) {
final T token = this.strategy.getNextInput(this); final T token = this.strategy.getNextInput(this);
...@@ -65,7 +79,7 @@ public class Merger<T> extends ConsumerStage<T, T> { ...@@ -65,7 +79,7 @@ public class Merger<T> extends ConsumerStage<T, T> {
} }
private InputPort<T> getNewInputPort() { private InputPort<T> getNewInputPort() {
InputPort<T> inputPort = new InputPort<T>(); InputPort<T> inputPort = new InputPort<T>(this);
this.inputPortList.add(inputPort); this.inputPortList.add(inputPort);
return inputPort; return inputPort;
} }
......
...@@ -15,8 +15,11 @@ ...@@ -15,8 +15,11 @@
***************************************************************************/ ***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.recordReader; package teetime.variant.methodcallWithPorts.examples.recordReader;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -36,6 +39,12 @@ public class RecordReaderAnalysisTest { ...@@ -36,6 +39,12 @@ public class RecordReaderAnalysisTest {
this.stopWatch = new StopWatch(); this.stopWatch = new StopWatch();
} }
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test @Test
public void performAnalysis() { public void performAnalysis() {
final RecordReaderAnalysis analysis = new RecordReaderAnalysis(); final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
...@@ -49,8 +58,7 @@ public class RecordReaderAnalysisTest { ...@@ -49,8 +58,7 @@ public class RecordReaderAnalysisTest {
analysis.onTerminate(); analysis.onTerminate();
} }
long overallDurationInNs = this.stopWatch.getDurationInNs(); assertEquals(6541, analysis.getElementCollection().size());
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment