Skip to content
Snippets Groups Projects
Commit f9ab1a67 authored by Christian Wulf's avatar Christian Wulf
Browse files

added Merger

parent 5588e6a7
Branches
Tags
No related merge requests found
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.Distributor;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.Relay;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis18 extends Analysis {
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
private int numWorkerThreads;
@Override
public void init() {
super.init();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(workerRunnable);
}
}
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(this.distributor);
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
return pipeline;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort());
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
}
@Override
public void start() {
super.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<List<TimestampObject>> getTimestampObjectsList() {
return this.timestampObjectsList;
}
public int getNumWorkerThreads() {
return this.numWorkerThreads;
}
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
}
...@@ -61,7 +61,7 @@ public final class Distributor<T> extends AbstractStage<T, T> { ...@@ -61,7 +61,7 @@ public final class Distributor<T> extends AbstractStage<T, T> {
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]); this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]);
System.out.println("outputPorts: " + this.outputPorts); // System.out.println("outputPorts: " + this.outputPorts);
} }
@Override @Override
... ...
......
package teetime.examples.throughput.methodcall.stage;
import java.util.ArrayList;
import java.util.List;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.util.concurrent.spsc.Pow2;
import teetime.util.list.CommittableQueue;
public class Merger<T> extends AbstractStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private int nextInputPortIndex;
private int size;
private InputPort<T>[] inputPorts;
@Override
public T execute(final Object element) {
// TODO Auto-generated method stub
return null;
}
@Override
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final T element) {
this.send(element);
}
@Override
public void executeWithPorts() {
InputPort<T> inputPort = this.inputPorts[this.nextInputPortIndex % this.size];
T element = inputPort.receive();
// if (element == null) {
// return;
// }
this.nextInputPortIndex++;
InputPort<T> nextInputPort = this.inputPorts[this.nextInputPortIndex % this.size];
this.setReschedulable(nextInputPort.getPipe().size() > 0);
this.execute5(element);
}
@SuppressWarnings("unchecked")
@Override
public void onStart() {
this.size = this.inputPortList.size();
// this.mask = this.size - 1;
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.inputPorts = this.inputPortList.toArray(new InputPort[sizeInPow2]);
// System.out.println("inputPorts: " + this.inputPorts);
}
@Override
public InputPort<T> getInputPort() {
return this.getNewInputPort();
}
private InputPort<T> getNewInputPort() {
InputPort<T> inputPort = new InputPort<T>();
this.inputPortList.add(inputPort);
return inputPort;
}
@Override
public void onIsPipelineHead() {
// TODO Auto-generated method stub
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment