Skip to content
Snippets Groups Projects
Commit 8825a558 authored by Christian Wulf's avatar Christian Wulf
Browse files

added concept doc "stage with trigger";

finished RecordReaderAnalysisTest
parent ab0edfbd
No related branches found
No related tags found
No related merge requests found
Showing
with 88 additions and 74 deletions
.handlers = java.util.logging.ConsoleHandler
.level= ALL
java.util.logging.ConsoleHandler.level = ALL
java.util.logging.ConsoleHandler.level = INFO
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %2$s %4$s: %5$s %n
#teetime.level = ALL
\ No newline at end of file
File added
File added
......@@ -15,21 +15,16 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/
protected Log logger;
private final InputPort<I> inputPort = new InputPort<I>();
private final InputPort<I> inputPort = new InputPort<I>(this);
private final OutputPort<O> outputPort = new OutputPort<O>();
// protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
// private final CommittableQueue<O> outputElements = null;
private StageWithPort<?, ?> parentStage;
private int index;
private boolean reschedulable;
public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.id);
this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")");
}
@Override
......@@ -74,6 +69,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected abstract void execute5(I element);
/**
* Sends the <code>element</code> using the default output port
*
* @param element
*/
protected final void send(final O element) {
this.send(this.getOutputPort(), element);
}
......@@ -81,19 +81,13 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element);
StageWithPort<O, ?> next = outputPort.getPipe().getTargetStage();
StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage();
// StageWithPort<?, ?> next = this.next();
do {
next.executeWithPorts();
} while (next.isReschedulable());
}
// @Override
// public SchedulingInformation getSchedulingInformation() {
// return this.schedulingInformation;
// }
// public void disable() {
// this.schedulingInformation.setActive(false);
// this.fireOnDisable();
......@@ -117,7 +111,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
@Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
......@@ -134,4 +127,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return this.id;
}
@Override
public String toString() {
return this.getClass().getName() + ": " + this.id;
}
}
......@@ -26,12 +26,6 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element);
// this.send(result);
// if (!this.getOutputPort().pipe.isEmpty()) {
// super.executeWithPorts();
// }
}
@Override
......
......@@ -4,8 +4,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class InputPort<T> {
private final StageWithPort<?, ?> owningStage;
private IPipe<T> pipe;
public InputPort(final StageWithPort<?, ?> owningStage) {
super();
this.owningStage = owningStage;
}
public T receive() {
T element = this.pipe.removeLast();
return element;
......@@ -22,6 +28,11 @@ public class InputPort<T> {
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
pipe.setTargetPort(this);
}
public StageWithPort<?, ?> getOwningStage() {
return this.owningStage;
}
}
......@@ -22,12 +22,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage;
private StageWithPort<?, ?> successor;
private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage;
private int index;
private int startIndex;
// private int startIndex;
private boolean reschedulable;
private int firstStageIndex;
......@@ -49,7 +46,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.intermediateStages.addAll(Arrays.asList(stages));
}
public void addIntermediateStage(final StageWithPort stage) {
public void addIntermediateStage(final StageWithPort<?, ?> stage) {
this.intermediateStages.add(stage);
}
......@@ -93,12 +90,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
while (!currentStage.isReschedulable()) {
this.firstStageIndex++;
// currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
currentStage = this.stages[this.firstStageIndex];
if (currentStage == null) { // loop reaches the last stage
// if (currentStage == null) { // loop reaches the last stage
if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage
this.setReschedulable(false);
this.cleanUp();
return;
}
currentStage = this.stages[this.firstStageIndex];
currentStage.onIsPipelineHead();
// System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName());
}
......@@ -166,7 +164,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override
public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
......@@ -191,12 +188,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
// TODO remove since it does not increase performances
private void cleanUp() {
for (int i = 0; i < this.stages.length; i++) {
StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(null, i);
// stage.setListener(null);
// stage.setSuccessor(null);
}
// for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i];
// // stage.setParentStage(null, i);
// // stage.setListener(null);
// // stage.setSuccessor(null);
// }
this.firstStage = null;
this.intermediateStages.clear();
......
......@@ -2,12 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicBoolean;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean();
private StageWithPort<T, ?> targetStage;
private InputPort<T> targetPort;
@Override
public boolean isClosed() {
......@@ -20,12 +20,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
}
@Override
public StageWithPort<T, ?> getTargetStage() {
return this.targetStage;
public InputPort<T> getTargetPort() {
return this.targetPort;
}
public void setTargetStage(StageWithPort<T, ?> targetStage) {
this.targetStage = targetStage;
@Override
public void setTargetPort(final InputPort<T> targetPort) {
this.targetPort = targetPort;
}
}
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public interface IPipe<T> {
public abstract void add(T element);
void add(T element);
public abstract T removeLast();
T removeLast();
public abstract boolean isEmpty();
boolean isEmpty();
public abstract int size();
int size();
public abstract T readLast();
T readLast();
public abstract void close();
void close();
public abstract boolean isClosed();
boolean isClosed();
public abstract StageWithPort<T, ?> getTargetStage();
InputPort<T> getTargetPort();
void setTargetPort(InputPort<T> targetPort);
}
......@@ -41,16 +41,4 @@ public class SingleElementPipe<T> extends AbstractPipe<T> {
return (this.element == null) ? 0 : 1;
}
// @Override
// public void close() {
//
//
// }
//
// @Override
// public boolean isClosed() {
//
// return false;
// }
}
......@@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*/
public class CollectorSink<T> extends ConsumerStage<T, Object> {
private static final int THRESHOLD = 10000;
private static final int THRESHOLD = 100; // TODO make configurable or use an sysout stage instead
private final List<T> elements;
......
......@@ -4,9 +4,9 @@ import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class Delay<I> extends AbstractStage<I, I> {
public class Delay<T> extends AbstractStage<T, T> {
private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>();
private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(this);
public Delay() {
// this.setReschedulable(true);
......@@ -23,7 +23,7 @@ public class Delay<I> extends AbstractStage<I, I> {
// System.out.println("#elements: " + this.getInputPort().pipe.size());
// TODO implement receiveAll() and sendMultiple()
while (!this.getInputPort().getPipe().isEmpty()) {
I element = this.getInputPort().receive();
T element = this.getInputPort().receive();
this.send(element);
}
......@@ -38,13 +38,13 @@ public class Delay<I> extends AbstractStage<I, I> {
}
@Override
protected void execute4(final CommittableQueue<I> elements) {
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final I element) {
protected void execute5(final T element) {
// TODO Auto-generated method stub
}
......
......@@ -19,11 +19,12 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger;
import java.util.ArrayList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
/**
*
* This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.
*
* @author Christian Wulf
*
......@@ -32,11 +33,11 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort;
* @param <T>
* the type of the input ports and the output port
*/
@Description("This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.")
public class Merger<T> extends ConsumerStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
// BETTER use an array since a list always creates a new iterator when looping
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
......@@ -49,6 +50,19 @@ public class Merger<T> extends ConsumerStage<T, T> {
this.strategy = strategy;
}
@Override
public void executeWithPorts() {
this.logger.debug("Executing stage...");
this.execute5(null);
boolean isReschedulable = false;
for (InputPort<T> inputPort : this.inputPortList) {
isReschedulable = isReschedulable || !inputPort.getPipe().isEmpty();
}
this.setReschedulable(isReschedulable);
}
@Override
protected void execute5(final T element) {
final T token = this.strategy.getNextInput(this);
......@@ -65,7 +79,7 @@ public class Merger<T> extends ConsumerStage<T, T> {
}
private InputPort<T> getNewInputPort() {
InputPort<T> inputPort = new InputPort<T>();
InputPort<T> inputPort = new InputPort<T>(this);
this.inputPortList.add(inputPort);
return inputPort;
}
......
......@@ -15,8 +15,11 @@
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.recordReader;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -36,6 +39,12 @@ public class RecordReaderAnalysisTest {
this.stopWatch = new StopWatch();
}
@After
public void after() {
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
}
@Test
public void performAnalysis() {
final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
......@@ -49,8 +58,7 @@ public class RecordReaderAnalysisTest {
analysis.onTerminate();
}
long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
assertEquals(6541, analysis.getElementCollection().size());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment