Skip to content
Snippets Groups Projects
Commit cb643b9c authored by Christian Wulf's avatar Christian Wulf
Browse files

moved documents to doc;

fixed test
parent bdf2a57e
No related branches found
No related tags found
No related merge requests found
Showing
with 179 additions and 176 deletions
File moved
File moved
......@@ -22,6 +22,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
......@@ -45,14 +46,15 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
@Override
public void init() {
super.init();
this.runnable = this.buildPipeline();
StageWithPort pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline);
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline() {
private StageWithPort buildPipeline() {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
......@@ -64,12 +66,12 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
pipeline.setFirstStage(objectProducer, null);
final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink, null);
pipeline.setLastStage(collectorSink);
Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
......@@ -79,7 +81,7 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
Pipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
Pipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
return pipeline;
}
@Override
......
......@@ -64,7 +64,7 @@ public class MethodCallThroughputAnalysis10 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
......
......@@ -22,6 +22,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
......@@ -45,15 +46,11 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
@Override
public void init() {
super.init();
Pipeline<Void, ?> pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage<Void>(pipeline);
StageWithPort pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage(pipeline);
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Pipeline<Void, Void> buildPipeline(final long numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
private StageWithPort buildPipeline(final long numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
......@@ -67,7 +64,7 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
// pipeline.addIntermediateStage(relayFake);
pipeline.addIntermediateStage(startTimestampFilter);
......
......@@ -22,6 +22,7 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
......@@ -47,14 +48,15 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
@Override
public void init() {
super.init();
this.runnable = this.buildPipeline();
StageWithPort pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline);
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline() {
private StageWithPort buildPipeline() {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
......@@ -66,7 +68,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
......@@ -81,7 +83,7 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
SpScPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort(), SPSC_INITIAL_CAPACITY);
SpScPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort(), SPSC_INITIAL_CAPACITY);
return new RunnableStage(pipeline);
return pipeline;
}
@Override
......
......@@ -22,17 +22,18 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.Delay;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
/**
* @author Christian Wulf
......@@ -57,28 +58,31 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
public void init() {
super.init();
this.clockRunnable = this.buildClockPipeline();
this.runnable = this.buildPipeline(this.clock);
StageWithPort clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableStage(clockPipeline);
StageWithPort pipeline = this.buildPipeline(this.clock);
this.runnable = new RunnableStage(pipeline);
}
private Runnable buildClockPipeline() {
private StageWithPort buildClockPipeline() {
this.clock = new Clock();
this.clock.setInitialDelayInMs(100);
this.clock.setIntervalDelayInMs(100);
final Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
final Pipeline<Clock, Sink<Long>> pipeline = new Pipeline<Clock, Sink<Long>>();
pipeline.setFirstStage(this.clock);
pipeline.setLastStage(new EndStage<Long>());
pipeline.setLastStage(new Sink<Long>());
return new RunnableStage(pipeline);
return pipeline;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final Clock clock) {
private StageWithPort buildPipeline(final Clock clock) {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
......@@ -91,7 +95,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Void> pipeline = new Pipeline<Void, Void>();
final Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
......@@ -111,7 +115,7 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
return pipeline;
}
@Override
......
......@@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
......@@ -51,7 +50,6 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
......@@ -61,8 +59,9 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
@Override
public void init() {
super.init();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage<Void>(producerPipeline));
Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
......@@ -71,20 +70,21 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Pipeline<TimestampObject, Void> workerPipeline = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(new RunnableStage<TimestampObject>(workerPipeline));
Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(new RunnableStage(workerPipeline));
}
}
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(this.distributor);
pipeline.setLastStage(distributor);
SingleElementPipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
SingleElementPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
return pipeline;
}
......@@ -93,7 +93,9 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
* @param numNoopFilters
* @since 1.10
*/
private Pipeline<TimestampObject, Void> buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(
final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage,
final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
......@@ -105,14 +107,14 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>();
final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
SingleElementPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
......@@ -30,7 +30,6 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
......@@ -60,8 +59,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
@Override
public void init() {
final Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage<Void>(producerPipeline));
final StageWithPort producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
......@@ -95,7 +94,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
// this.producerThread.start();
// this.producerThread.run();
new RunnableStage<Void>(producerPipeline).run();
new RunnableStage(producerPipeline).run();
// try {
// this.producerThread.join();
......@@ -107,14 +106,14 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
super.init();
}
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
private StageWithPort buildProducerPipeline(final int numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
Sink<TimestampObject> sink = new Sink<TimestampObject>();
EndStage<Void> endStage = new EndStage<Void>();
endStage.closure = inputObjectCreator;
Sink<Void> endStage = new Sink<Void>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
// pipeline.setFirstStage(sink);
// pipeline.setFirstStage(endStage);
......@@ -127,7 +126,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
distributor.getOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>());
distributor.getNewOutputPort().setPipe(new UnorderedGrowablePipe<TimestampObject>());
return pipeline;
}
......@@ -136,7 +135,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
private Runnable buildPipeline(final StageWithPort previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
// create stages
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
......@@ -148,7 +147,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>();
final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
......
......@@ -51,7 +51,6 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
......@@ -61,7 +60,8 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
@Override
public void init() {
super.init();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
......@@ -71,20 +71,21 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(workerRunnable);
StageWithPort pipeline = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(this.distributor);
pipeline.setLastStage(distributor);
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
return pipeline;
}
......@@ -93,7 +94,9 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
private Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(
final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> previousStage,
final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
......@@ -105,14 +108,14 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>();
final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......@@ -123,7 +126,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
return pipeline;
}
@Override
......
......@@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
......@@ -51,7 +50,6 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
......@@ -61,7 +59,8 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
@Override
public void init() {
super.init();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
......@@ -71,30 +70,27 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline.getLastStage(), resultList);
this.workerThreads[i] = new Thread(workerRunnable);
}
}
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
private Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
final Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new Pipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>();
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(this.distributor);
pipeline.setLastStage(distributor);
OrderedGrowableArrayPipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
OrderedGrowableArrayPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
return pipeline;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
private Runnable buildPipeline(final Distributor<TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
......@@ -106,14 +102,14 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Void> pipeline = new Pipeline<TimestampObject, Void>();
final Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new Pipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
SpScPipe.connect(previousStage.getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
OrderedGrowableArrayPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
......@@ -44,7 +44,7 @@ import kieker.common.util.registry.Lookup;
*
* @since 1.10
*/
public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
public class TCPReaderSink extends ProducerStage<IMonitoringRecord> {
private static final int MESSAGE_BUFFER_SIZE = 65535;
......@@ -92,7 +92,7 @@ public class TCPReaderSink extends ProducerStage<Void, IMonitoringRecord> {
}
@Override
protected void execute5(final Void element) {
protected void execute() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
......
......@@ -4,8 +4,6 @@ import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import kieker.common.record.IMonitoringRecord;
public class TcpTraceLogging extends Analysis {
private Thread tcpThread;
......@@ -13,8 +11,8 @@ public class TcpTraceLogging extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
StageWithPort tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
}
@Override
......@@ -30,7 +28,7 @@ public class TcpTraceLogging extends Analysis {
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
private StageWithPort buildTcpPipeline() {
TCPReaderSink tcpReader = new TCPReaderSink();
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
//
......
......@@ -12,9 +12,9 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -41,41 +41,41 @@ public class TcpTraceReconstruction extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline);
this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage());
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline) {
private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
......@@ -83,7 +83,7 @@ public class TcpTraceReconstruction extends Analysis {
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort());
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
......
......@@ -15,9 +15,9 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -49,35 +49,35 @@ public class TcpTraceReduction extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(5000);
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000);
this.clockThread = new Thread(new RunnableStage(clockStage));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage);
this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage());
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
......@@ -86,24 +86,23 @@ public class TcpTraceReduction extends Analysis {
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage) {
private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline, final Distributor<Long> clockStage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort());
......@@ -111,10 +110,10 @@ public class TcpTraceReduction extends Analysis {
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceReductionFilter.getInputPort());
SingleElementPipe.connect(traceReductionFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
......
......@@ -22,6 +22,7 @@ import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
......@@ -46,21 +47,21 @@ public class RecordReaderAnalysis extends Analysis {
@Override
public void init() {
super.init();
Pipeline<File, ?> producerPipeline = this.buildProducerPipeline();
this.producerThread = new Thread(new RunnableStage<File>(producerPipeline));
StageWithPort producerPipeline = this.buildProducerPipeline();
this.producerThread = new Thread(new RunnableStage(producerPipeline));
}
private Pipeline<File, Void> buildProducerPipeline() {
private StageWithPort buildProducerPipeline() {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final Pipeline<File, Void> pipeline = new Pipeline<File, Void>();
final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(dir2RecordsFilter);
pipeline.setLastStage(collector);
SpScPipe.connect(null, dir2RecordsFilter.getInputPort(), 1);
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs"));
......
......@@ -11,7 +11,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
......@@ -25,7 +25,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
private Counter<IMonitoringRecord> recordCounter;
private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clockStage = new Clock();
clockStage.setInitialDelayInMs(intervalDelayInMs);
clockStage.setIntervalDelayInMs(intervalDelayInMs);
......@@ -34,26 +34,26 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
pipeline.setFirstStage(clockStage);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline(final StageWithPort<Void, Long> clockPipeline) {
private StageWithPort buildTcpPipeline(final Distributor<Long> previousClockStage) {
TCPReader tcpReader = new TCPReader();
this.recordCounter = new Counter<IMonitoringRecord>();
this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockPipeline.getOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
Pipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Sink<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(this.recordThroughputStage);
......@@ -65,11 +65,11 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
public void init() {
super.init();
StageWithPort<Void, Long> clockPipeline = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage<Void>(clockPipeline));
Pipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockPipeline));
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(clockPipeline);
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
StageWithPort tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
}
@Override
......
......@@ -14,8 +14,8 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -45,17 +45,17 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage));
Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
Pipeline<Void, ?> pipeline = this.buildPipeline(clockStage, clock2Stage);
this.workerThread = new Thread(new RunnableStage<Void>(pipeline));
StageWithPort pipeline = this.buildPipeline(clockStage.getLastStage(), clock2Stage.getLastStage());
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>();
......@@ -63,13 +63,13 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
private StageWithPort buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
// create stages
TCPReader tcpReader = new TCPReader();
this.recordCounter = new Counter<IMonitoringRecord>();
......@@ -79,7 +79,7 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(this.traceId2trace);
this.traceThroughputFilter = new ElementThroughputMeasuringStage<TraceEventRecords>();
this.traceCounter = new Counter<TraceEventRecords>();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), TCP_RELAY_MAX_SIZE);
......@@ -92,11 +92,11 @@ public class TcpTraceReconstructionAnalysis extends Analysis {
SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getNewOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
Pipeline<Void, TraceEventRecords> pipeline = new Pipeline<Void, TraceEventRecords>();
Pipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new Pipeline<TCPReader, Sink<TraceEventRecords>>();
pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(instanceOfFilter);
......
......@@ -48,21 +48,21 @@ public class TraceReconstructionAnalysis extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
Clock clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage));
Pipeline<File, ?> pipeline = this.buildPipeline(clockStage);
this.workerThread = new Thread(new RunnableStage<File>(pipeline));
StageWithPort pipeline = this.buildPipeline(clockStage);
this.workerThread = new Thread(new RunnableStage(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
private Clock buildClockPipeline() {
Clock clock = new Clock();
clock.setIntervalDelayInMs(100);
return clock;
}
private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) {
private StageWithPort buildPipeline(final Clock clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
......@@ -100,7 +100,7 @@ public class TraceReconstructionAnalysis extends Analysis {
dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir);
// create and configure pipeline
Pipeline<File, Void> pipeline = new Pipeline<File, Void>();
Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>();
pipeline.setFirstStage(dir2RecordsFilter);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(cache);
......
......@@ -18,10 +18,10 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceCounter;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -49,38 +49,38 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage));
Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>("TCP reader pipeline");
Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline");
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
......@@ -89,13 +89,13 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private static class StageFactory<T extends StageWithPort<?, ?>> {
private static class StageFactory<T extends StageWithPort> {
private final Constructor<T> constructor;
private final List<T> stages = new ArrayList<T>();
......@@ -154,8 +154,8 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) {
private StageWithPort buildPipeline(final Distributor<IMonitoringRecord> tcpReaderPipeline,
final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
......@@ -168,16 +168,16 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
final TraceReconstructionFilter traceReconstructionFilter = this.traceReconstructionFilterFactory.create(this.traceId2trace);
Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>();
// connect stages
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SpScPipe<IMonitoringRecord> tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipes.add(tcpRelayPipe);
// SysOutFilter<TraceEventRecords> sysout = new SysOutFilter<TraceEventRecords>(tcpRelayPipe);
SpScPipe.connect(clockStage.getOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getNewOutputPort(), recordThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
SingleElementPipe.connect(recordCounter.getOutputPort(), recordThroughputFilter.getInputPort());
......@@ -190,7 +190,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
SingleElementPipe.connect(traceCounter.getOutputPort(), endStage.getInputPort());
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>("Worker pipeline");
Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>("Worker pipeline");
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordCounter);
pipeline.addIntermediateStage(recordThroughputFilter);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment