Skip to content
Snippets Groups Projects
Commit d8afb985 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

added super call to classes which override onTerminating

renamed Pipeline to OldPipeline and marked as deprecated #19
parent c572b2b5
No related branches found
No related tags found
No related merge requests found
Showing
with 23 additions and 74 deletions
package teetime.framework; package teetime.framework;
public class HeadPipeline<FirstStage extends HeadStage, LastStage extends StageWithPort> extends Pipeline<FirstStage, LastStage> implements HeadStage { public class HeadPipeline<FirstStage extends HeadStage, LastStage extends StageWithPort> extends OldPipeline<FirstStage, LastStage> implements HeadStage {
public HeadPipeline() {} public HeadPipeline() {}
......
package teetime.framework;
import java.util.List;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort {
protected FirstStage firstStage;
protected LastStage lastStage;
public FirstStage getFirstStage() {
return this.firstStage;
}
public void setFirstStage(final FirstStage firstStage) {
this.firstStage = firstStage;
}
public LastStage getLastStage() {
return this.lastStage;
}
public void setLastStage(final LastStage lastStage) {
this.lastStage = lastStage;
}
@Override
public String getId() {
return this.firstStage.getId();
}
@Override
public void executeWithPorts() {
this.firstStage.executeWithPorts();
}
@Override
public StageWithPort getParentStage() {
return this.firstStage.getParentStage();
}
@Override
public void setParentStage(final StageWithPort parentStage, final int index) {
this.firstStage.setParentStage(parentStage, index);
}
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.firstStage.onSignal(signal, inputPort);
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
this.lastStage.validateOutputPorts(invalidPortConnections);
}
}
...@@ -21,6 +21,7 @@ public class Cache<T> extends ConsumerStage<T> { ...@@ -21,6 +21,7 @@ public class Cache<T> extends ConsumerStage<T> {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements...");
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
......
...@@ -46,6 +46,7 @@ public class CollectorSink<T> extends ConsumerStage<T> { ...@@ -46,6 +46,7 @@ public class CollectorSink<T> extends ConsumerStage<T> {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
System.out.println("size: " + this.elements.size()); System.out.println("size: " + this.elements.size());
} }
......
...@@ -28,6 +28,7 @@ public class Delay<T> extends AbstractStage { ...@@ -28,6 +28,7 @@ public class Delay<T> extends AbstractStage {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
while (!this.inputPort.getPipe().isEmpty()) { while (!this.inputPort.getPipe().isEmpty()) {
this.executeWithPorts(); this.executeWithPorts();
} }
......
...@@ -39,6 +39,7 @@ public class Distributor<T> extends ConsumerStage<T> { ...@@ -39,6 +39,7 @@ public class Distributor<T> extends ConsumerStage<T> {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
// for (OutputPort<T> op : this.outputPortList) { // for (OutputPort<T> op : this.outputPortList) {
// op.getPipe().close(); // op.getPipe().close();
// System.out.println("End signal sent, size: " + op.getPipe().size()); // System.out.println("End signal sent, size: " + op.getPipe().size());
......
...@@ -63,6 +63,7 @@ public class Merger<T> extends AbstractStage { ...@@ -63,6 +63,7 @@ public class Merger<T> extends AbstractStage {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
this.finishedInputPorts++; this.finishedInputPorts++;
} }
......
...@@ -19,7 +19,7 @@ import java.io.File; ...@@ -19,7 +19,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.framework.pipe.PipeFactory; import teetime.framework.pipe.PipeFactory;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.PipeFactory.PipeOrdering; import teetime.framework.pipe.PipeFactory.PipeOrdering;
...@@ -42,7 +42,7 @@ import kieker.common.util.filesystem.FSUtil; ...@@ -42,7 +42,7 @@ import kieker.common.util.filesystem.FSUtil;
* *
* @since 1.10 * @since 1.10
*/ */
public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> {
private final PipeFactory pipeFactory = PipeFactory.INSTANCE; private final PipeFactory pipeFactory = PipeFactory.INSTANCE;
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
......
...@@ -4,7 +4,7 @@ import java.io.File; ...@@ -4,7 +4,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.stage.io.Directory2FilesFilter; import teetime.stage.io.Directory2FilesFilter;
import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.BinaryFile2RecordFilter; ...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.BinaryFile2RecordFilter;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class DirWithBin2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> {
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
......
...@@ -4,7 +4,7 @@ import java.io.File; ...@@ -4,7 +4,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.stage.io.Directory2FilesFilter; import teetime.stage.io.Directory2FilesFilter;
import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.DatFile2RecordFilter; ...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.DatFile2RecordFilter;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class DirWithDat2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> {
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
......
...@@ -150,6 +150,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { ...@@ -150,6 +150,7 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
this.executorService.shutdown(); this.executorService.shutdown();
this.tcpStringReader.interrupt(); this.tcpStringReader.interrupt();
super.onTerminating(); super.onTerminating();
......
...@@ -19,7 +19,7 @@ import java.io.File; ...@@ -19,7 +19,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SingleElementPipe;
import teetime.stage.io.File2TextLinesFilter; import teetime.stage.io.File2TextLinesFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
...@@ -32,7 +32,7 @@ import kieker.common.record.IMonitoringRecord; ...@@ -32,7 +32,7 @@ import kieker.common.record.IMonitoringRecord;
* *
* @since 1.10 * @since 1.10
*/ */
public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> { public class DatFile2RecordFilter extends OldPipeline<File2TextLinesFilter, TextLine2RecordFilter> {
public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
......
...@@ -95,6 +95,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { ...@@ -95,6 +95,7 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
for (Long traceId : this.traceId2trace.keySet()) { for (Long traceId : this.traceId2trace.keySet()) {
this.put(traceId, false); this.put(traceId, false);
} }
......
...@@ -74,6 +74,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { ...@@ -74,6 +74,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
@Override @Override
public void onTerminating() { public void onTerminating() {
super.onTerminating();
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue(); final TraceAggregationBuffer buffer = entry.getValue();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment