From c572b2b57b87429978ce64a28826eb2f232a1e76 Mon Sep 17 00:00:00 2001 From: Nelson Tavares de Sousa <stu103017@mail.uni-kiel.de> Date: Tue, 14 Oct 2014 15:14:07 +0200 Subject: [PATCH] removed StageWithPorts.onIsPipelineHead and updated references to onTerminating --- src/main/java/teetime/framework/AbstractStage.java | 1 - src/main/java/teetime/framework/ConsumerStage.java | 5 ----- src/main/java/teetime/framework/Pipeline.java | 5 ----- src/main/java/teetime/framework/ProducerStage.java | 5 ----- src/main/java/teetime/framework/StageWithPort.java | 3 --- src/main/java/teetime/stage/Cache.java | 4 ++-- src/main/java/teetime/stage/CollectorSink.java | 2 +- src/main/java/teetime/stage/basic/Delay.java | 2 +- .../java/teetime/stage/basic/distributor/Distributor.java | 2 +- src/main/java/teetime/stage/basic/merger/Merger.java | 2 +- src/main/java/teetime/stage/kieker/TCPReaderSink.java | 4 ++-- .../traceReconstruction/TraceReconstructionFilter.java | 4 ++-- .../stage/kieker/traceReduction/TraceReductionFilter.java | 4 ++-- 13 files changed, 12 insertions(+), 31 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index 53ed24f0..457415f3 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -126,7 +126,6 @@ public abstract class AbstractStage implements StageWithPort { public void onTerminating() { // empty default implementation - this.onIsPipelineHead(); } protected <T> InputPort<T> createInputPort() { diff --git a/src/main/java/teetime/framework/ConsumerStage.java b/src/main/java/teetime/framework/ConsumerStage.java index 769ca731..60db33da 100644 --- a/src/main/java/teetime/framework/ConsumerStage.java +++ b/src/main/java/teetime/framework/ConsumerStage.java @@ -15,11 +15,6 @@ public abstract class ConsumerStage<I> extends AbstractStage { this.execute(element); } - @Override - public void onIsPipelineHead() { - // do nothing - } - protected abstract void execute(I element); } diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/Pipeline.java index 91b0ae86..eb616933 100644 --- a/src/main/java/teetime/framework/Pipeline.java +++ b/src/main/java/teetime/framework/Pipeline.java @@ -46,11 +46,6 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW this.firstStage.setParentStage(parentStage, index); } - @Override - public void onIsPipelineHead() { - this.firstStage.onIsPipelineHead(); - } - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { this.firstStage.onSignal(signal, inputPort); diff --git a/src/main/java/teetime/framework/ProducerStage.java b/src/main/java/teetime/framework/ProducerStage.java index 5ef9c813..ac3dbd1f 100644 --- a/src/main/java/teetime/framework/ProducerStage.java +++ b/src/main/java/teetime/framework/ProducerStage.java @@ -23,11 +23,6 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag this.execute(); } - @Override - public void onIsPipelineHead() { - // do nothing - } - @Override public void terminate() { this.shouldTerminate = true; diff --git a/src/main/java/teetime/framework/StageWithPort.java b/src/main/java/teetime/framework/StageWithPort.java index 281b9485..0a0d44c3 100644 --- a/src/main/java/teetime/framework/StageWithPort.java +++ b/src/main/java/teetime/framework/StageWithPort.java @@ -15,9 +15,6 @@ public interface StageWithPort { void setParentStage(StageWithPort parentStage, int index); - // BETTER remove this method since it will be replaced by onTerminating() - void onIsPipelineHead(); - void onSignal(ISignal signal, InputPort<?> inputPort); /** diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index 44588ceb..972e8e4b 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -20,7 +20,7 @@ public class Cache<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -29,7 +29,7 @@ public class Cache<T> extends ConsumerStage<T> { } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); - super.onIsPipelineHead(); + super.onTerminating(); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index 33d4b434..9ea251ba 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -45,7 +45,7 @@ public class CollectorSink<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 1804def3..9b95052a 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -27,7 +27,7 @@ public class Delay<T> extends AbstractStage { } @Override - public void onIsPipelineHead() { + public void onTerminating() { while (!this.inputPort.getPipe().isEmpty()) { this.executeWithPorts(); } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index f2120ce2..c0162be1 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -38,7 +38,7 @@ public class Distributor<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { // for (OutputPort<T> op : this.outputPortList) { // op.getPipe().close(); // System.out.println("End signal sent, size: " + op.getPipe().size()); diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index eb4c9e31..70696813 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -62,7 +62,7 @@ public class Merger<T> extends AbstractStage { } @Override - public void onIsPipelineHead() { + public void onTerminating() { this.finishedInputPorts++; } diff --git a/src/main/java/teetime/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/stage/kieker/TCPReaderSink.java index f61dbdd5..5ce75d54 100644 --- a/src/main/java/teetime/stage/kieker/TCPReaderSink.java +++ b/src/main/java/teetime/stage/kieker/TCPReaderSink.java @@ -149,10 +149,10 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { this.executorService.shutdown(); this.tcpStringReader.interrupt(); - super.onIsPipelineHead(); + super.onTerminating(); } /** diff --git a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index 9453af86..fe5a23d9 100644 --- a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -94,12 +94,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { for (Long traceId : this.traceId2trace.keySet()) { this.put(traceId, false); } - super.onIsPipelineHead(); + super.onTerminating(); } private void sendTraceBuffer(final TraceBuffer traceBuffer) { diff --git a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java index 0a66326b..c9d6e994 100644 --- a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java @@ -73,7 +73,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { final TraceAggregationBuffer buffer = entry.getValue(); @@ -84,7 +84,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { this.trace2buffer.clear(); } - super.onIsPipelineHead(); + super.onTerminating(); } private void processTimeoutQueue(final long timestampInNs) { -- GitLab