diff --git a/conf/logging.properties b/conf/logging.properties index 55a2ada3d4e44623771a9cb3a329ecf012d1dbc2..81a62dd85d67be01a3674fda5b32a1302a6cb265 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -1,7 +1,8 @@ .handlers = java.util.logging.ConsoleHandler .level= ALL -java.util.logging.ConsoleHandler.level = ALL +java.util.logging.ConsoleHandler.level = INFO #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter +java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %2$s %4$s: %5$s %n #teetime.level = ALL \ No newline at end of file diff --git a/doc/concept/stage with trigger.pdf b/doc/concept/stage with trigger.pdf new file mode 100644 index 0000000000000000000000000000000000000000..19e6cb0f5ff93ada56e9c996846a640c5d050065 Binary files /dev/null and b/doc/concept/stage with trigger.pdf differ diff --git a/doc/concept/stage with trigger.pptx b/doc/concept/stage with trigger.pptx new file mode 100644 index 0000000000000000000000000000000000000000..627959737e7a98d6b69e44a2341059242aa25474 Binary files /dev/null and b/doc/concept/stage with trigger.pptx differ diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 829f8e7e1525981d22808564e5d10feddb171567..6e74f0ca4a4d090cccb81712e82c5bb05048e796 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -15,21 +15,16 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { */ protected Log logger; - private final InputPort<I> inputPort = new InputPort<I>(); + private final InputPort<I> inputPort = new InputPort<I>(this); private final OutputPort<O> outputPort = new OutputPort<O>(); - // protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4); - // private final CommittableQueue<O> outputElements = null; - private StageWithPort<?, ?> parentStage; - private int index; - private boolean reschedulable; public AbstractStage() { this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name - this.logger = LogFactory.getLog(this.id); + this.logger = LogFactory.getLog(this.getClass().getName() + "(" + this.id + ")"); } @Override @@ -74,6 +69,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { protected abstract void execute5(I element); + /** + * Sends the <code>element</code> using the default output port + * + * @param element + */ protected final void send(final O element) { this.send(this.getOutputPort(), element); } @@ -81,19 +81,13 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { protected final void send(final OutputPort<O> outputPort, final O element) { outputPort.send(element); - StageWithPort<O, ?> next = outputPort.getPipe().getTargetStage(); + StageWithPort<?, ?> next = outputPort.getPipe().getTargetPort().getOwningStage(); - // StageWithPort<?, ?> next = this.next(); do { next.executeWithPorts(); } while (next.isReschedulable()); } - // @Override - // public SchedulingInformation getSchedulingInformation() { - // return this.schedulingInformation; - // } - // public void disable() { // this.schedulingInformation.setActive(false); // this.fireOnDisable(); @@ -117,7 +111,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { @Override public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { - this.index = index; this.parentStage = parentStage; } @@ -134,4 +127,9 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { return this.id; } + @Override + public String toString() { + return this.getClass().getName() + ": " + this.id; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java index e98323d4c2fad931618f67415bdb5b097771044a..940fefdb91751e13a3cb694f61b9f256a050874e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/ConsumerStage.java @@ -26,12 +26,6 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { this.setReschedulable(this.getInputPort().getPipe().size() > 0); this.execute5(element); - - // this.send(result); - - // if (!this.getOutputPort().pipe.isEmpty()) { - // super.executeWithPorts(); - // } } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java index 95f674f2fe4302cd8dd97208b3599154883e2e91..d55ad342259a9699e06cd1d84e7fd306f0927729 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/InputPort.java @@ -4,8 +4,14 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public class InputPort<T> { + private final StageWithPort<?, ?> owningStage; private IPipe<T> pipe; + public InputPort(final StageWithPort<?, ?> owningStage) { + super(); + this.owningStage = owningStage; + } + public T receive() { T element = this.pipe.removeLast(); return element; @@ -22,6 +28,11 @@ public class InputPort<T> { public void setPipe(final IPipe<T> pipe) { this.pipe = pipe; + pipe.setTargetPort(this); + } + + public StageWithPort<?, ?> getOwningStage() { + return this.owningStage; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java index 8e179eb1ac8eaed583496998ab5801d5729c80b7..35ec227550545ec5713be7d01dae33db6f4ff9a4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/Pipeline.java @@ -22,12 +22,9 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); private StageWithPort<?, O> lastStage; - private StageWithPort<?, ?> successor; - private StageWithPort<?, ?>[] stages; private StageWithPort<?, ?> parentStage; - private int index; - private int startIndex; + // private int startIndex; private boolean reschedulable; private int firstStageIndex; @@ -49,7 +46,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { this.intermediateStages.addAll(Arrays.asList(stages)); } - public void addIntermediateStage(final StageWithPort stage) { + public void addIntermediateStage(final StageWithPort<?, ?> stage) { this.intermediateStages.add(stage); } @@ -93,12 +90,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { while (!currentStage.isReschedulable()) { this.firstStageIndex++; // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port? - currentStage = this.stages[this.firstStageIndex]; - if (currentStage == null) { // loop reaches the last stage + // if (currentStage == null) { // loop reaches the last stage + if (this.firstStageIndex == this.stages.length) { // loop reaches the last stage this.setReschedulable(false); this.cleanUp(); return; } + currentStage = this.stages[this.firstStageIndex]; currentStage.onIsPipelineHead(); // System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName()); } @@ -166,7 +164,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { @Override public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) { - this.index = index; this.parentStage = parentStage; } @@ -191,12 +188,12 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { // TODO remove since it does not increase performances private void cleanUp() { - for (int i = 0; i < this.stages.length; i++) { - StageWithPort<?, ?> stage = this.stages[i]; - stage.setParentStage(null, i); - // stage.setListener(null); - // stage.setSuccessor(null); - } + // for (int i = 0; i < this.stages.length; i++) { + // StageWithPort<?, ?> stage = this.stages[i]; + // // stage.setParentStage(null, i); + // // stage.setListener(null); + // // stage.setSuccessor(null); + // } this.firstStage = null; this.intermediateStages.clear(); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java index d41f04c4d0e6c92c82f41adaa678ed5d8a10fc6b..5522b69b96c9ea048095d9da824dfbf64294f452 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/AbstractPipe.java @@ -2,12 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; import java.util.concurrent.atomic.AtomicBoolean; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; public abstract class AbstractPipe<T> implements IPipe<T> { private final AtomicBoolean closed = new AtomicBoolean(); - private StageWithPort<T, ?> targetStage; + private InputPort<T> targetPort; @Override public boolean isClosed() { @@ -20,12 +20,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> { } @Override - public StageWithPort<T, ?> getTargetStage() { - return this.targetStage; + public InputPort<T> getTargetPort() { + return this.targetPort; } - public void setTargetStage(StageWithPort<T, ?> targetStage) { - this.targetStage = targetStage; + @Override + public void setTargetPort(final InputPort<T> targetPort) { + this.targetPort = targetPort; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java index 68cbd0516ce93358f343d3e04bef275eda538c90..2d13264878c4d08a3d257804927ad20a48784ea4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/IPipe.java @@ -1,23 +1,25 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; public interface IPipe<T> { - public abstract void add(T element); + void add(T element); - public abstract T removeLast(); + T removeLast(); - public abstract boolean isEmpty(); + boolean isEmpty(); - public abstract int size(); + int size(); - public abstract T readLast(); + T readLast(); - public abstract void close(); + void close(); - public abstract boolean isClosed(); + boolean isClosed(); - public abstract StageWithPort<T, ?> getTargetStage(); + InputPort<T> getTargetPort(); + + void setTargetPort(InputPort<T> targetPort); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java index f4dd5c6b0877b0bcadafdc506f8b00d36180199d..9440263d549b3b1262cf71c58a4f311a4c7ac13a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SingleElementPipe.java @@ -41,16 +41,4 @@ public class SingleElementPipe<T> extends AbstractPipe<T> { return (this.element == null) ? 0 : 1; } - // @Override - // public void close() { - // - // - // } - // - // @Override - // public boolean isClosed() { - // - // return false; - // } - } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java index c13fa8949b51faf21a26a1a022226911ca333317..f72b987ba40ccd347b79cdc1b6cdc2e74181e5e1 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java @@ -27,7 +27,7 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; */ public class CollectorSink<T> extends ConsumerStage<T, Object> { - private static final int THRESHOLD = 10000; + private static final int THRESHOLD = 100; // TODO make configurable or use an sysout stage instead private final List<T> elements; diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java index d27bc6b68ac4a3240df2849b464af9c4c470e774..5c839a7f795e50fb362747475ee1edfdc39a8d27 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Delay.java @@ -4,9 +4,9 @@ import teetime.util.list.CommittableQueue; import teetime.variant.methodcallWithPorts.framework.core.AbstractStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; -public class Delay<I> extends AbstractStage<I, I> { +public class Delay<T> extends AbstractStage<T, T> { - private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(); + private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>(this); public Delay() { // this.setReschedulable(true); @@ -23,7 +23,7 @@ public class Delay<I> extends AbstractStage<I, I> { // System.out.println("#elements: " + this.getInputPort().pipe.size()); // TODO implement receiveAll() and sendMultiple() while (!this.getInputPort().getPipe().isEmpty()) { - I element = this.getInputPort().receive(); + T element = this.getInputPort().receive(); this.send(element); } @@ -38,13 +38,13 @@ public class Delay<I> extends AbstractStage<I, I> { } @Override - protected void execute4(final CommittableQueue<I> elements) { + protected void execute4(final CommittableQueue<T> elements) { // TODO Auto-generated method stub } @Override - protected void execute5(final I element) { + protected void execute5(final T element) { // TODO Auto-generated method stub } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java index 23238ec8fa27eee3385676b81d7747b7e6f03669..daed0fa5db4cb07b0902aa5ae6da185e8ee0b77a 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java @@ -19,11 +19,12 @@ package teetime.variant.methodcallWithPorts.stage.basic.merger; import java.util.ArrayList; import java.util.List; -import teetime.variant.explicitScheduling.framework.core.Description; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; /** + * + * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. * * @author Christian Wulf * @@ -32,11 +33,11 @@ import teetime.variant.methodcallWithPorts.framework.core.InputPort; * @param <T> * the type of the input ports and the output port */ -@Description("This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.") public class Merger<T> extends ConsumerStage<T, T> { // TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger + // BETTER use an array since a list always creates a new iterator when looping private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>(); private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); @@ -49,6 +50,19 @@ public class Merger<T> extends ConsumerStage<T, T> { this.strategy = strategy; } + @Override + public void executeWithPorts() { + this.logger.debug("Executing stage..."); + + this.execute5(null); + + boolean isReschedulable = false; + for (InputPort<T> inputPort : this.inputPortList) { + isReschedulable = isReschedulable || !inputPort.getPipe().isEmpty(); + } + this.setReschedulable(isReschedulable); + } + @Override protected void execute5(final T element) { final T token = this.strategy.getNextInput(this); @@ -65,7 +79,7 @@ public class Merger<T> extends ConsumerStage<T, T> { } private InputPort<T> getNewInputPort() { - InputPort<T> inputPort = new InputPort<T>(); + InputPort<T> inputPort = new InputPort<T>(this); this.inputPortList.add(inputPort); return inputPort; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java index 6825637d67153cc2f5648d1946ad6c6b353960d7..f4e1e027c6b3195e1151f724112e0c6da50eb244 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderAnalysisTest.java @@ -15,8 +15,11 @@ ***************************************************************************/ package teetime.variant.methodcallWithPorts.examples.recordReader; +import static org.junit.Assert.assertEquals; + import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -36,6 +39,12 @@ public class RecordReaderAnalysisTest { this.stopWatch = new StopWatch(); } + @After + public void after() { + long overallDurationInNs = this.stopWatch.getDurationInNs(); + System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + } + @Test public void performAnalysis() { final RecordReaderAnalysis analysis = new RecordReaderAnalysis(); @@ -49,8 +58,7 @@ public class RecordReaderAnalysisTest { analysis.onTerminate(); } - long overallDurationInNs = this.stopWatch.getDurationInNs(); - System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + assertEquals(6541, analysis.getElementCollection().size()); } }