diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index dc9d3871ad46a4f42d6d3e355b555123169b9bea..33be2f0cfb5b0773f794ffe0097a679d1e23b36b 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -126,7 +126,6 @@ public abstract class AbstractStage implements Stage { public void onTerminating() { // empty default implementation - this.onIsPipelineHead(); } protected <T> InputPort<T> createInputPort() { diff --git a/src/main/java/teetime/framework/ConsumerStage.java b/src/main/java/teetime/framework/ConsumerStage.java index 769ca731e8a0cca2d0467b54ce2ea7124e396b16..60db33da9c85901e6482ea01ec21c8834b9f9e87 100644 --- a/src/main/java/teetime/framework/ConsumerStage.java +++ b/src/main/java/teetime/framework/ConsumerStage.java @@ -15,11 +15,6 @@ public abstract class ConsumerStage<I> extends AbstractStage { this.execute(element); } - @Override - public void onIsPipelineHead() { - // do nothing - } - protected abstract void execute(I element); } diff --git a/src/main/java/teetime/framework/HeadPipeline.java b/src/main/java/teetime/framework/HeadPipeline.java index e2ec170b0b3d80f708cfb41516b26be7865ea5e5..293700a045013be308290e984687897a35a0d03f 100644 --- a/src/main/java/teetime/framework/HeadPipeline.java +++ b/src/main/java/teetime/framework/HeadPipeline.java @@ -1,6 +1,6 @@ package teetime.framework; -public class HeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends Pipeline<FirstStage, LastStage> implements HeadStage { +public class HeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements HeadStage { public HeadPipeline() {} diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/OldPipeline.java similarity index 87% rename from src/main/java/teetime/framework/Pipeline.java rename to src/main/java/teetime/framework/OldPipeline.java index 4900bf65fe96624270f06dd92b41e4c54a23455e..77b70577e9c3ec38d49747253d5ead7c22722ec8 100644 --- a/src/main/java/teetime/framework/Pipeline.java +++ b/src/main/java/teetime/framework/OldPipeline.java @@ -5,7 +5,7 @@ import java.util.List; import teetime.framework.signal.ISignal; import teetime.framework.validation.InvalidPortConnection; -public class Pipeline<FirstStage extends Stage, LastStage extends Stage> implements Stage { +public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> implements Stage { protected FirstStage firstStage; protected LastStage lastStage; @@ -46,11 +46,6 @@ public class Pipeline<FirstStage extends Stage, LastStage extends Stage> impleme this.firstStage.setParentStage(parentStage, index); } - @Override - public void onIsPipelineHead() { - this.firstStage.onIsPipelineHead(); - } - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { this.firstStage.onSignal(signal, inputPort); diff --git a/src/main/java/teetime/framework/ProducerStage.java b/src/main/java/teetime/framework/ProducerStage.java index 5ef9c813a0eb26cbb58396a9991050b3b256691e..ac3dbd1f078dfe477e5029bae8f83a1e2eba5ae6 100644 --- a/src/main/java/teetime/framework/ProducerStage.java +++ b/src/main/java/teetime/framework/ProducerStage.java @@ -23,11 +23,6 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag this.execute(); } - @Override - public void onIsPipelineHead() { - // do nothing - } - @Override public void terminate() { this.shouldTerminate = true; diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index b773f6b2ac815e0a2cb79d09268f7c713d04aab5..3f0798ac9bd35ed3ad195e1ec1f5feefdc973fb1 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -15,9 +15,6 @@ public interface Stage { void setParentStage(Stage parentStage, int index); - // BETTER remove this method since it will be replaced by onTerminating() - void onIsPipelineHead(); - void onSignal(ISignal signal, InputPort<?> inputPort); /** diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index 44588ceb2a085eba4f9c4b83145df1c637047c96..07b7d2896a04bb5338ff31f63aa09613b8ff9cdf 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -20,7 +20,8 @@ public class Cache<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); @@ -29,7 +30,7 @@ public class Cache<T> extends ConsumerStage<T> { } stopWatch.end(); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); - super.onIsPipelineHead(); + super.onTerminating(); } public OutputPort<T> getOutputPort() { diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index 33d4b434cef61cfef3b1f6633ea64795d009a537..f3b3875dce73a457cc1bec91c195944e7cc9689a 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -45,7 +45,8 @@ public class CollectorSink<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 1804def32735e94ea0a68a186f55748ce5e4f102..09903e8c447f54620a437de8b8fe6db964360cb8 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -27,7 +27,8 @@ public class Delay<T> extends AbstractStage { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); while (!this.inputPort.getPipe().isEmpty()) { this.executeWithPorts(); } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index f2120ce2c2f54d05592fa4e7831c5720b0e2729b..b80c8e5ef21356fde8b837e2611df330187b75c9 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -21,9 +21,9 @@ import teetime.framework.OutputPort; /** * @author Christian Wulf - * + * * @since 1.10 - * + * * @param T * the type of the input port and the output ports */ @@ -38,7 +38,8 @@ public class Distributor<T> extends ConsumerStage<T> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); // for (OutputPort<T> op : this.outputPortList) { // op.getPipe().close(); // System.out.println("End signal sent, size: " + op.getPipe().size()); diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index eb4c9e315680ce85f94d81f09ab526cd517b1554..870b22b90832de34648468ba03f7c67abc7c21b4 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -62,7 +62,8 @@ public class Merger<T> extends AbstractStage { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); this.finishedInputPorts++; } diff --git a/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java b/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java index eaf453f8e418c71c69d41a9497cb7b3eb9216cd9..1961af39d92664d0b307b08ec57a31e19c2c1b7c 100644 --- a/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java +++ b/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java @@ -19,7 +19,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.framework.pipe.PipeFactory; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.PipeFactory.PipeOrdering; @@ -42,7 +42,7 @@ import kieker.common.util.filesystem.FSUtil; * * @since 1.10 */ -public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { +public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { private final PipeFactory pipeFactory = PipeFactory.INSTANCE; private ClassNameRegistryRepository classNameRegistryRepository; diff --git a/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java b/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java index 37d672307e6683341fda7da2844976bd09a4df70..efe06d6db427d917de5451f05a7fedef6a59bf7f 100644 --- a/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java @@ -4,7 +4,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.stage.io.Directory2FilesFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; @@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.BinaryFile2RecordFilter; import kieker.common.record.IMonitoringRecord; -public class DirWithBin2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { +public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { private ClassNameRegistryRepository classNameRegistryRepository; diff --git a/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java b/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java index f663ba25d8886146f53f7f688042c64e3c454273..93fe14f22242b576a818b5cd809fc8d3fd7c0dbe 100644 --- a/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java @@ -4,7 +4,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.stage.io.Directory2FilesFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; @@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.DatFile2RecordFilter; import kieker.common.record.IMonitoringRecord; -public class DirWithDat2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { +public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { private ClassNameRegistryRepository classNameRegistryRepository; diff --git a/src/main/java/teetime/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/stage/kieker/TCPReaderSink.java index f61dbdd52b689e4493dcb7160c93e4b1b696854a..e3e0523aa85017b72da40e79c04708ef07c947d2 100644 --- a/src/main/java/teetime/stage/kieker/TCPReaderSink.java +++ b/src/main/java/teetime/stage/kieker/TCPReaderSink.java @@ -149,10 +149,11 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); this.executorService.shutdown(); this.tcpStringReader.interrupt(); - super.onIsPipelineHead(); + super.onTerminating(); } /** diff --git a/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java index 8058dacc2c7acc1584687fd17938471a1f6e3cf2..30dfa1b59b3c4076081acc7a68249e5dfa3d78db 100644 --- a/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java @@ -19,7 +19,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.framework.pipe.SingleElementPipe; import teetime.stage.io.File2TextLinesFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; @@ -32,7 +32,7 @@ import kieker.common.record.IMonitoringRecord; * * @since 1.10 */ -public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> { +public class DatFile2RecordFilter extends OldPipeline<File2TextLinesFilter, TextLine2RecordFilter> { public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); diff --git a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index 9453af863ff0a7a74ac995464920e401d6088356..b9e1d973fedf6dbcaa68b46a6d734e55133632de 100644 --- a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -29,7 +29,7 @@ import kieker.common.record.flow.trace.TraceMetadata; /** * @author Christian Wulf - * + * * @since 1.10 */ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { @@ -94,12 +94,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); for (Long traceId : this.traceId2trace.keySet()) { this.put(traceId, false); } - super.onIsPipelineHead(); + super.onTerminating(); } private void sendTraceBuffer(final TraceBuffer traceBuffer) { diff --git a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java index 0a66326b105fbf19c0bc00c141fcc17a898f4240..f9a282328367ae32c3396299a0571b6f1c231e86 100644 --- a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java @@ -30,11 +30,11 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; * This filter collects incoming traces for a specified amount of time. * Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace. * Only one specimen of these traces containing this information will be forwarded from this filter. - * + * * Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others. - * + * * @author Jan Waller, Florian Biss - * + * * @since */ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { @@ -73,7 +73,8 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { } @Override - public void onIsPipelineHead() { + public void onTerminating() { + super.onTerminating(); synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { final TraceAggregationBuffer buffer = entry.getValue(); @@ -84,7 +85,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { this.trace2buffer.clear(); } - super.onIsPipelineHead(); + super.onTerminating(); } private void processTimeoutQueue(final long timestampInNs) {