Skip to content
Snippets Groups Projects
Commit f1666af7 authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge remote-tracking branch 'origin/removed-onIsPipelineHead'

Conflicts:
	src/main/java/teetime/framework/HeadPipeline.java
	src/main/java/teetime/framework/Pipeline.java
	src/main/java/teetime/framework/StageWithPort.java
parents eb6d9a6d bb2661aa
No related branches found
No related tags found
No related merge requests found
Showing
with 36 additions and 47 deletions
...@@ -126,7 +126,6 @@ public abstract class AbstractStage implements Stage { ...@@ -126,7 +126,6 @@ public abstract class AbstractStage implements Stage {
public void onTerminating() { public void onTerminating() {
// empty default implementation // empty default implementation
this.onIsPipelineHead();
} }
protected <T> InputPort<T> createInputPort() { protected <T> InputPort<T> createInputPort() {
......
...@@ -15,11 +15,6 @@ public abstract class ConsumerStage<I> extends AbstractStage { ...@@ -15,11 +15,6 @@ public abstract class ConsumerStage<I> extends AbstractStage {
this.execute(element); this.execute(element);
} }
@Override
public void onIsPipelineHead() {
// do nothing
}
protected abstract void execute(I element); protected abstract void execute(I element);
} }
package teetime.framework; package teetime.framework;
public class HeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends Pipeline<FirstStage, LastStage> implements HeadStage { public class HeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements HeadStage {
public HeadPipeline() {} public HeadPipeline() {}
......
...@@ -5,7 +5,7 @@ import java.util.List; ...@@ -5,7 +5,7 @@ import java.util.List;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
public class Pipeline<FirstStage extends Stage, LastStage extends Stage> implements Stage { public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> implements Stage {
protected FirstStage firstStage; protected FirstStage firstStage;
protected LastStage lastStage; protected LastStage lastStage;
...@@ -46,11 +46,6 @@ public class Pipeline<FirstStage extends Stage, LastStage extends Stage> impleme ...@@ -46,11 +46,6 @@ public class Pipeline<FirstStage extends Stage, LastStage extends Stage> impleme
this.firstStage.setParentStage(parentStage, index); this.firstStage.setParentStage(parentStage, index);
} }
@Override
public void onIsPipelineHead() {
this.firstStage.onIsPipelineHead();
}
@Override @Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) { public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.firstStage.onSignal(signal, inputPort); this.firstStage.onSignal(signal, inputPort);
......
...@@ -23,11 +23,6 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag ...@@ -23,11 +23,6 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag
this.execute(); this.execute();
} }
@Override
public void onIsPipelineHead() {
// do nothing
}
@Override @Override
public void terminate() { public void terminate() {
this.shouldTerminate = true; this.shouldTerminate = true;
......
...@@ -15,9 +15,6 @@ public interface Stage { ...@@ -15,9 +15,6 @@ public interface Stage {
void setParentStage(Stage parentStage, int index); void setParentStage(Stage parentStage, int index);
// BETTER remove this method since it will be replaced by onTerminating()
void onIsPipelineHead();
void onSignal(ISignal signal, InputPort<?> inputPort); void onSignal(ISignal signal, InputPort<?> inputPort);
/** /**
......
...@@ -20,7 +20,8 @@ public class Cache<T> extends ConsumerStage<T> { ...@@ -20,7 +20,8 @@ public class Cache<T> extends ConsumerStage<T> {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements...");
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
...@@ -29,7 +30,7 @@ public class Cache<T> extends ConsumerStage<T> { ...@@ -29,7 +30,7 @@ public class Cache<T> extends ConsumerStage<T> {
} }
stopWatch.end(); stopWatch.end();
this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead(); super.onTerminating();
} }
public OutputPort<T> getOutputPort() { public OutputPort<T> getOutputPort() {
......
...@@ -45,7 +45,8 @@ public class CollectorSink<T> extends ConsumerStage<T> { ...@@ -45,7 +45,8 @@ public class CollectorSink<T> extends ConsumerStage<T> {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
System.out.println("size: " + this.elements.size()); System.out.println("size: " + this.elements.size());
} }
......
...@@ -27,7 +27,8 @@ public class Delay<T> extends AbstractStage { ...@@ -27,7 +27,8 @@ public class Delay<T> extends AbstractStage {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
while (!this.inputPort.getPipe().isEmpty()) { while (!this.inputPort.getPipe().isEmpty()) {
this.executeWithPorts(); this.executeWithPorts();
} }
......
...@@ -21,9 +21,9 @@ import teetime.framework.OutputPort; ...@@ -21,9 +21,9 @@ import teetime.framework.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
* *
* @param T * @param T
* the type of the input port and the output ports * the type of the input port and the output ports
*/ */
...@@ -38,7 +38,8 @@ public class Distributor<T> extends ConsumerStage<T> { ...@@ -38,7 +38,8 @@ public class Distributor<T> extends ConsumerStage<T> {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
// for (OutputPort<T> op : this.outputPortList) { // for (OutputPort<T> op : this.outputPortList) {
// op.getPipe().close(); // op.getPipe().close();
// System.out.println("End signal sent, size: " + op.getPipe().size()); // System.out.println("End signal sent, size: " + op.getPipe().size());
......
...@@ -62,7 +62,8 @@ public class Merger<T> extends AbstractStage { ...@@ -62,7 +62,8 @@ public class Merger<T> extends AbstractStage {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
this.finishedInputPorts++; this.finishedInputPorts++;
} }
......
...@@ -19,7 +19,7 @@ import java.io.File; ...@@ -19,7 +19,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.framework.pipe.PipeFactory; import teetime.framework.pipe.PipeFactory;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.PipeFactory.PipeOrdering; import teetime.framework.pipe.PipeFactory.PipeOrdering;
...@@ -42,7 +42,7 @@ import kieker.common.util.filesystem.FSUtil; ...@@ -42,7 +42,7 @@ import kieker.common.util.filesystem.FSUtil;
* *
* @since 1.10 * @since 1.10
*/ */
public class Dir2RecordsFilter extends Pipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> { public class Dir2RecordsFilter extends OldPipeline<ClassNameRegistryCreationFilter, Merger<IMonitoringRecord>> {
private final PipeFactory pipeFactory = PipeFactory.INSTANCE; private final PipeFactory pipeFactory = PipeFactory.INSTANCE;
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
......
...@@ -4,7 +4,7 @@ import java.io.File; ...@@ -4,7 +4,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.stage.io.Directory2FilesFilter; import teetime.stage.io.Directory2FilesFilter;
import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.BinaryFile2RecordFilter; ...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.BinaryFile2RecordFilter;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class DirWithBin2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> { public class DirWithBin2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, BinaryFile2RecordFilter> {
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
......
...@@ -4,7 +4,7 @@ import java.io.File; ...@@ -4,7 +4,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.stage.io.Directory2FilesFilter; import teetime.stage.io.Directory2FilesFilter;
import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.DatFile2RecordFilter; ...@@ -12,7 +12,7 @@ import teetime.stage.kieker.fileToRecord.DatFile2RecordFilter;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class DirWithDat2RecordFilter extends Pipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> { public class DirWithDat2RecordFilter extends OldPipeline<ClassNameRegistryCreationFilter, DatFile2RecordFilter> {
private ClassNameRegistryRepository classNameRegistryRepository; private ClassNameRegistryRepository classNameRegistryRepository;
......
...@@ -149,10 +149,11 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> { ...@@ -149,10 +149,11 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
this.executorService.shutdown(); this.executorService.shutdown();
this.tcpStringReader.interrupt(); this.tcpStringReader.interrupt();
super.onIsPipelineHead(); super.onTerminating();
} }
/** /**
......
...@@ -19,7 +19,7 @@ import java.io.File; ...@@ -19,7 +19,7 @@ import java.io.File;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Pipeline; import teetime.framework.OldPipeline;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SingleElementPipe;
import teetime.stage.io.File2TextLinesFilter; import teetime.stage.io.File2TextLinesFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
...@@ -32,7 +32,7 @@ import kieker.common.record.IMonitoringRecord; ...@@ -32,7 +32,7 @@ import kieker.common.record.IMonitoringRecord;
* *
* @since 1.10 * @since 1.10
*/ */
public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> { public class DatFile2RecordFilter extends OldPipeline<File2TextLinesFilter, TextLine2RecordFilter> {
public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
......
...@@ -29,7 +29,7 @@ import kieker.common.record.flow.trace.TraceMetadata; ...@@ -29,7 +29,7 @@ import kieker.common.record.flow.trace.TraceMetadata;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
...@@ -94,12 +94,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> { ...@@ -94,12 +94,13 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
for (Long traceId : this.traceId2trace.keySet()) { for (Long traceId : this.traceId2trace.keySet()) {
this.put(traceId, false); this.put(traceId, false);
} }
super.onIsPipelineHead(); super.onTerminating();
} }
private void sendTraceBuffer(final TraceBuffer traceBuffer) { private void sendTraceBuffer(final TraceBuffer traceBuffer) {
......
...@@ -30,11 +30,11 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords; ...@@ -30,11 +30,11 @@ import kieker.analysis.plugin.filter.flow.TraceEventRecords;
* This filter collects incoming traces for a specified amount of time. * This filter collects incoming traces for a specified amount of time.
* Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace. * Any traces representing the same series of events will be used to calculate statistical informations like the average runtime of this kind of trace.
* Only one specimen of these traces containing this information will be forwarded from this filter. * Only one specimen of these traces containing this information will be forwarded from this filter.
* *
* Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others. * Statistical outliers regarding the runtime of the trace will be treated special and therefore send out as they are and will not be mixed with others.
* *
* @author Jan Waller, Florian Biss * @author Jan Waller, Florian Biss
* *
* @since * @since
*/ */
public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
...@@ -73,7 +73,8 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { ...@@ -73,7 +73,8 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
} }
@Override @Override
public void onIsPipelineHead() { public void onTerminating() {
super.onTerminating();
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) { for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue(); final TraceAggregationBuffer buffer = entry.getValue();
...@@ -84,7 +85,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> { ...@@ -84,7 +85,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
this.trace2buffer.clear(); this.trace2buffer.clear();
} }
super.onIsPipelineHead(); super.onTerminating();
} }
private void processTimeoutQueue(final long timestampInNs) { private void processTimeoutQueue(final long timestampInNs) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment