Skip to content
Snippets Groups Projects
Commit 5325c4b3 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

removed StageWithPorts.onIsPipelineHead and updated references to

onTerminating
parent 0bd32903
No related branches found
No related tags found
No related merge requests found
Showing
with 12 additions and 31 deletions
......@@ -126,7 +126,6 @@ public abstract class AbstractStage implements StageWithPort {
public void onTerminating() {
// empty default implementation
this.onIsPipelineHead();
}
protected <T> InputPort<T> createInputPort() {
......
......@@ -15,11 +15,6 @@ public abstract class ConsumerStage<I> extends AbstractStage {
this.execute(element);
}
@Override
public void onIsPipelineHead() {
// do nothing
}
protected abstract void execute(I element);
}
......@@ -46,11 +46,6 @@ public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageW
this.firstStage.setParentStage(parentStage, index);
}
@Override
public void onIsPipelineHead() {
this.firstStage.onIsPipelineHead();
}
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.firstStage.onSignal(signal, inputPort);
......
......@@ -23,11 +23,6 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag
this.execute();
}
@Override
public void onIsPipelineHead() {
// do nothing
}
@Override
public void terminate() {
this.shouldTerminate = true;
......
......@@ -15,9 +15,6 @@ public interface StageWithPort {
void setParentStage(StageWithPort parentStage, int index);
// BETTER remove this method since it will be replaced by onTerminating()
void onIsPipelineHead();
void onSignal(ISignal signal, InputPort<?> inputPort);
/**
......
......@@ -20,7 +20,7 @@ public class Cache<T> extends ConsumerStage<T> {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements...");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
......@@ -29,7 +29,7 @@ public class Cache<T> extends ConsumerStage<T> {
}
stopWatch.end();
this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead();
super.onTerminating();
}
public OutputPort<T> getOutputPort() {
......
......@@ -45,7 +45,7 @@ public class CollectorSink<T> extends ConsumerStage<T> {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
System.out.println("size: " + this.elements.size());
}
......
......@@ -27,7 +27,7 @@ public class Delay<T> extends AbstractStage {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
while (!this.inputPort.getPipe().isEmpty()) {
this.executeWithPorts();
}
......
......@@ -38,7 +38,7 @@ public class Distributor<T> extends ConsumerStage<T> {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
// for (OutputPort<T> op : this.outputPortList) {
// op.getPipe().close();
// System.out.println("End signal sent, size: " + op.getPipe().size());
......
......@@ -62,7 +62,7 @@ public class Merger<T> extends AbstractStage {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
this.finishedInputPorts++;
}
......
......@@ -149,10 +149,10 @@ public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
this.executorService.shutdown();
this.tcpStringReader.interrupt();
super.onIsPipelineHead();
super.onTerminating();
}
/**
......
......@@ -94,12 +94,12 @@ public class TraceReconstructionFilter extends ConsumerStage<IFlowRecord> {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
for (Long traceId : this.traceId2trace.keySet()) {
this.put(traceId, false);
}
super.onIsPipelineHead();
super.onTerminating();
}
private void sendTraceBuffer(final TraceBuffer traceBuffer) {
......
......@@ -73,7 +73,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
}
@Override
public void onIsPipelineHead() {
public void onTerminating() {
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
......@@ -84,7 +84,7 @@ public class TraceReductionFilter extends ConsumerStage<TraceEventRecords> {
this.trace2buffer.clear();
}
super.onIsPipelineHead();
super.onTerminating();
}
private void processTimeoutQueue(final long timestampInNs) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment