diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 53ed24f01f44e8a7d3a6693978c826625a58efac..457415f3fd23ccafcd3b66d4973d24ae79eebfbd 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -126,7 +126,6 @@ public abstract class AbstractStage implements StageWithPort { public void onTerminating() { // empty default implementation - this.onIsPipelineHead(); } protected <T> InputPort<T> createInputPort() { diff --git a/src/main/java/teetime/framework/ConsumerStage.java b/src/main/java/teetime/framework/ConsumerStage.java index 769ca731e8a0cca2d0467b54ce2ea7124e396b16..60db33da9c85901e6482ea01ec21c8834b9f9e87 100644 --- a/src/main/java/teetime/framework/ConsumerStage.java +++ b/src/main/java/teetime/framework/ConsumerStage.java @@ -15,11 +15,6 @@ public abstract class ConsumerStage<I> extends AbstractStage { this.execute(element); } - @Override - public void onIsPipelineHead() { - // do nothing - } - protected abstract void execute(I element); } diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/Pipeline.java index 91b0ae866ecd9b073e1097e7be404bc7b2dfc812..eb616933cfed500e442bfeedc96961cbf147378a 100644 --- a/src/main/java/teetime/framework/Pipeline.java +++ b/src/main/java/teetime/framework/Pipeline.java @@ -46,11 +46,6 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW this.firstStage.setParentStage(parentStage, index); } - @Override - public void onIsPipelineHead() { - this.firstStage.onIsPipelineHead(); - } - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { this.firstStage.onSignal(signal, inputPort); diff --git a/src/main/java/teetime/framework/ProducerStage.java b/src/main/java/teetime/framework/ProducerStage.java index 5ef9c813a0eb26cbb58396a9991050b3b256691e..ac3dbd1f078dfe477e5029bae8f83a1e2eba5ae6 100644 --- a/src/main/java/teetime/framework/ProducerStage.java +++ b/src/main/java/teetime/framework/ProducerStage.java @@ -23,11 +23,6 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag this.execute(); } - @Override - public void onIsPipelineHead() { - // do nothing - } - @Override public void terminate() { this.shouldTerminate = true; diff --git a/src/main/java/teetime/framework/StageWithPort.java b/src/main/java/teetime/framework/StageWithPort.java index 281b9485ea5476b4d183e7fb2402f616050550a6..0a0d44c3d1fbd578cb32638884f4065f5451b94d 100644 --- a/src/main/java/teetime/framework/StageWithPort.java +++ b/src/main/java/teetime/framework/StageWithPort.java @@ -15,9 +15,6 @@ public interface StageWithPort { void setParentStage(StageWithPort parentStage, int index); - // BETTER remove this method since it will be replaced by onTerminating() - void onIsPipelineHead(); - void onSignal(ISignal signal, InputPort<?> inputPort); /** diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index 44588ceb2a085eba4f9c4b83145df1c637047c96..972e8e4b952956568fcfa8814584762035f770d9 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -20,7 +20,7 @@ public class Cache<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -29,7 +29,7 @@ public class Cache<T> extends ConsumerStage<T> { } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); - super.onIsPipelineHead(); + super.onTerminating(); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index 33d4b434cef61cfef3b1f6633ea64795d009a537..9ea251bae40e31aff5db53d313048429dc7d74ea 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -45,7 +45,7 @@ public class CollectorSink<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 1804def32735e94ea0a68a186f55748ce5e4f102..9b95052a624a6d23e50a62fe1880991ee03b5a44 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -27,7 +27,7 @@ public class Delay<T> extends AbstractStage { } @Override - public void onIsPipelineHead() { + public void onTerminating() { while (!this.inputPort.getPipe().isEmpty()) { this.executeWithPorts(); } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index f2120ce2c2f54d05592fa4e7831c5720b0e2729b..c0162be1269abb0040c98d5bdf03894111d33441 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -38,7 +38,7 @@ public class Distributor<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { // for (OutputPort<T> op : this.outputPortList) { // op.getPipe().close(); // System.out.println("End signal sent, size: " + op.getPipe().size()); diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index eb4c9e315680ce85f94d81f09ab526cd517b1554..70696813577b920088c698da6b791a2662c467ef 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -62,7 +62,7 @@ public class Merger<T> extends AbstractStage { } @Override - public void onIsPipelineHead() { + public void onTerminating() { this.finishedInputPorts++; } diff --git a/src/main/java/teetime/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/stage/kieker/TCPReaderSink.java index f61dbdd52b689e4493dcb7160c93e4b1b696854a..5ce75d54d158eed4f70724d602fa846b72af2f4f 100644 --- a/src/main/java/teetime/stage/kieker/TCPReaderSink.java +++ b/src/main/java/teetime/stage/kieker/TCPReaderSink.java @@ -149,10 +149,10 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { this.executorService.shutdown(); this.tcpStringReader.interrupt(); - super.onIsPipelineHead(); + super.onTerminating(); } /** diff --git a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index 9453af863ff0a7a74ac995464920e401d6088356..fe5a23d9d3fb171ccc93c91308436f8bdfbc4617 100644 --- a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -94,12 +94,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { for (Long traceId : this.traceId2trace.keySet()) { this.put(traceId, false); } - super.onIsPipelineHead(); + super.onTerminating(); } private void sendTraceBuffer(final TraceBuffer traceBuffer) { diff --git a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java index 0a66326b105fbf19c0bc00c141fcc17a898f4240..c9d6e9948ba4478582c4ba67d6175ea8cc8ed18e 100644 --- a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java @@ -73,7 +73,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { final TraceAggregationBuffer buffer = entry.getValue(); @@ -84,7 +84,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { this.trace2buffer.clear(); } - super.onIsPipelineHead(); + super.onTerminating(); } private void processTimeoutQueue(final long timestampInNs) {