Skip to content
Snippets Groups Projects
Commit 9ed1d6bf authored by Christian Wulf's avatar Christian Wulf
Browse files

added close concept

parent 43f333e4
No related branches found
No related tags found
No related merge requests found
Showing
with 315 additions and 39 deletions
......@@ -10,6 +10,7 @@
-foreach vs. index-based iteration
-iterative vs. recursive execution
-null-check vs. NullObject
-AbstractPipe vs. IPipe
-
......@@ -29,3 +30,5 @@
12: 3300 ns (recursive; argument/return w/o pipe)
13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class)
14: 21,000 ns (spsc pipe)
16: 14,500 ns (with distributor thread)
17: 9800 ns (as 16, but with direct feeding of SpScPipe)
......@@ -28,10 +28,10 @@ public class Analysis {
}
public void start() {
// System.out.println("Analysis started.");
}
public void onTerminate() {
// System.out.println("Analysis stopped.");
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis17;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThoughputTimestampAnalysis17Test {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final MethodCallThroughputAnalysis17 analysis = new MethodCallThroughputAnalysis17();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
return new TimestampObject();
}
});
analysis.init();
stopWatch.start();
try {
analysis.start();
} finally {
stopWatch.end();
analysis.onTerminate();
}
// merge
List<TimestampObject> timestampObjects = analysis.getTimestampObjectsList().get(0);
for (int i = 1; i < analysis.getTimestampObjectsList().size(); i++) {
Collection<? extends TimestampObject> timestampObjectList = analysis.getTimestampObjectsList().get(i);
timestampObjects.addAll(timestampObjectList);
}
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
package teetime.examples.throughput.methodcall;
import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean();
@Override
public boolean isClosed() {
return this.closed.get();
}
@Override
public void close() {
this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
}
}
......@@ -12,4 +12,8 @@ public interface IPipe<T> {
public abstract T readLast();
public abstract void close();
public abstract boolean isClosed();
}
......@@ -13,4 +13,5 @@ public class InputPort<T> {
T element = this.pipe.readLast();
return element;
}
}
......@@ -77,23 +77,7 @@ public class MethodCallThroughputAnalysis10 extends Analysis {
SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
pipeline.onStart();
// pipeline.getInputPort().pipe = new Pipe<Void>();
// pipeline.getInputPort().pipe.add(new Object());
// pipeline.getOutputPort().pipe = new Pipe<Void>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
do {
pipeline.executeWithPorts();
} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
}
};
return runnable;
return new RunnableStage(pipeline);
}
@Override
......
......@@ -114,7 +114,7 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
OrderedGrowableArrayPipe.connect(distributor.getNewOutputPort(), relay.getInputPort());
SpScPipe.connect(distributor.getNewOutputPort(), relay.getInputPort());
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.Distributor;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.Relay;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis17 extends Analysis {
private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors();
private int numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Distributor<TimestampObject> distributor;
private Thread producerThread;
private Thread[] workerThreads;
@Override
public void init() {
super.init();
// Runnable producerRunnable = this.buildProducerPipeline();
// this.producerThread = new Thread(producerRunnable);
int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
this.workerThreads = new Thread[numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(this.distributor, resultList);
this.workerThreads[i] = new Thread(workerRunnable);
}
// this.producerThread.start();
//
// try {
// this.producerThread.join();
// } catch (InterruptedException e1) {
// // TODO Auto-generated catch block
// e1.printStackTrace();
// }
}
// private Runnable buildProducerPipeline() {
// final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
// this.distributor = new Distributor<TimestampObject>();
//
// final Pipeline<Void, TimestampObject> pipeline = new Pipeline<Void, TimestampObject>();
// pipeline.setFirstStage(objectProducer);
// pipeline.setLastStage(this.distributor);
//
// UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), this.distributor.getInputPort());
//
// return new RunnableStage(pipeline);
// }
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final Distributor<TimestampObject> distributor, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects);
final Pipeline<TimestampObject, Object> pipeline = new Pipeline<TimestampObject, Object>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
IPipe<TimestampObject> startPipe = new SpScPipe<TimestampObject>();
try {
for (int i = 0; i < this.numInputObjects; i++) {
startPipe.add(this.inputObjectCreator.call());
}
startPipe.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
relay.getInputPort().pipe = startPipe;
UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline);
}
@Override
public void start() {
super.start();
for (Thread workerThread : this.workerThreads) {
workerThread.start();
}
try {
for (Thread workerThread : this.workerThreads) {
workerThread.join();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<List<TimestampObject>> getTimestampObjectsList() {
return this.timestampObjectsList;
}
}
......@@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall;
import teetime.util.concurrent.workstealing.CircularArray;
public class OrderedGrowableArrayPipe<T> implements IPipe<T> {
public class OrderedGrowableArrayPipe<T> extends AbstractPipe<T> {
private CircularArray<T> elements;
private int head;
......
......@@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall;
import java.util.LinkedList;
public class OrderedGrowablePipe<T> implements IPipe<T> {
public class OrderedGrowablePipe<T> extends AbstractPipe<T> {
private LinkedList<T> elements;
......@@ -22,12 +22,12 @@ public class OrderedGrowablePipe<T> implements IPipe<T> {
@Override
public void add(final T element) {
this.elements.add(element);
this.elements.offer(element);
}
@Override
public T removeLast() {
return this.elements.removeFirst();
return this.elements.poll();
}
@Override
......@@ -37,7 +37,7 @@ public class OrderedGrowablePipe<T> implements IPipe<T> {
@Override
public T readLast() {
return this.elements.getFirst();
return this.elements.peek();
}
@Override
......
......@@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableResizableArrayQueue;
public class Pipe<T> implements IPipe<T> {
public class Pipe<T> extends AbstractPipe<T> {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
......
package teetime.examples.throughput.methodcall;
public class SingleElementPipe<T> implements IPipe<T> {
//public class SingleElementPipe<T> implements IPipe<T> {
public class SingleElementPipe<T> extends AbstractPipe<T> {
private T element;
......@@ -37,4 +38,16 @@ public class SingleElementPipe<T> implements IPipe<T> {
return (this.element == null) ? 0 : 1;
}
// @Override
// public void close() {
//
//
// }
//
// @Override
// public boolean isClosed() {
//
// return false;
// }
}
......@@ -2,7 +2,7 @@ package teetime.examples.throughput.methodcall;
import teetime.util.concurrent.spsc.FFBufferOrdered3;
public class SpScPipe<T> implements IPipe<T> {
public class SpScPipe<T> extends AbstractPipe<T> {
private final FFBufferOrdered3<T> queue = new FFBufferOrdered3<T>(100010);
......
......@@ -4,8 +4,6 @@ import teetime.util.list.CommittableQueue;
public interface Stage<I, O> {
public static final Object END_SIGNAL = new Object();
Object executeRecursively(Object element);
O execute(Object element);
......
package teetime.examples.throughput.methodcall;
public class UnorderedGrowablePipe<T> implements IPipe<T> {
public class UnorderedGrowablePipe<T> extends AbstractPipe<T> {
// private static final class ArrayWrapper2<T> {
//
......
......@@ -17,7 +17,7 @@ public class Distributor<T> extends ConsumerStage<T, T> {
private int size;
private int mask;
// private int mask;
@Override
public T execute(final Object element) {
......@@ -40,9 +40,9 @@ public class Distributor<T> extends ConsumerStage<T, T> {
@Override
public void onIsPipelineHead() {
for (OutputPort op : this.outputPorts) {
op.send(END_SIGNAL);
System.out.println("End signal sent, size: " + op.pipe.size() + ", end signal:" + (op.pipe.readLast() == END_SIGNAL));
for (OutputPort<?> op : this.outputPorts) {
op.pipe.close();
System.out.println("End signal sent, size: " + op.pipe.size());
}
}
......
......@@ -12,10 +12,11 @@ public class Relay<T> extends AbstractStage<T, T> {
public void executeWithPorts() {
T element = this.getInputPort().receive();
if (null == element) {
return;
} else if (END_SIGNAL == element) {
this.setReschedulable(false);
System.out.println("got end signal; pipe.size: " + this.getInputPort().pipe.size());
if (this.getInputPort().pipe.isClosed()) {
this.setReschedulable(false);
System.out.println("got end signal; pipe.size: " + this.getInputPort().pipe.size());
return;
}
return;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment