Skip to content
Snippets Groups Projects
Commit a91d121f authored by Christian Wulf's avatar Christian Wulf
Browse files

removed unused/deprecated attributes in AbstractStage

parent 9ed1d6bf
No related branches found
No related tags found
No related merge requests found
Showing
with 241 additions and 168 deletions
-verbose:gc
-XX:+PrintCompilation
\ No newline at end of file
......@@ -24,7 +24,7 @@ package teetime.framework.core;
public class Analysis {
public void init() {
System.out.println("Analysis initialized.");
}
public void start() {
......
package teetime.util;
import java.util.Collection;
import java.util.List;
public class ListUtil {
private ListUtil() {
// utility class
}
public static <T> List<T> merge(final List<List<T>> listOfLists) {
List<T> resultList = listOfLists.get(0);
for (int i = 1; i < listOfLists.size(); i++) {
Collection<? extends T> timestampObjectList = listOfLists.get(i);
resultList.addAll(timestampObjectList);
}
return resultList;
}
}
......@@ -15,7 +15,6 @@
***************************************************************************/
package teetime.examples.throughput;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
......@@ -23,6 +22,7 @@ import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis16;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
......@@ -67,11 +67,7 @@ public class MethodCallThoughputTimestampAnalysis16Test {
analysis.onTerminate();
}
// merge
List<TimestampObject> timestampObjects = new LinkedList<TimestampObject>();
for (List<TimestampObject> timestampObjectList : analysis.getTimestampObjectsList()) {
timestampObjects.addAll(timestampObjectList);
}
List<TimestampObject> timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList());
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
......@@ -15,14 +15,14 @@
***************************************************************************/
package teetime.examples.throughput;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.Closure;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis17;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
......@@ -51,14 +51,15 @@ public class MethodCallThoughputTimestampAnalysis17Test {
final MethodCallThroughputAnalysis17 analysis = new MethodCallThroughputAnalysis17();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Closure<Void, TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
public TimestampObject execute(final Void element) {
return new TimestampObject();
}
});
analysis.init();
System.out.println("starting");
stopWatch.start();
try {
analysis.start();
......@@ -67,12 +68,7 @@ public class MethodCallThoughputTimestampAnalysis17Test {
analysis.onTerminate();
}
// merge
List<TimestampObject> timestampObjects = analysis.getTimestampObjectsList().get(0);
for (int i = 1; i < analysis.getTimestampObjectsList().size(); i++) {
Collection<? extends TimestampObject> timestampObjectList = analysis.getTimestampObjectsList().get(i);
timestampObjects.addAll(timestampObjectList);
}
List<TimestampObject> timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList());
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
package teetime.examples.throughput.methodcall;
public interface Closure<I, O> {
O execute(I element);
}
......@@ -54,8 +54,8 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
@Override
public void init() {
super.init();
Runnable producerRunnable = this.buildProducerPipeline();
this.producerThread = new Thread(producerRunnable);
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline();
this.producerThread = new Thread(new RunnableStage(producerPipeline));
int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
......@@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(this.distributor, resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(workerRunnable);
}
......@@ -78,7 +78,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
}
}
private Runnable buildProducerPipeline() {
private Pipeline<Void, TimestampObject> buildProducerPipeline() {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
......@@ -88,14 +88,14 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
return new RunnableStage(pipeline);
return pipeline;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) {
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
......@@ -114,7 +114,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(distributor.getNewOutputPort(), relay.getInputPort());
SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort());
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
......@@ -18,14 +18,16 @@ package teetime.examples.throughput.methodcall;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.Distributor;
import teetime.examples.throughput.methodcall.stage.EndStage;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.Relay;
import teetime.examples.throughput.methodcall.stage.Sink;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
......@@ -40,21 +42,18 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private int numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private Closure<Void, TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
@Override
public void init() {
super.init();
// Runnable producerRunnable = this.buildProducerPipeline();
// this.producerThread = new Thread(producerRunnable);
final Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
// this.producerThread = new Thread(new RunnableStage(producerPipeline));
int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
......@@ -63,38 +62,86 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(this.distributor, resultList);
Runnable workerRunnable = this.buildPipeline(null, resultList);
this.workerThreads[i] = new Thread(workerRunnable);
}
// this.producerThread = new Thread(new Runnable() {
// @Override
// public void run() {
// TimestampObject ts;
// try {
// ts = MethodCallThroughputAnalysis17.this.inputObjectCreator.call();
// System.out.println("test" + producerPipeline + ", # filters: " + MethodCallThroughputAnalysis17.this.numNoopFilters + ", ts: "
// + ts);
// MethodCallThroughputAnalysis17.this.numInputObjects++;
// System.out.println("numInputObjects: " + MethodCallThroughputAnalysis17.this.numInputObjects);
// MethodCallThroughputAnalysis17.this.numInputObjects--;
// } catch (Exception e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// System.out.println("run end");
// }
// });
// this.producerThread.start();
//
// this.producerThread.run();
new RunnableStage(producerPipeline).run();
// Pipeline<Void, TimestampObject> stage = producerPipeline;
// stage.onStart();
// do {
// stage.executeWithPorts();
// } while (stage.isReschedulable());
// try {
// this.producerThread.join();
// } catch (InterruptedException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// }
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
super.init();
}
// private Runnable buildProducerPipeline() {
// final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
// this.distributor = new Distributor<TimestampObject>();
//
// final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
// pipeline.setFirstStage(objectProducer);
// pipeline.setLastStage(this.distributor);
//
// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
//
// return new RunnableStage(pipeline);
// }
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
Sink<TimestampObject> sink = new Sink<TimestampObject>();
EndStage<Void> endStage = new EndStage<Void>();
endStage.closure = inputObjectCreator;
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
pipeline.setFirstStage(objectProducer);
// pipeline.setFirstStage(sink);
// pipeline.setFirstStage(endStage);
pipeline.setLastStage(distributor);
// pipeline.setLastStage(sink);
pipeline.setLastStage(new EndStage<TimestampObject>());
// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort());
// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
distributor.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
return pipeline;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) {
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
......@@ -116,7 +163,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
IPipe<TimestampObject> startPipe = new SpScPipe<TimestampObject>();
try {
for (int i = 0; i < this.numInputObjects; i++) {
startPipe.add(this.inputObjectCreator.call());
startPipe.add(this.inputObjectCreator.execute(null));
}
startPipe.close();
} catch (Exception e) {
......@@ -124,6 +171,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
e.printStackTrace();
}
relay.getInputPort().pipe = startPipe;
// previousStage.getOutputPort().pipe = startPipe;
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......@@ -155,7 +203,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
}
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
public void setInput(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
......
......@@ -15,7 +15,7 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
boolean outputIsEmpty = outputElements.isEmpty();
if (outputIsEmpty) {
this.disable();
this.getOutputPort().pipe.close();
}
return outputElements;
......
......@@ -12,15 +12,15 @@ public interface Stage<I, O> {
CommittableQueue<O> execute2(CommittableQueue<I> elements);
SchedulingInformation getSchedulingInformation();
// SchedulingInformation getSchedulingInformation();
Stage getParentStage();
Stage<?, ?> getParentStage();
void setParentStage(Stage parentStage, int index);
void setParentStage(Stage<?, ?> parentStage, int index);
void setListener(OnDisableListener listener);
// void setListener(OnDisableListener listener);
Stage next();
Stage<?, ?> next();
void setSuccessor(Stage<?, ?> successor);
......
......@@ -58,6 +58,7 @@ public class UnorderedGrowablePipe<T> extends AbstractPipe<T> {
@Override
public T removeLast() {
T element = this.elements[--this.lastFreeIndex];
this.elements[this.lastFreeIndex] = null;
// T element = this.elements.get(--this.lastFreeIndex);
return element;
}
......
package teetime.examples.throughput.methodcall.stage;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.examples.throughput.methodcall.OnDisableListener;
import teetime.examples.throughput.methodcall.OutputPort;
import teetime.examples.throughput.methodcall.SchedulingInformation;
import teetime.examples.throughput.methodcall.Stage;
import teetime.examples.throughput.methodcall.StageWithPort;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private final InputPort<I> inputPort = new InputPort<I>();
private final OutputPort<O> outputPort = new OutputPort<O>();
protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
// protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
protected final CommittableQueue<O> outputElements = null;
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private Stage parentStage;
private OnDisableListener listener;
private Stage<?, ?> parentStage;
private int index;
private StageWithPort successor;
private StageWithPort<?, ?> successor;
private boolean reschedulable;
......@@ -34,7 +28,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
if (result == null) {
return null;
}
StageWithPort next = this.next();
StageWithPort<?, ?> next = this.next();
// if (next != null) {
// return next.executeRecursively(result);
// } else {
......@@ -100,31 +94,32 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
// CommittableQueue execute;
StageWithPort<?, ?> next = this.next();
do {
// execute = this.next().execute2(this.outputElements);
// execute = this.next().execute2(this.getOutputPort().pipe.getElements());
this.next().executeWithPorts();
next.executeWithPorts();
// System.out.println("Executed " + this.next().getClass().getSimpleName());
} while (this.next().isReschedulable());
} while (next.isReschedulable());
// } while (this.next().getInputPort().pipe.size() > 0);
// } while (execute.size() > 0);
}
@Override
public SchedulingInformation getSchedulingInformation() {
return this.schedulingInformation;
}
// @Override
// public SchedulingInformation getSchedulingInformation() {
// return this.schedulingInformation;
// }
public void disable() {
this.schedulingInformation.setActive(false);
this.fireOnDisable();
}
// public void disable() {
// this.schedulingInformation.setActive(false);
// this.fireOnDisable();
// }
private void fireOnDisable() {
if (this.listener != null) {
this.listener.onDisable(this, this.index);
}
}
// private void fireOnDisable() {
// if (this.listener != null) {
// this.listener.onDisable(this, this.index);
// }
// }
@Override
public void onStart() {
......@@ -132,27 +127,18 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
}
@Override
public Stage getParentStage() {
public Stage<?, ?> getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
public void setParentStage(final Stage<?, ?> parentStage, final int index) {
this.index = index;
this.parentStage = parentStage;
}
public OnDisableListener getListener() {
return this.listener;
}
@Override
public void setListener(final OnDisableListener listener) {
this.listener = listener;
}
@Override
public StageWithPort next() {
public StageWithPort<?, ?> next() {
return this.successor;
}
......
......@@ -44,6 +44,12 @@ public class Distributor<T> extends ConsumerStage<T, T> {
op.pipe.close();
System.out.println("End signal sent, size: " + op.pipe.size());
}
// for (OutputPort<?> op : this.outputPorts) {
// op.pipe = null;
// }
// this.outputPorts = null;
// this.outputPortList.clear();
}
@SuppressWarnings("unchecked")
......@@ -57,7 +63,12 @@ public class Distributor<T> extends ConsumerStage<T, T> {
System.out.println("outputPorts: " + this.outputPorts);
}
public OutputPort<T> getNewOutputPort() {
@Override
public OutputPort<T> getOutputPort() {
return this.getNewOutputPort();
}
private OutputPort<T> getNewOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>();
this.outputPortList.add(outputPort);
return outputPort;
......
package teetime.examples.throughput.methodcall.stage;
import java.util.LinkedList;
import java.util.List;
import teetime.examples.throughput.methodcall.Closure;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.examples.throughput.methodcall.OnDisableListener;
import teetime.examples.throughput.methodcall.OutputPort;
import teetime.examples.throughput.methodcall.SchedulingInformation;
import teetime.examples.throughput.methodcall.Stage;
import teetime.examples.throughput.methodcall.StageWithPort;
import teetime.util.list.CommittableQueue;
......@@ -15,6 +17,10 @@ public class EndStage<T> implements StageWithPort<T, T> {
return this.execute(element);
}
public int count;
public Closure<Void, ?> closure;
public List<Object> list = new LinkedList<Object>();
@Override
public T execute(final Object element) {
return (T) element;
......@@ -31,12 +37,6 @@ public class EndStage<T> implements StageWithPort<T, T> {
return null;
}
@Override
public SchedulingInformation getSchedulingInformation() {
// TODO Auto-generated method stub
return null;
}
@Override
public Stage getParentStage() {
// TODO Auto-generated method stub
......@@ -49,12 +49,6 @@ public class EndStage<T> implements StageWithPort<T, T> {
}
@Override
public void setListener(final OnDisableListener listener) {
// TODO Auto-generated method stub
}
@Override
public Stage next() {
// TODO Auto-generated method stub
......@@ -87,8 +81,11 @@ public class EndStage<T> implements StageWithPort<T, T> {
@Override
public void executeWithPorts() {
// TODO Auto-generated method stub
// this.getInputPort().receive(); // just consume
// do nothing
// this.count++;
// Object r = this.closure.execute(null);
// this.list.add(r);
}
@Override
......
......@@ -15,8 +15,7 @@
***************************************************************************/
package teetime.examples.throughput.methodcall.stage;
import java.util.concurrent.Callable;
import teetime.examples.throughput.methodcall.Closure;
import teetime.examples.throughput.methodcall.ProducerStage;
import teetime.util.list.CommittableQueue;
......@@ -28,12 +27,12 @@ import teetime.util.list.CommittableQueue;
public class ObjectProducer<T> extends ProducerStage<Void, T> {
private long numInputObjects;
private Callable<T> inputObjectCreator;
private Closure<Void, T> inputObjectCreator;
/**
* @since 1.10
*/
public ObjectProducer(final long numInputObjects, final Callable<T> inputObjectCreator) {
public ObjectProducer(final long numInputObjects, final Closure<Void, T> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
......@@ -46,7 +45,8 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
}
try {
final T newObject = this.inputObjectCreator.call();
// final T newObject = this.inputObjectCreator.call();
final T newObject = null;
this.numInputObjects--;
return newObject;
......@@ -63,11 +63,11 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
this.numInputObjects = numInputObjects;
}
public Callable<T> getInputObjectCreator() {
public Closure<Void, T> getInputObjectCreator() {
return this.inputObjectCreator;
}
public void setInputObjectCreator(final Callable<T> inputObjectCreator) {
public void setInputObjectCreator(final Closure<Void, T> inputObjectCreator) {
this.inputObjectCreator = inputObjectCreator;
}
......@@ -100,14 +100,21 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
return;
}
try {
final T newObject = this.inputObjectCreator.call();
this.numInputObjects--;
// System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects);
this.send(newObject);
} catch (final Exception e) {
throw new IllegalStateException(e);
}
// try {
T newObject = null;
newObject = this.inputObjectCreator.execute(null);
this.numInputObjects--;
// System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects);
this.send(newObject);
// } catch (final Exception e) {
// throw new IllegalStateException(e);
// }
}
@Override
public void onIsPipelineHead() {
// this.getOutputPort().pipe = null; // no performance increase
super.onIsPipelineHead();
}
}
......@@ -5,18 +5,17 @@ import java.util.LinkedList;
import java.util.List;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.examples.throughput.methodcall.OnDisableListener;
import teetime.examples.throughput.methodcall.OutputPort;
import teetime.examples.throughput.methodcall.SchedulingInformation;
import teetime.examples.throughput.methodcall.Stage;
import teetime.examples.throughput.methodcall.StageWithPort;
import teetime.util.list.CommittableQueue;
public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
public class Pipeline<I, O> implements StageWithPort<I, O> {
private StageWithPort firstStage;
private StageWithPort<I, ?> firstStage;
private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private StageWithPort lastStage;
private StageWithPort<?, O> lastStage;
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
......@@ -24,7 +23,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
private Stage parentStage;
private int index;
private int startIndex;
private OnDisableListener listener;
private boolean reschedulable;
private int firstStageIndex;
......@@ -75,12 +73,13 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
// this.setReschedulable(stage.isReschedulable());
}
private final void updateRescheduable(Stage stage) {
private final void updateRescheduable(Stage<?, ?> stage) {
while (!stage.isReschedulable()) {
this.firstStageIndex++;
stage = stage.next();
if (stage == null) { // loop reaches the last stage
this.setReschedulable(false);
this.cleanUp();
return;
}
stage.onIsPipelineHead();
......@@ -134,7 +133,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
for (int i = 0; i < this.stages.length; i++) {
StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(this, i);
stage.setListener(this);
// stage.setListener(this);
}
for (int i = 0; i < this.stages.length - 1; i++) {
......@@ -154,11 +153,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
// return this.firstStage.getInputPort();
// }
@Override
public SchedulingInformation getSchedulingInformation() {
return this.schedulingInformation;
}
@Override
public Stage getParentStage() {
return this.parentStage;
......@@ -170,34 +164,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
this.parentStage = parentStage;
}
@Override
public void onDisable(final Stage stage, final int index) {
this.startIndex = index + 1;
if (this.startIndex == this.stages.length) {
this.disable();
}
}
public void disable() {
this.schedulingInformation.setActive(false);
this.fireOnDisable();
}
private void fireOnDisable() {
if (this.listener != null) {
this.listener.onDisable(this, this.index);
}
}
public OnDisableListener getListener() {
return this.listener;
}
@Override
public void setListener(final OnDisableListener listener) {
this.listener = listener;
}
@Override
public O execute(final Object element) {
throw new IllegalStateException();
......@@ -237,9 +203,20 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
return this.lastStage.getOutputPort();
}
// @Override
// public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort();
// }
// TODO remove since it does not increase performances
private void cleanUp() {
for (int i = 0; i < this.stages.length; i++) {
StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(null, i);
// stage.setListener(null);
stage.setSuccessor(null);
}
this.firstStage = null;
this.intermediateStages.clear();
this.lastStage = null;
System.out.println("cleaned up");
}
}
package teetime.examples.throughput.methodcall.stage;
import teetime.examples.throughput.methodcall.ConsumerStage;
import teetime.util.list.CommittableQueue;
public class Sink<T> extends ConsumerStage<T, T> {
@Override
public T execute(final Object element) {
// TODO Auto-generated method stub
return null;
}
@Override
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final T element) {
// do nothing
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment