Skip to content
Snippets Groups Projects
Commit 340fcfb5 authored by Christian Wulf's avatar Christian Wulf
Browse files

added Merger

parent 812367ce
No related branches found
No related tags found
No related merge requests found
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.Distributor;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.Relay;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis18 extends Analysis {
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
private int numWorkerThreads;
@Override
public void init() {
super.init();
Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(workerRunnable);
}
}
private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
this.distributor = new Distributor<TimestampObject>();
final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
pipeline.setFirstStage(objectProducer);
pipeline.setLastStage(this.distributor);
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
return pipeline;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort());
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
}
@Override
public void start() {
super.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<List<TimestampObject>> getTimestampObjectsList() {
return this.timestampObjectsList;
}
public int getNumWorkerThreads() {
return this.numWorkerThreads;
}
public void setNumWorkerThreads(final int numWorkerThreads) {
this.numWorkerThreads = numWorkerThreads;
}
}
......@@ -61,7 +61,7 @@ public final class Distributor<T> extends AbstractStage<T, T> {
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]);
System.out.println("outputPorts: " + this.outputPorts);
// System.out.println("outputPorts: " + this.outputPorts);
}
@Override
......
package teetime.examples.throughput.methodcall.stage;
import java.util.ArrayList;
import java.util.List;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.util.concurrent.spsc.Pow2;
import teetime.util.list.CommittableQueue;
public class Merger<T> extends AbstractStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private int nextInputPortIndex;
private int size;
private InputPort<T>[] inputPorts;
@Override
public T execute(final Object element) {
// TODO Auto-generated method stub
return null;
}
@Override
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final T element) {
this.send(element);
}
@Override
public void executeWithPorts() {
InputPort<T> inputPort = this.inputPorts[this.nextInputPortIndex % this.size];
T element = inputPort.receive();
// if (element == null) {
// return;
// }
this.nextInputPortIndex++;
InputPort<T> nextInputPort = this.inputPorts[this.nextInputPortIndex % this.size];
this.setReschedulable(nextInputPort.getPipe().size() > 0);
this.execute5(element);
}
@SuppressWarnings("unchecked")
@Override
public void onStart() {
this.size = this.inputPortList.size();
// this.mask = this.size - 1;
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.inputPorts = this.inputPortList.toArray(new InputPort[sizeInPow2]);
// System.out.println("inputPorts: " + this.inputPorts);
}
@Override
public InputPort<T> getInputPort() {
return this.getNewInputPort();
}
private InputPort<T> getNewInputPort() {
InputPort<T> inputPort = new InputPort<T>();
this.inputPortList.add(inputPort);
return inputPort;
}
@Override
public void onIsPipelineHead() {
// TODO Auto-generated method stub
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment