/***************************************************************************
 * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ***************************************************************************/
package teetime.examples.throughput.methodcall;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.Distributor;
import teetime.examples.throughput.methodcall.stage.EndStage;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.Relay;
import teetime.examples.throughput.methodcall.stage.Sink;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;

/**
 * @author Christian Wulf
 * 
 * @since 1.10
 */
public class MethodCallThroughputAnalysis17 extends Analysis {

	private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();

	private int numInputObjects;
	private Closure<Void, TimestampObject> inputObjectCreator;
Christian Wulf's avatar
Christian Wulf committed
	private int numNoopFilters;

	private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();

	private Thread producerThread;
	private Thread[] workerThreads;

	@Override
	public void init() {
		final Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
		// this.producerThread = new Thread(new RunnableStage(producerPipeline));
Christian Wulf's avatar
Christian Wulf committed

		int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose

		this.workerThreads = new Thread[numWorkerThreads];
		for (int i = 0; i < this.workerThreads.length; i++) {
			List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
			this.timestampObjectsList.add(resultList);

			Runnable workerRunnable = this.buildPipeline(null, resultList);
Christian Wulf's avatar
Christian Wulf committed
			this.workerThreads[i] = new Thread(workerRunnable);
		}

		// this.producerThread = new Thread(new Runnable() {
		// @Override
		// public void run() {
		// TimestampObject ts;
		// try {
		// ts = MethodCallThroughputAnalysis17.this.inputObjectCreator.call();
		// System.out.println("test" + producerPipeline + ", # filters: " + MethodCallThroughputAnalysis17.this.numNoopFilters + ", ts: "
		// + ts);
		// MethodCallThroughputAnalysis17.this.numInputObjects++;
		// System.out.println("numInputObjects: " + MethodCallThroughputAnalysis17.this.numInputObjects);
		// MethodCallThroughputAnalysis17.this.numInputObjects--;
		// } catch (Exception e) {
		// // TODO Auto-generated catch block
		// e.printStackTrace();
		// }
		// System.out.println("run end");
		// }
		// });

Christian Wulf's avatar
Christian Wulf committed
		// this.producerThread.start();
		// this.producerThread.run();
		new RunnableStage(producerPipeline).run();

		// Pipeline<Void, TimestampObject> stage = producerPipeline;
		// stage.onStart();
		// do {
		// stage.executeWithPorts();
		// } while (stage.isReschedulable());

Christian Wulf's avatar
Christian Wulf committed
		// try {
		// this.producerThread.join();
		// } catch (InterruptedException e1) {
		// // TODO Auto-generated catch block
		// e1.printStackTrace();
		// }

		// try {
		// Thread.sleep(1000);
		// } catch (InterruptedException e) {
		// // TODO Auto-generated catch block
		// e.printStackTrace();
		// }

		super.init();
	private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) {
		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
		Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
		Sink<TimestampObject> sink = new Sink<TimestampObject>();
		EndStage<Void> endStage = new EndStage<Void>();
		endStage.closure = inputObjectCreator;

		final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
		pipeline.setFirstStage(objectProducer);
		// pipeline.setFirstStage(sink);
		// pipeline.setFirstStage(endStage);

		pipeline.setLastStage(distributor);
		// pipeline.setLastStage(sink);
Christian Wulf's avatar
Christian Wulf committed
		// pipeline.setLastStage(new EndStage<TimestampObject>());

		// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort());
		// objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();

		UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort());
		distributor.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>();

		return pipeline;
	}
Christian Wulf's avatar
Christian Wulf committed

	/**
	 * @param numNoopFilters
	 * @since 1.10
	 */
	private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Christian Wulf's avatar
Christian Wulf committed
		Relay<TimestampObject> relay = new Relay<TimestampObject>();
		@SuppressWarnings("unchecked")
		final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
		// create stages
		final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
		for (int i = 0; i < noopFilters.length; i++) {
			noopFilters[i] = new NoopFilter<TimestampObject>();
		}
		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);

		final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>();
		pipeline.setFirstStage(relay);
		pipeline.addIntermediateStage(startTimestampFilter);
		pipeline.addIntermediateStages(noopFilters);
		pipeline.addIntermediateStage(stopTimestampFilter);
		pipeline.setLastStage(collectorSink);

		IPipe<TimestampObject> startPipe = new SpScPipe<TimestampObject>();
		try {
			for (int i = 0; i < this.numInputObjects; i++) {
				startPipe.add(this.inputObjectCreator.execute(null));
Christian Wulf's avatar
Christian Wulf committed
			}
			startPipe.close();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		relay.getInputPort().pipe = startPipe;
		// previousStage.getOutputPort().pipe = startPipe;
Christian Wulf's avatar
Christian Wulf committed

		UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());

		UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
		for (int i = 0; i < noopFilters.length - 1; i++) {
			UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
		}
		UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
		UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());

		return new RunnableStage(pipeline);
	}

	/**
	 * Calls {@code super.start()}, starts all worker threads, and then waits for each of them to
	 * terminate.
	 * <p>
	 * If the calling thread is interrupted while waiting, the wait is abandoned and the thread's
	 * interrupt status is restored so that callers can react to the interruption.
	 */
	@Override
	public void start() {
		super.start();

		for (final Thread workerThread : this.workerThreads) {
			workerThread.start();
		}

		try {
			for (final Thread workerThread : this.workerThreads) {
				workerThread.join();
			}
		} catch (final InterruptedException e) {
			// join() clears the interrupt status when it throws; set it again instead of swallowing it
			Thread.currentThread().interrupt();
		}
	}

	public void setInput(final int numInputObjects, final Closure<Void, TimestampObject> inputObjectCreator) {
Christian Wulf's avatar
Christian Wulf committed
		this.numInputObjects = numInputObjects;
		this.inputObjectCreator = inputObjectCreator;
	}

	/**
	 * @return the currently configured number of noop filters
	 */
	public int getNumNoopFilters() {
		final int configuredNoopFilters = this.numNoopFilters;
		return configuredNoopFilters;
	}

	/**
	 * Sets the number of noop filters.
	 *
	 * @param numNoopFilters
	 *            the number of noop filters; must not be negative because the value is used as an
	 *            array length when the filters are created
	 * @throws IllegalArgumentException
	 *             if {@code numNoopFilters} is negative
	 */
	public void setNumNoopFilters(final int numNoopFilters) {
		if (numNoopFilters < 0) {
			// reject early instead of failing later with a NegativeArraySizeException
			throw new IllegalArgumentException("numNoopFilters must not be negative: " + numNoopFilters);
		}
		this.numNoopFilters = numNoopFilters;
	}

	/**
	 * @return the internal list of result lists (the list itself, not a copy)
	 */
	public List<List<TimestampObject>> getTimestampObjectsList() {
		final List<List<TimestampObject>> resultLists = this.timestampObjectsList;
		return resultLists;
	}

}