Skip to content
Snippets Groups Projects
Commit 59711614 authored by Christian Wulf's avatar Christian Wulf
Browse files

added performance test for stage with port

parent 0f98dac4
No related branches found
No related tags found
No related merge requests found
Showing
with 286 additions and 31 deletions
......@@ -43,9 +43,6 @@ public class MethodCallThoughputTimestampAnalysis8Test {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
// 500 times faster than our new framework
// TODO check why
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis9;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThoughputTimestampAnalysis9Test {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final MethodCallThroughputAnalysis9 analysis = new MethodCallThroughputAnalysis9();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
return new TimestampObject();
}
});
analysis.init();
stopWatch.start();
try {
analysis.start();
} finally {
stopWatch.end();
}
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
......@@ -3,10 +3,10 @@ package teetime.examples.throughput.methodcall;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
abstract class AbstractStage<I, O> implements Stage<I, O> {
abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
// private final InputPort<I> inputPort = new InputPort<I>();
// private final OutputPort<O> outputPort = new OutputPort<O>();
private final InputPort<I> inputPort = new InputPort<I>();
private final OutputPort<O> outputPort = new OutputPort<O>();
protected final CommittableQueue<O> outputElements = new CommittableResizableArrayQueue<O>(null, 4);
......@@ -22,15 +22,15 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
private boolean reschedulable;
// @Override
// public InputPort<I> getInputPort() {
// return this.inputPort;
// }
@Override
public InputPort<I> getInputPort() {
return this.inputPort;
}
// @Override
// public OutputPort<O> getOutputPort() {
// return this.outputPort;
// }
@Override
public OutputPort<O> getOutputPort() {
return this.outputPort;
}
@Override
public CommittableQueue<O> execute2(final CommittableQueue<I> elements) {
......@@ -57,10 +57,10 @@ abstract class AbstractStage<I, O> implements Stage<I, O> {
return this.outputElements;
}
// protected abstract void execute3();
protected abstract void execute4(CommittableQueue<I> elements);
protected abstract void execute5(I element);
protected final void send(final O element) {
this.outputElements.addToTailUncommitted(element);
......
......@@ -60,6 +60,11 @@ public class CollectorSink<T> extends ConsumerStage<T, Object> {
@Override
protected void execute4(final CommittableQueue<T> elements) {
T element = elements.removeFromHead();
this.execute5(element);
}
@Override
protected void execute5(final T element) {
this.elements.add(element);
if ((this.elements.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.elements.size());
......
......@@ -17,4 +17,10 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
return output;
}
@Override
public void executeWithPorts() {
I element = this.getInputPort().receive();
this.execute5(element);
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis9 extends Analysis {
private long numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private List<TimestampObject> timestampObjects;
private Runnable runnable;
@Override
public void init() {
super.init();
this.runnable = this.buildPipeline();
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline() {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink);
Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
Pipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
Pipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
Pipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
Pipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
pipeline.onStart();
// pipeline.getInputPort().pipe = new Pipe<Void>();
// pipeline.getInputPort().pipe.add(new Object());
// pipeline.getOutputPort().pipe = new Pipe<Void>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
do {
pipeline.executeWithPorts();
} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
}
};
return runnable;
}
@Override
public void start() {
super.start();
this.runnable.run();
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<TimestampObject> getTimestampObjects() {
return this.timestampObjects;
}
public void setTimestampObjects(final List<TimestampObject> timestampObjects) {
this.timestampObjects = timestampObjects;
}
}
......@@ -39,6 +39,11 @@ public class NoopFilter<T> extends ConsumerStage<T, T> {
@Override
protected void execute4(final CommittableQueue<T> elements) {
T element = elements.removeFromHead();
this.execute5(element);
}
@Override
protected void execute5(final T element) {
this.send(element);
}
......
......@@ -89,6 +89,11 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
@Override
protected void execute4(final CommittableQueue<Void> elements) {
this.execute5(null);
}
@Override
protected void execute5(final Void element) {
if (this.numInputObjects == 0) {
this.setReschedulable(false);
return;
......
......@@ -6,6 +6,12 @@ public class Pipe<T> {
private final CommittableResizableArrayQueue<T> elements = new CommittableResizableArrayQueue<T>(null, 4);
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
Pipe<T> pipe = new Pipe<T>();
sourcePort.pipe = pipe;
targetPort.pipe = pipe;
}
public void add(final T element) {
this.elements.addToTailUncommitted(element);
this.elements.commit();
......
......@@ -6,15 +6,15 @@ import java.util.List;
import teetime.util.list.CommittableQueue;
public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
private Stage firstStage;
private final List<Stage> intermediateStages = new LinkedList<Stage>();
private Stage lastStage;
private StageWithPort firstStage;
private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private StageWithPort lastStage;
private final SchedulingInformation schedulingInformation = new SchedulingInformation();
private Stage[] stages;
private StageWithPort[] stages;
private Stage parentStage;
private int index;
private int startIndex;
......@@ -22,19 +22,19 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
private boolean reschedulable;
void setFirstStage(final Stage<I, ?> stage) {
void setFirstStage(final StageWithPort<I, ?> stage) {
this.firstStage = stage;
}
void addIntermediateStages(final Stage... stages) {
void addIntermediateStages(final StageWithPort... stages) {
this.intermediateStages.addAll(Arrays.asList(stages));
}
void addIntermediateStage(final Stage stage) {
void addIntermediateStage(final StageWithPort stage) {
this.intermediateStages.add(stage);
}
void setLastStage(final Stage<?, O> stage) {
void setLastStage(final StageWithPort<?, O> stage) {
this.lastStage = stage;
}
......@@ -59,6 +59,13 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
return queue;
}
@Override
public void executeWithPorts() {
this.stages[0].executeWithPorts();
this.setReschedulable(this.stages[0].isReschedulable());
}
void onStart() {
// Pipe pipe = new Pipe();
// this.outputPort.pipe = pipe;
......@@ -82,10 +89,10 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
// this.lastStage.getInputPort().pipe = pipe;
int size = 1 + this.intermediateStages.size() + 1;
this.stages = new Stage[size];
this.stages = new StageWithPort[size];
this.stages[0] = this.firstStage;
for (int i = 0; i < this.intermediateStages.size(); i++) {
Stage<?, ?> stage = this.intermediateStages.get(i);
StageWithPort<?, ?> stage = this.intermediateStages.get(i);
this.stages[1 + i] = stage;
}
this.stages[this.stages.length - 1] = this.lastStage;
......@@ -176,6 +183,16 @@ public class Pipeline<I, O> implements Stage<I, O>, OnDisableListener {
this.reschedulable = reschedulable;
}
@Override
public InputPort<I> getInputPort() {
return this.firstStage.getInputPort();
}
@Override
public OutputPort<O> getOutputPort() {
return this.lastStage.getOutputPort();
}
// @Override
// public OutputPort getOutputPort() {
// return this.lastStage.getOutputPort();
......
......@@ -20,4 +20,9 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
return outputElements;
}
@Override
public void executeWithPorts() {
this.execute5(null);
}
}
......@@ -10,14 +10,10 @@ public interface Stage<I, O> {
// CommittableQueue<O> execute2();
// InputPort<I> getInputPort();
CommittableQueue<O> execute2(CommittableQueue<I> elements);
SchedulingInformation getSchedulingInformation();
// OutputPort<O> getOutputPort();
Stage getParentStage();
void setParentStage(Stage parentStage, int index);
......
package teetime.examples.throughput.methodcall;
public interface StageWithPort<I, O> extends Stage<I, O> {
InputPort<I> getInputPort();
OutputPort<O> getOutputPort();
void executeWithPorts();
}
......@@ -42,6 +42,11 @@ public class StartTimestampFilter extends ConsumerStage<TimestampObject, Timesta
@Override
protected void execute4(final CommittableQueue<TimestampObject> elements) {
TimestampObject element = elements.removeFromHead();
this.execute5(element);
}
@Override
protected void execute5(final TimestampObject element) {
element.setStartTimestamp(System.nanoTime());
this.send(element);
}
......
......@@ -42,6 +42,11 @@ public class StopTimestampFilter extends ConsumerStage<TimestampObject, Timestam
@Override
protected void execute4(final CommittableQueue<TimestampObject> elements) {
TimestampObject element = elements.removeFromHead();
this.execute5(element);
}
@Override
protected void execute5(final TimestampObject element) {
element.setStopTimestamp(System.nanoTime());
this.send(element);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment