diff --git a/src/main/java/teetime/framework/HeadPipeline.java b/src/main/java/teetime/framework/HeadPipeline.java index 3c9dd13940f4cde35f0d1d900daa3fd944e4ef9a..be402431041d4cab5e77a7dcbc4e7b6265a914ac 100644 --- a/src/main/java/teetime/framework/HeadPipeline.java +++ b/src/main/java/teetime/framework/HeadPipeline.java @@ -1,6 +1,6 @@ package teetime.framework; -public class HeadPipeline<FirstStage extends HeadStage, LastStage extends StageWithPort> extends Pipeline<FirstStage, LastStage> implements HeadStage { +public class HeadPipeline<FirstStage extends HeadStage, LastStage extends StageWithPort> extends OldPipeline<FirstStage, LastStage> implements HeadStage { public HeadPipeline() {} diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/Pipeline.java deleted file mode 100644 index eb616933cfed500e442bfeedc96961cbf147378a..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/framework/Pipeline.java +++ /dev/null @@ -1,59 +0,0 @@ -package teetime.framework; - -import java.util.List; - -import teetime.framework.signal.ISignal; -import teetime.framework.validation.InvalidPortConnection; - -public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort { - - protected FirstStage firstStage; - protected LastStage lastStage; - - public FirstStage getFirstStage() { - return this.firstStage; - } - - public void setFirstStage(final FirstStage firstStage) { - this.firstStage = firstStage; - } - - public LastStage getLastStage() { - return this.lastStage; - } - - public void setLastStage(final LastStage lastStage) { - this.lastStage = lastStage; - } - - @Override - public String getId() { - return this.firstStage.getId(); - } - - @Override - public void executeWithPorts() { - this.firstStage.executeWithPorts(); - } - - @Override - public StageWithPort getParentStage() { - return this.firstStage.getParentStage(); - } - - @Override - public void setParentStage(final StageWithPort parentStage, final int index) { - this.firstStage.setParentStage(parentStage, index); - } - - @Override - public void onSignal(final ISignal signal, final InputPort<?> inputPort) { - this.firstStage.onSignal(signal, inputPort); - } - - @Override - public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - this.lastStage.validateOutputPorts(invalidPortConnections); - } - -} diff --git a/src/main/java/teetime/stage/Cache.java b/src/main/java/teetime/stage/Cache.java index 972e8e4b952956568fcfa8814584762035f770d9..07b7d2896a04bb5338ff31f63aa09613b8ff9cdf 100644 --- a/src/main/java/teetime/stage/Cache.java +++ b/src/main/java/teetime/stage/Cache.java @@ -21,6 +21,7 @@ public class Cache<T> extends ConsumerStage<T> { @Override public void onTerminating() { + super.onTerminating(); this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); diff --git a/src/main/java/teetime/stage/CollectorSink.java b/src/main/java/teetime/stage/CollectorSink.java index 9ea251bae40e31aff5db53d313048429dc7d74ea..f3b3875dce73a457cc1bec91c195944e7cc9689a 100644 --- a/src/main/java/teetime/stage/CollectorSink.java +++ b/src/main/java/teetime/stage/CollectorSink.java @@ -46,6 +46,7 @@ public class CollectorSink<T> extends ConsumerStage<T> { @Override public void onTerminating() { + super.onTerminating(); System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 9b95052a624a6d23e50a62fe1880991ee03b5a44..09903e8c447f54620a437de8b8fe6db964360cb8 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -28,6 +28,7 @@ public class Delay<T> extends AbstractStage { @Override public void onTerminating() { + super.onTerminating(); while (!this.inputPort.getPipe().isEmpty()) { this.executeWithPorts(); } diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java index c0162be1269abb0040c98d5bdf03894111d33441..b80c8e5ef21356fde8b837e2611df330187b75c9 100644 --- a/src/main/java/teetime/stage/basic/distributor/Distributor.java +++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java @@ -21,9 +21,9 @@ import teetime.framework.OutputPort; /** * @author Christian Wulf - * + * * @since 1.10 - * + * * @param T * the type of the input port and the output ports */ @@ -39,6 +39,7 @@ public class Distributor<T> extends ConsumerStage<T> { @Override public void onTerminating() { + super.onTerminating(); // for (OutputPort<T> op : this.outputPortList) { // op.getPipe().close(); // System.out.println("End signal sent, size: " + op.getPipe().size()); diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index 70696813577b920088c698da6b791a2662c467ef..870b22b90832de34648468ba03f7c67abc7c21b4 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -63,6 +63,7 @@ public class Merger<T> extends AbstractStage { @Override public void onTerminating() { + super.onTerminating(); this.finishedInputPorts++; } diff --git a/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java b/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java index eaf453f8e418c71c69d41a9497cb7b3eb9216cd9..1961af39d92664d0b307b08ec57a31e19c2c1b7c 100644 --- a/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java +++ b/src/main/java/teetime/stage/kieker/Dir2RecordsFilter.java @@ -19,7 +19,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.framework.pipe.PipeFactory; import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.PipeFactory.PipeOrdering; @@ -42,7 +42,7 @@ import kieker.common.util.filesystem.FSUtil; * * @since 1.10 */ -public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { +public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { private final PipeFactory pipeFactory = PipeFactory.INSTANCE; private ClassNameRegistryRepository classNameRegistryRepository; diff --git a/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java b/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java index 37d672307e6683341fda7da2844976bd09a4df70..efe06d6db427d917de5451f05a7fedef6a59bf7f 100644 --- a/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/DirWithBin2RecordFilter.java @@ -4,7 +4,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.stage.io.Directory2FilesFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; @@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.BinaryFile2RecordFilter; import kieker.common.record.IMonitoringRecord; -public class DirWithBin2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { +public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { private ClassNameRegistryRepository classNameRegistryRepository; diff --git a/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java b/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java index f663ba25d8886146f53f7f688042c64e3c454273..93fe14f22242b576a818b5cd809fc8d3fd7c0dbe 100644 --- a/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/DirWithDat2RecordFilter.java @@ -4,7 +4,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.stage.io.Directory2FilesFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; @@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.DatFile2RecordFilter; import kieker.common.record.IMonitoringRecord; -public class DirWithDat2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { +public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { private ClassNameRegistryRepository classNameRegistryRepository; diff --git a/src/main/java/teetime/stage/kieker/TCPReaderSink.java b/src/main/java/teetime/stage/kieker/TCPReaderSink.java index 5ce75d54d158eed4f70724d602fa846b72af2f4f..e3e0523aa85017b72da40e79c04708ef07c947d2 100644 --- a/src/main/java/teetime/stage/kieker/TCPReaderSink.java +++ b/src/main/java/teetime/stage/kieker/TCPReaderSink.java @@ -150,6 +150,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { @Override public void onTerminating() { + super.onTerminating(); this.executorService.shutdown(); this.tcpStringReader.interrupt(); super.onTerminating(); diff --git a/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java index 8058dacc2c7acc1584687fd17938471a1f6e3cf2..30dfa1b59b3c4076081acc7a68249e5dfa3d78db 100644 --- a/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java +++ b/src/main/java/teetime/stage/kieker/fileToRecord/DatFile2RecordFilter.java @@ -19,7 +19,7 @@ import java.io.File; import teetime.framework.InputPort; import teetime.framework.OutputPort; -import teetime.framework.Pipeline; +import teetime.framework.OldPipeline; import teetime.framework.pipe.SingleElementPipe; import teetime.stage.io.File2TextLinesFilter; import teetime.stage.kieker.className.ClassNameRegistryRepository; @@ -32,7 +32,7 @@ import kieker.common.record.IMonitoringRecord; * * @since 1.10 */ -public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> { +public class DatFile2RecordFilter extends OldPipeline<File2TextLinesFilter, TextLine2RecordFilter> { public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); diff --git a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java index fe5a23d9d3fb171ccc93c91308436f8bdfbc4617..b9e1d973fedf6dbcaa68b46a6d734e55133632de 100644 --- a/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReconstruction/TraceReconstructionFilter.java @@ -29,7 +29,7 @@ import kieker.common.record.flow.trace.TraceMetadata; /** * @author Christian Wulf - * + * * @since 1.10 */ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { @@ -95,6 +95,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { @Override public void onTerminating() { + super.onTerminating(); for (Long traceId : this.traceId2trace.keySet()) { this.put(traceId, false); } diff --git a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java index c9d6e9948ba4478582c4ba67d6175ea8cc8ed18e..f9a282328367ae32c3396299a0571b6f1c231e86 100644 --- a/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java +++ b/src/main/java/teetime/stage/kieker/traceReduction/TraceReductionFilter.java @@ -30,11 +30,11 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; * This filter collects incoming traces for a specified amount of time. * Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace. * Only one specimen of these traces containing this information will be forwarded from this filter. - * + * * Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others. - * + * * @author Jan Waller, Florian Biss - * + * * @since */ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { @@ -74,6 +74,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { @Override public void onTerminating() { + super.onTerminating(); synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { final TraceAggregationBuffer buffer = entry.getValue();