From f9ab1a67e8b231a0fec0f363fd63a77855616232 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Wed, 18 Jun 2014 21:11:53 +0200
Subject: [PATCH] added Merger

---
 .../MethodCallThroughputAnalysis18.java       | 166 ++++++++++++++++++
 .../methodcall/stage/Distributor.java         |   2 +-
 .../throughput/methodcall/stage/Merger.java   |  79 +++++++++
 3 files changed, 246 insertions(+), 1 deletion(-)
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis18.java
 create mode 100644 src/test/java/teetime/examples/throughput/methodcall/stage/Merger.java

diff --git a/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis18.java b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis18.java
new file mode 100644
index 00000000..2f03f2e4
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/MethodCallThroughputAnalysis18.java
@@ -0,0 +1,166 @@
+/***************************************************************************
+ * Copyright 2014 Kieker Project (http://kieker-monitoring.net)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ***************************************************************************/
+package teetime.examples.throughput.methodcall;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import teetime.examples.throughput.TimestampObject;
+import teetime.examples.throughput.methodcall.stage.CollectorSink;
+import teetime.examples.throughput.methodcall.stage.Distributor;
+import teetime.examples.throughput.methodcall.stage.NoopFilter;
+import teetime.examples.throughput.methodcall.stage.ObjectProducer;
+import teetime.examples.throughput.methodcall.stage.Pipeline;
+import teetime.examples.throughput.methodcall.stage.Relay;
+import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
+import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
+import teetime.framework.core.Analysis;
+
+/**
+ * @author Christian Wulf
+ * 
+ * @since 1.10
+ */
+public class MethodCallThroughputAnalysis18 extends Analysis {
+
+	private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
+
+	private int numInputObjects;
+	private ConstructorClosure<TimestampObject> inputObjectCreator;
+	private int numNoopFilters;
+
+	private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
+
+	private Distributor<TimestampObject> distributor;
+	private Thread producerThread;
+
+	private Thread[] workerThreads;
+
+	private int numWorkerThreads;
+
+	@Override
+	public void init() {
+		super.init();
+		Pipeline<Void, TimestampObject> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
+		this.producerThread = new Thread(new RunnableStage(producerPipeline));
+
+		this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads);
+
+		this.workerThreads = new Thread[this.numWorkerThreads];
+		for (int i = 0; i < this.workerThreads.length; i++) {
+			List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
+			this.timestampObjectsList.add(resultList);
+
+			Runnable workerRunnable = this.buildPipeline(producerPipeline, resultList);
+			this.workerThreads[i] = new Thread(workerRunnable);
+		}
+	}
+
+	private Pipeline<Void, TimestampObject> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
+		final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
+		this.distributor = new Distributor<TimestampObject>();
+
+		final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
+		pipeline.setFirstStage(objectProducer);
+		pipeline.setLastStage(this.distributor);
+
+		UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
+
+		return pipeline;
+	}
+
+	/**
+	 * @param numNoopFilters
+	 * @since 1.10
+	 */
+	private Runnable buildPipeline(final StageWithPort<Void, TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
+		Relay<TimestampObject> relay = new Relay<TimestampObject>();
+		@SuppressWarnings("unchecked")
+		final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
+		// create stages
+		final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
+		for (int i = 0; i < noopFilters.length; i++) {
+			noopFilters[i] = new NoopFilter<TimestampObject>();
+		}
+		final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
+		final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
+
+		final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>();
+		pipeline.setFirstStage(relay);
+		pipeline.addIntermediateStage(startTimestampFilter);
+		pipeline.addIntermediateStages(noopFilters);
+		pipeline.addIntermediateStage(stopTimestampFilter);
+		pipeline.setLastStage(collectorSink);
+
+		SpScPipe.connect(previousStage.getOutputPort(), relay.getInputPort());
+
+		UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
+
+		UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
+		for (int i = 0; i < noopFilters.length - 1; i++) {
+			UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
+		}
+		UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
+		UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
+
+		return new RunnableStage(pipeline);
+	}
+
+	@Override
+	public void start() {
+		super.start();
+
+		for (Thread workerThread : this.workerThreads) {
+			workerThread.start();
+		}
+
+		try {
+			for (Thread workerThread : this.workerThreads) {
+				workerThread.join();
+			}
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
+		this.numInputObjects = numInputObjects;
+		this.inputObjectCreator = inputObjectCreator;
+	}
+
+	public int getNumNoopFilters() {
+		return this.numNoopFilters;
+	}
+
+	public void setNumNoopFilters(final int numNoopFilters) {
+		this.numNoopFilters = numNoopFilters;
+	}
+
+	public List<List<TimestampObject>> getTimestampObjectsList() {
+		return this.timestampObjectsList;
+	}
+
+	public int getNumWorkerThreads() {
+		return this.numWorkerThreads;
+	}
+
+	public void setNumWorkerThreads(final int numWorkerThreads) {
+		this.numWorkerThreads = numWorkerThreads;
+	}
+
+}
diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java
index 825055df..18e1cc95 100644
--- a/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java
+++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Distributor.java
@@ -61,7 +61,7 @@ public final class Distributor<T> extends AbstractStage<T, T> {
 
 		int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
 		this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]);
-		System.out.println("outputPorts: " + this.outputPorts);
+		// System.out.println("outputPorts: " + this.outputPorts);
 	}
 
 	@Override
diff --git a/src/test/java/teetime/examples/throughput/methodcall/stage/Merger.java b/src/test/java/teetime/examples/throughput/methodcall/stage/Merger.java
new file mode 100644
index 00000000..4977757c
--- /dev/null
+++ b/src/test/java/teetime/examples/throughput/methodcall/stage/Merger.java
@@ -0,0 +1,79 @@
+package teetime.examples.throughput.methodcall.stage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import teetime.examples.throughput.methodcall.InputPort;
+import teetime.util.concurrent.spsc.Pow2;
+import teetime.util.list.CommittableQueue;
+
+public class Merger<T> extends AbstractStage<T, T> {
+
+	// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
+
+	private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
+	private int nextInputPortIndex;
+	private int size;
+	private InputPort<T>[] inputPorts;
+
+	@Override
+	public T execute(final Object element) {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	protected void execute4(final CommittableQueue<T> elements) {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	protected void execute5(final T element) {
+		this.send(element);
+	}
+
+	@Override
+	public void executeWithPorts() {
+		InputPort<T> inputPort = this.inputPorts[this.nextInputPortIndex % this.size];
+		T element = inputPort.receive();
+		// if (element == null) {
+		// return;
+		// }
+
+		this.nextInputPortIndex++;
+		InputPort<T> nextInputPort = this.inputPorts[this.nextInputPortIndex % this.size];
+		this.setReschedulable(nextInputPort.getPipe().size() > 0);
+
+		this.execute5(element);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void onStart() {
+		this.size = this.inputPortList.size();
+		// this.mask = this.size - 1;
+
+		int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
+		this.inputPorts = this.inputPortList.toArray(new InputPort[sizeInPow2]);
+		// System.out.println("inputPorts: " + this.inputPorts);
+	}
+
+	@Override
+	public InputPort<T> getInputPort() {
+		return this.getNewInputPort();
+	}
+
+	private InputPort<T> getNewInputPort() {
+		InputPort<T> inputPort = new InputPort<T>();
+		this.inputPortList.add(inputPort);
+		return inputPort;
+	}
+
+	@Override
+	public void onIsPipelineHead() {
+		// TODO Auto-generated method stub
+
+	}
+
+}
-- 
GitLab