Skip to content
Snippets Groups Projects
Commit 0ceca0ad authored by Christian Wulf's avatar Christian Wulf
Browse files

added Clock and Delay concept

parent a9c43045
No related branches found
No related tags found
No related merge requests found
Showing
with 461 additions and 17 deletions
......@@ -24,6 +24,8 @@
11: 8600 ns (executeWithPorts: fixed sized pipe; with CircularArray(int))
11: 8200 ns (executeWithPorts: fixed sized pipe; with CircularArray(int) w/o mask)
11: 7800 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read)
11: 8200 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read; non-final elements)
11: 7000 ns (executeWithPorts: fixed sized pipe; with setReschedulable() after each read; non-final elements; pipeline searches for firstStageIndex)
12: 3300 ns (recursive; argument/return w/o pipe)
13: 3300 ns (recursive; argument/return w/o pipe; w/o pipeline class)
14: 21,000 ns (spsc pipe)
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis15;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThoughputTimestampAnalysis15Test {
private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before
public void before() {
System.setProperty(LogFactory.CUSTOM_LOGGER_JVM, "NONE");
}
@Test
public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final MethodCallThroughputAnalysis15 analysis = new MethodCallThroughputAnalysis15();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override
public TimestampObject call() throws Exception {
return new TimestampObject();
}
});
analysis.init();
stopWatch.start();
try {
analysis.start();
} finally {
stopWatch.end();
}
StatisticsUtil.printStatistics(stopWatch.getDurationInNs(), timestampObjects);
}
}
package teetime.examples.throughput.methodcall;
import teetime.examples.throughput.methodcall.stage.AbstractStage;
import teetime.util.list.CommittableQueue;
public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
......@@ -33,4 +34,9 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
// }
}
@Override
public void onIsPipelineHead() {
// do nothing
}
}
......@@ -27,12 +27,18 @@ public class FixedSizedPipe<T> implements IPipe<T> {
// }
//
// }
private final int MIN_CAPACITY;
@SuppressWarnings("unchecked")
private final T[] elements = (T[]) new Object[4];
private T[] elements;
// private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2);
private int lastFreeIndex;
@SuppressWarnings("unchecked")
public FixedSizedPipe() {
this.MIN_CAPACITY = 4;
this.elements = (T[]) new Object[this.MIN_CAPACITY];
}
public static <T> void connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort) {
IPipe<T> pipe = new FixedSizedPipe<T>();
sourcePort.pipe = pipe;
......@@ -43,13 +49,7 @@ public class FixedSizedPipe<T> implements IPipe<T> {
public void add(final T element) {
if (this.lastFreeIndex == this.elements.length) {
// if (this.lastFreeIndex == this.elements.getCapacity()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// grow
this.elements = this.grow();
}
this.elements[this.lastFreeIndex++] = element;
// this.elements.put(this.lastFreeIndex++, element);
......@@ -57,8 +57,9 @@ public class FixedSizedPipe<T> implements IPipe<T> {
@Override
public T removeLast() {
return this.elements[--this.lastFreeIndex];
// return this.elements.get(--this.lastFreeIndex);
T element = this.elements[--this.lastFreeIndex];
// T element = this.elements.get(--this.lastFreeIndex);
return element;
}
@Override
......@@ -77,4 +78,24 @@ public class FixedSizedPipe<T> implements IPipe<T> {
return this.lastFreeIndex;
}
private T[] grow() {
int newSize = this.elements.length * 2;
return this.newArray(newSize);
}
// we do not support shrink since it causes too much overhead due to the capacity checks
// private T[] shrink() {
// int newSize = this.elements.length / 2;
// return this.newArray(newSize);
// }
private T[] newArray(final int newSize) {
@SuppressWarnings("unchecked")
T[] newElements = (T[]) new Object[newSize];
System.arraycopy(this.elements, 0, newElements, 0, this.elements.length);
return newElements;
}
}
......@@ -21,6 +21,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.AbstractStage;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.EndStage;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.examples.throughput.methodcall;
import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.Clock;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.Delay;
import teetime.examples.throughput.methodcall.stage.EndStage;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
import teetime.examples.throughput.methodcall.stage.Pipeline;
import teetime.examples.throughput.methodcall.stage.StartTimestampFilter;
import teetime.examples.throughput.methodcall.stage.StopTimestampFilter;
import teetime.framework.core.Analysis;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MethodCallThroughputAnalysis15 extends Analysis {
private long numInputObjects;
private Callable<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private List<TimestampObject> timestampObjects;
private Runnable clockRunnable;
private Runnable runnable;
private Clock clock;
@Override
public void init() {
super.init();
this.clockRunnable = this.buildClockPipeline();
this.runnable = this.buildPipeline(this.clock);
}
private Runnable buildClockPipeline() {
this.clock = new Clock();
this.clock.setInitialDelayInMs(100);
this.clock.setIntervalDelayInMs(100);
final Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
pipeline.setFirstStage(this.clock);
pipeline.setLastStage(new EndStage<Long>());
pipeline.onStart();
final Runnable runnable = new Runnable() {
@Override
public void run() {
do {
pipeline.executeWithPorts();
} while (pipeline.isReschedulable());
}
};
return runnable;
}
/**
* @param numNoopFilters
* @since 1.10
*/
private Runnable buildPipeline(final Clock clock) {
@SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(this.numInputObjects, this.inputObjectCreator);
Delay<TimestampObject> delay = new Delay<TimestampObject>();
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<TimestampObject>();
}
final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter();
final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(this.timestampObjects);
final Pipeline<Void, Object> pipeline = new Pipeline<Void, Object>();
pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.addIntermediateStage(delay);
pipeline.setLastStage(collectorSink);
SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort());
FixedSizedPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
FixedSizedPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort());
for (int i = 0; i < noopFilters.length - 1; i++) {
FixedSizedPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort());
}
FixedSizedPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
FixedSizedPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort());
FixedSizedPipe.connect(delay.getOutputPort(), collectorSink.getInputPort());
pipeline.onStart();
// pipeline.getInputPort().pipe = new Pipe<Void>();
// pipeline.getInputPort().pipe.add(new Object());
// pipeline.getOutputPort().pipe = new Pipe<Void>();
final Runnable runnable = new Runnable() {
@Override
public void run() {
do {
pipeline.executeWithPorts();
} while (pipeline.getSchedulingInformation().isActive() && pipeline.isReschedulable());
}
};
return runnable;
}
@Override
public void start() {
super.start();
Thread clockThread = new Thread(this.clockRunnable);
clockThread.start();
this.runnable.run();
clockThread.interrupt();
}
public void setInput(final int numInputObjects, final Callable<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<TimestampObject> getTimestampObjects() {
return this.timestampObjects;
}
public void setTimestampObjects(final List<TimestampObject> timestampObjects) {
this.timestampObjects = timestampObjects;
}
}
......@@ -21,6 +21,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.AbstractStage;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
......
......@@ -21,6 +21,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import teetime.examples.throughput.TimestampObject;
import teetime.examples.throughput.methodcall.stage.AbstractStage;
import teetime.examples.throughput.methodcall.stage.CollectorSink;
import teetime.examples.throughput.methodcall.stage.NoopFilter;
import teetime.examples.throughput.methodcall.stage.ObjectProducer;
......
package teetime.examples.throughput.methodcall;
import teetime.examples.throughput.methodcall.stage.AbstractStage;
import teetime.util.list.CommittableQueue;
public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
......@@ -29,4 +30,9 @@ public abstract class ProducerStage<I, O> extends AbstractStage<I, O> {
// }
}
@Override
public void onIsPipelineHead() {
// do nothing
}
}
......@@ -28,4 +28,6 @@ public interface Stage<I, O> {
boolean isReschedulable();
void onIsPipelineHead();
}
package teetime.examples.throughput.methodcall;
package teetime.examples.throughput.methodcall.stage;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.examples.throughput.methodcall.OnDisableListener;
import teetime.examples.throughput.methodcall.OutputPort;
import teetime.examples.throughput.methodcall.SchedulingInformation;
import teetime.examples.throughput.methodcall.Stage;
import teetime.examples.throughput.methodcall.StageWithPort;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private final InputPort<I> inputPort = new InputPort<I>();
private final OutputPort<O> outputPort = new OutputPort<O>();
......@@ -98,6 +104,7 @@ abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
// execute = this.next().execute2(this.outputElements);
// execute = this.next().execute2(this.getOutputPort().pipe.getElements());
this.next().executeWithPorts();
// System.out.println("Executed " + this.next().getClass().getSimpleName());
} while (this.next().isReschedulable());
// } while (this.next().getInputPort().pipe.size() > 0);
// } while (execute.size() > 0);
......
package teetime.examples.throughput.methodcall.stage;
import teetime.examples.throughput.methodcall.ProducerStage;
import teetime.util.list.CommittableQueue;
public class Clock extends ProducerStage<Void, Long> {
private boolean initialDelayExceeded = false;
private long initialDelayInMs;
private long intervalDelayInMs;
@Override
public Long execute(final Object element) {
// TODO Auto-generated method stub
return null;
}
@Override
protected void execute4(final CommittableQueue<Void> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final Void element) {
if (!this.initialDelayExceeded) {
this.initialDelayExceeded = true;
this.sleep(this.initialDelayInMs);
} else {
this.sleep(this.intervalDelayInMs);
}
// System.out.println("Emitting timestamp");
this.send(this.getCurrentTimeInNs());
}
private void sleep(final long delayInMs) {
try {
Thread.sleep(delayInMs);
} catch (InterruptedException e) {
this.setReschedulable(false);
}
}
private long getCurrentTimeInNs() {
return System.nanoTime();
}
public long getInitialDelayInMs() {
return this.initialDelayInMs;
}
public void setInitialDelayInMs(final long initialDelayInMs) {
this.initialDelayInMs = initialDelayInMs;
}
public long getIntervalDelayInMs() {
return this.intervalDelayInMs;
}
public void setIntervalDelayInMs(final long intervalDelayInMs) {
this.intervalDelayInMs = intervalDelayInMs;
}
}
......@@ -67,6 +67,10 @@ public class CollectorSink<T> extends ConsumerStage<T, Object> {
if ((this.elements.size() % THRESHOLD) == 0) {
System.out.println("size: " + this.elements.size());
}
if (this.elements.size() > 90000) {
// System.out.println("size > 90000: " + this.elements.size());
}
}
}
package teetime.examples.throughput.methodcall.stage;
import teetime.examples.throughput.methodcall.InputPort;
import teetime.util.list.CommittableQueue;
public class Delay<I> extends AbstractStage<I, I> {
private final InputPort<Long> timestampTriggerInputPort = new InputPort<Long>();
public Delay() {
// this.setReschedulable(true);
}
@Override
public void executeWithPorts() {
Long timestampTrigger = this.timestampTriggerInputPort.receive();
if (null == timestampTrigger) {
return;
}
// System.out.println("got timestamp; #elements: " + this.getInputPort().pipe.size());
// System.out.println("#elements: " + this.getInputPort().pipe.size());
// TODO implement receiveAll() and sendMultiple()
while (!this.getInputPort().pipe.isEmpty()) {
I element = this.getInputPort().receive();
this.send(element);
}
this.setReschedulable(this.getInputPort().pipe.size() > 0);
// System.out.println("delay: " + this.getInputPort().pipe.size());
}
@Override
public void onIsPipelineHead() {
this.setReschedulable(true);
}
@Override
public I execute(final Object element) {
// TODO Auto-generated method stub
return null;
}
@Override
protected void execute4(final CommittableQueue<I> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final I element) {
// TODO Auto-generated method stub
}
public InputPort<Long> getTimestampTriggerInputPort() {
return this.timestampTriggerInputPort;
}
}
......@@ -20,6 +20,11 @@ public class EndStage<T> implements StageWithPort<T, T> {
return (T) element;
}
@Override
public void onIsPipelineHead() {
// do nothing
}
@Override
public CommittableQueue<T> execute2(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
......
......@@ -103,7 +103,7 @@ public class ObjectProducer<T> extends ProducerStage<Void, T> {
try {
final T newObject = this.inputObjectCreator.call();
this.numInputObjects--;
// System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects);
this.send(newObject);
} catch (final Exception e) {
throw new IllegalStateException(e);
......
......@@ -27,6 +27,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
private OnDisableListener listener;
private boolean reschedulable;
private int firstStageIndex;
public void setFirstStage(final StageWithPort<I, ?> stage) {
this.firstStage = stage;
......@@ -67,9 +68,30 @@ public class Pipeline<I, O> implements StageWithPort<I, O>, OnDisableListener {
@Override
public void executeWithPorts() {
this.stages[0].executeWithPorts();
StageWithPort firstStage = this.stages[this.firstStageIndex];
firstStage.executeWithPorts();
this.updateRescheduable(firstStage);
// this.setReschedulable(stage.isReschedulable());
}
private final void updateRescheduable(Stage stage) {
while (!stage.isReschedulable()) {
this.firstStageIndex++;
stage = stage.next();
if (stage == null) { // loop reaches the last stage
this.setReschedulable(false);
return;
}
stage.onIsPipelineHead();
// System.out.println("firstStageIndex: " + this.firstStageIndex + ", class:" + stage.getClass().getSimpleName());
}
this.setReschedulable(true);
}
this.setReschedulable(this.stages[0].isReschedulable());
@Override
public void onIsPipelineHead() {
// do nothing
}
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment