Skip to content
Snippets Groups Projects
Commit b2580b40 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed all compiler errors

parent cc6d2512
No related branches found
No related tags found
No related merge requests found
Showing
with 109 additions and 111 deletions
package teetime.variant.methodcallWithPorts.framework.core; package teetime.variant.methodcallWithPorts.framework.core;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -22,8 +24,6 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -22,8 +24,6 @@ public abstract class AbstractStage implements StageWithPort {
private StageWithPort parentStage; private StageWithPort parentStage;
private boolean reschedulable;
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
...@@ -32,6 +32,8 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -32,6 +32,8 @@ public abstract class AbstractStage implements StageWithPort {
/** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */
protected OutputPort<?>[] cachedOutputPorts; protected OutputPort<?>[] cachedOutputPorts;
private final Map<Signal, Void> visited = new HashMap<Signal, Void>();
public AbstractStage() { public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")"); this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")");
...@@ -50,12 +52,6 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -50,12 +52,6 @@ public abstract class AbstractStage implements StageWithPort {
outputPort.reportNewElement(); outputPort.reportNewElement();
// StageWithPort next = outputPort.getCachedTargetStage();
//
// do {
// next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead
// } while (next.isReschedulable());
return true; return true;
} }
...@@ -97,14 +93,25 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -97,14 +93,25 @@ public abstract class AbstractStage implements StageWithPort {
*/ */
@Override @Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) { public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); if (!this.alreadyVisited(signal, inputPort)) {
signal.trigger(this); signal.trigger(this);
for (OutputPort<?> outputPort : this.outputPortList) { for (OutputPort<?> outputPort : this.outputPortList) {
outputPort.sendSignal(signal); outputPort.sendSignal(signal);
} }
} }
}
protected boolean alreadyVisited(final Signal signal, final InputPort<?> inputPort) {
if (this.visited.containsKey(signal)) {
this.logger.trace("Got signal: " + signal + " again from input port: " + inputPort);
return true;
} else {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
this.visited.put(signal, null);
return false;
}
}
public void onValidating(final List<InvalidPortConnection> invalidPortConnections) { public void onValidating(final List<InvalidPortConnection> invalidPortConnections) {
this.validateOutputPorts(invalidPortConnections); this.validateOutputPorts(invalidPortConnections);
......
package teetime.variant.methodcallWithPorts.framework.core.pipe;
public class CouldNotFindPipeImplException extends RuntimeException {
private static final long serialVersionUID = 5242260988104493402L;
public CouldNotFindPipeImplException(final String key) {
super("Could not find any pipe implementation that conforms to the key: " + key);
}
}
...@@ -55,6 +55,9 @@ public class PipeFactory { ...@@ -55,6 +55,9 @@ public class PipeFactory {
public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) { public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) {
String key = this.buildKey(tc, ordering, growable); String key = this.buildKey(tc, ordering, growable);
IPipeFactory pipeClass = this.pipeFactories.get(key); IPipeFactory pipeClass = this.pipeFactories.get(key);
if (null == pipeClass) {
throw new CouldNotFindPipeImplException(key);
}
return pipeClass.create(capacity); return pipeClass.create(capacity);
} }
......
...@@ -19,7 +19,7 @@ public class SpScPipe<T> extends AbstractPipe<T> { ...@@ -19,7 +19,7 @@ public class SpScPipe<T> extends AbstractPipe<T> {
// statistics // statistics
private int numWaits; private int numWaits;
public SpScPipe(final int capacity) { SpScPipe(final int capacity) {
ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT); ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT);
this.queue = QueueFactory.newQueue(concurrentQueueSpec); this.queue = QueueFactory.newQueue(concurrentQueueSpec);
} }
......
...@@ -3,8 +3,13 @@ package util; ...@@ -3,8 +3,13 @@ package util;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PerformanceCheckProfileRepository { public class PerformanceCheckProfileRepository {
private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceCheckProfileRepository.class);
public static final PerformanceCheckProfileRepository INSTANCE = new PerformanceCheckProfileRepository(); public static final PerformanceCheckProfileRepository INSTANCE = new PerformanceCheckProfileRepository();
private final Map<Class<?>, PerformanceCheckProfile> performanceCheckProfiles = new HashMap<Class<?>, PerformanceCheckProfile>(); private final Map<Class<?>, PerformanceCheckProfile> performanceCheckProfiles = new HashMap<Class<?>, PerformanceCheckProfile>();
...@@ -13,6 +18,7 @@ public class PerformanceCheckProfileRepository { ...@@ -13,6 +18,7 @@ public class PerformanceCheckProfileRepository {
public PerformanceCheckProfileRepository() { public PerformanceCheckProfileRepository() {
this.currentProfile = System.getProperty("TestProfile", "ChwWork"); this.currentProfile = System.getProperty("TestProfile", "ChwWork");
LOGGER.info("Using test profile '" + this.currentProfile + "'");
} }
public void setCurrentProfile(final String currentProfile) { public void setCurrentProfile(final String currentProfile) {
......
...@@ -34,6 +34,7 @@ public class ComparisonMethodcallWithPorts { ...@@ -34,6 +34,7 @@ public class ComparisonMethodcallWithPorts {
@BeforeClass @BeforeClass
public static void beforeClass() { public static void beforeClass() {
System.setProperty("logback.configurationFile", "src/test/resources/logback-test.groovy");
PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwWorkComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwWorkComparisonMethodcallWithPorts());
PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwHomeComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwHomeComparisonMethodcallWithPorts());
PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new NieWorkComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new NieWorkComparisonMethodcallWithPorts());
......
...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; ...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.NoopFilter;
...@@ -46,15 +46,16 @@ public class MethodCallThroughputAnalysis9 extends Analysis { ...@@ -46,15 +46,16 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
@Override @Override
public void init() { public void init() {
super.init(); super.init();
StageWithPort pipeline = this.buildPipeline(); HeadStage pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
/** /**
* @param numNoopFilters * @param numNoopFilters
* @return
* @since 1.10 * @since 1.10
*/ */
private StageWithPort buildPipeline() { private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages // create stages
...@@ -68,9 +69,6 @@ public class MethodCallThroughputAnalysis9 extends Analysis { ...@@ -68,9 +69,6 @@ public class MethodCallThroughputAnalysis9 extends Analysis {
final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer); pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
......
...@@ -66,9 +66,6 @@ public class MethodCallThroughputAnalysis10 extends Analysis { ...@@ -66,9 +66,6 @@ public class MethodCallThroughputAnalysis10 extends Analysis {
final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer); pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
......
...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; ...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.NoopFilter; import teetime.variant.methodcallWithPorts.stage.NoopFilter;
...@@ -46,11 +46,12 @@ public class MethodCallThroughputAnalysis11 extends Analysis { ...@@ -46,11 +46,12 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
@Override @Override
public void init() { public void init() {
super.init(); super.init();
StageWithPort pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); HeadStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
private StageWithPort buildPipeline(final long numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages // create stages
...@@ -66,10 +67,6 @@ public class MethodCallThroughputAnalysis11 extends Analysis { ...@@ -66,10 +67,6 @@ public class MethodCallThroughputAnalysis11 extends Analysis {
final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer); pipeline.setFirstStage(objectProducer);
// pipeline.addIntermediateStage(relayFake);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort());
......
...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; ...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
...@@ -48,15 +48,16 @@ public class MethodCallThroughputAnalysis14 extends Analysis { ...@@ -48,15 +48,16 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
@Override @Override
public void init() { public void init() {
super.init(); super.init();
StageWithPort pipeline = this.buildPipeline(); HeadStage pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
/** /**
* @param numNoopFilters * @param numNoopFilters
* @return
* @since 1.10 * @since 1.10
*/ */
private StageWithPort buildPipeline() { private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline() {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages // create stages
...@@ -70,9 +71,6 @@ public class MethodCallThroughputAnalysis14 extends Analysis { ...@@ -70,9 +71,6 @@ public class MethodCallThroughputAnalysis14 extends Analysis {
final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer); pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
PipeFactory pipeFactory = new PipeFactory(); PipeFactory pipeFactory = new PipeFactory();
......
...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; ...@@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
...@@ -58,14 +58,14 @@ public class MethodCallThroughputAnalysis15 extends Analysis { ...@@ -58,14 +58,14 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
public void init() { public void init() {
super.init(); super.init();
StageWithPort clockPipeline = this.buildClockPipeline(); HeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableStage(clockPipeline); this.clockRunnable = new RunnableStage(clockPipeline);
StageWithPort pipeline = this.buildPipeline(this.clock); HeadStage pipeline = this.buildPipeline(this.clock);
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
private StageWithPort buildClockPipeline() { private HeadPipeline<Clock, Sink<Long>> buildClockPipeline() {
this.clock = new Clock(); this.clock = new Clock();
this.clock.setInitialDelayInMs(100); this.clock.setInitialDelayInMs(100);
...@@ -80,9 +80,10 @@ public class MethodCallThroughputAnalysis15 extends Analysis { ...@@ -80,9 +80,10 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
/** /**
* @param numNoopFilters * @param numNoopFilters
* @return
* @since 1.10 * @since 1.10
*/ */
private StageWithPort buildPipeline(final Clock clock) { private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final Clock clock) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
// create stages // create stages
...@@ -97,10 +98,6 @@ public class MethodCallThroughputAnalysis15 extends Analysis { ...@@ -97,10 +98,6 @@ public class MethodCallThroughputAnalysis15 extends Analysis {
final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(objectProducer); pipeline.setFirstStage(objectProducer);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.addIntermediateStage(delay);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY); SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY);
......
...@@ -28,6 +28,6 @@ public class ChwHomePerformanceCheck implements PerformanceCheckProfile { ...@@ -28,6 +28,6 @@ public class ChwHomePerformanceCheck implements PerformanceCheckProfile {
System.out.println("speedupC: " + speedupC); System.out.println("speedupC: " + speedupC);
assertEquals(2, speedupB, 0.3); assertEquals(2, speedupB, 0.3);
assertEquals(3, speedupC, 0.3); assertEquals(4, speedupC, 0.3);
} }
} }
...@@ -109,9 +109,6 @@ public class MethodCallThroughputAnalysis16 extends Analysis { ...@@ -109,9 +109,6 @@ public class MethodCallThroughputAnalysis16 extends Analysis {
final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay); pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
......
...@@ -26,7 +26,9 @@ import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; ...@@ -26,7 +26,9 @@ import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
...@@ -52,6 +54,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -52,6 +54,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
private ConstructorClosure<TimestampObject> inputObjectCreator; private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters; private int numNoopFilters;
private final PipeFactory pipeFactory = new PipeFactory();
private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>();
private Thread producerThread; private Thread producerThread;
...@@ -59,7 +62,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -59,7 +62,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
@Override @Override
public void init() { public void init() {
final StageWithPort producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects,
this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.producerThread = new Thread(new RunnableStage(producerPipeline));
int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose
...@@ -69,8 +73,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -69,8 +73,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList); this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(null, resultList); HeadPipeline<?, ?> pipeline = this.buildPipeline(null, resultList);
this.workerThreads[i] = new Thread(workerRunnable); this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
} }
// this.producerThread = new Thread(new Runnable() { // this.producerThread = new Thread(new Runnable() {
...@@ -106,7 +110,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -106,7 +110,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
super.init(); super.init();
} }
private StageWithPort buildProducerPipeline(final int numInputObjects, private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects,
final ConstructorClosure<TimestampObject> inputObjectCreator) { final ConstructorClosure<TimestampObject> inputObjectCreator) {
final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator);
Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); Distributor<TimestampObject> distributor = new Distributor<TimestampObject>();
...@@ -135,9 +139,10 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -135,9 +139,10 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
* @param numNoopFilters * @param numNoopFilters
* @since 1.10 * @since 1.10
*/ */
private Runnable buildPipeline(final StageWithPort previousStage, final List<TimestampObject> timestampObjects) { private HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final StageWithPort previousStage,
Relay<TimestampObject> relay = new Relay<TimestampObject>(); final List<TimestampObject> timestampObjects) {
// create stages // create stages
Relay<TimestampObject> relay = new Relay<TimestampObject>();
final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); final StartTimestampFilter startTimestampFilter = new StartTimestampFilter();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
...@@ -149,12 +154,10 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -149,12 +154,10 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay); pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
relay.getInputPort().setPipe(new SpScPipe<TimestampObject>(SPSC_INITIAL_CAPACITY)); IPipe<TimestampObject> pipe = this.pipeFactory.create(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false, SPSC_INITIAL_CAPACITY);
relay.getInputPort().setPipe(pipe);
IPipe<TimestampObject> startPipe = relay.getInputPort().getPipe(); IPipe<TimestampObject> startPipe = relay.getInputPort().getPipe();
for (int i = 0; i < this.numInputObjects; i++) { for (int i = 0; i < this.numInputObjects; i++) {
startPipe.add(this.inputObjectCreator.create()); startPipe.add(this.inputObjectCreator.create());
...@@ -171,7 +174,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { ...@@ -171,7 +174,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis {
UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline); return pipeline;
} }
@Override @Override
......
...@@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; ...@@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
...@@ -71,7 +70,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis { ...@@ -71,7 +70,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList); this.timestampObjectsList.add(resultList);
StageWithPort pipeline = this.buildPipeline(producerPipeline, resultList); HeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline, resultList);
this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
} }
} }
...@@ -110,9 +109,6 @@ public class MethodCallThroughputAnalysis18 extends Analysis { ...@@ -110,9 +109,6 @@ public class MethodCallThroughputAnalysis18 extends Analysis {
final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay); pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
......
...@@ -70,8 +70,8 @@ public class MethodCallThroughputAnalysis19 extends Analysis { ...@@ -70,8 +70,8 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects);
this.timestampObjectsList.add(resultList); this.timestampObjectsList.add(resultList);
Runnable workerRunnable = this.buildPipeline(producerPipeline.getLastStage(), resultList); HeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline.getLastStage(), resultList);
this.workerThreads[i] = new Thread(workerRunnable); this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
} }
} }
...@@ -90,7 +90,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis { ...@@ -90,7 +90,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
return pipeline; return pipeline;
} }
private Runnable buildPipeline(final Distributor<TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { private HeadPipeline<?, ?> buildPipeline(final Distributor<TimestampObject> previousStage, final List<TimestampObject> timestampObjects) {
Relay<TimestampObject> relay = new Relay<TimestampObject>(); Relay<TimestampObject> relay = new Relay<TimestampObject>();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters];
...@@ -104,9 +104,6 @@ public class MethodCallThroughputAnalysis19 extends Analysis { ...@@ -104,9 +104,6 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>();
pipeline.setFirstStage(relay); pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(startTimestampFilter);
pipeline.addIntermediateStages(noopFilters);
pipeline.addIntermediateStage(stopTimestampFilter);
pipeline.setLastStage(collectorSink); pipeline.setLastStage(collectorSink);
SpScPipe.connect(previousStage.getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); SpScPipe.connect(previousStage.getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY);
...@@ -120,7 +117,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis { ...@@ -120,7 +117,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis {
OrderedGrowableArrayPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); OrderedGrowableArrayPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort());
OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort());
return new RunnableStage(pipeline); return pipeline;
} }
@Override @Override
......
...@@ -21,13 +21,12 @@ import java.util.List; ...@@ -21,13 +21,12 @@ import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering;
import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.InitialElementProducer;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
...@@ -48,25 +47,27 @@ public class RecordReaderConfiguration extends Configuration { ...@@ -48,25 +47,27 @@ public class RecordReaderConfiguration extends Configuration {
} }
public void buildConfiguration() { public void buildConfiguration() {
StageWithPort producerPipeline = this.buildProducerPipeline(); HeadPipeline<?, ?> producerPipeline = this.buildProducerPipeline();
this.getFiniteProducerStages().add(producerPipeline); this.getFiniteProducerStages().add(producerPipeline);
} }
private StageWithPort buildProducerPipeline() { private HeadPipeline<?, ?> buildProducerPipeline() {
ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
File logDir = new File("src/test/data/bookstore-logs");
// create stages // create stages
InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(logDir);
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); final HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(dir2RecordsFilter); pipeline.setFirstStage(initialElementProducer);
pipeline.setLastStage(collector); pipeline.setLastStage(collector);
IPipe<IMonitoringRecord> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); IPipe<File> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1);
pipe.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); pipe.connectPorts(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); IPipe<IMonitoringRecord> pipe1 = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1);
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs")); pipe1.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
return pipeline; return pipeline;
} }
......
...@@ -5,7 +5,6 @@ import java.util.List; ...@@ -5,7 +5,6 @@ import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.Clock;
...@@ -40,7 +39,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { ...@@ -40,7 +39,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
return pipeline; return pipeline;
} }
private StageWithPort buildTcpPipeline(final Distributor<Long> previousClockStage) { private HeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) {
TCPReader tcpReader = new TCPReader(); TCPReader tcpReader = new TCPReader();
this.recordCounter = new Counter<IMonitoringRecord>(); this.recordCounter = new Counter<IMonitoringRecord>();
this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
...@@ -56,8 +55,6 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { ...@@ -56,8 +55,6 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
// create and configure pipeline // create and configure pipeline
HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>(); HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader); pipeline.setFirstStage(tcpReader);
pipeline.addIntermediateStage(this.recordCounter);
// pipeline.addIntermediateStage(this.recordThroughputStage);
pipeline.setLastStage(endStage); pipeline.setLastStage(endStage);
return pipeline; return pipeline;
} }
...@@ -69,7 +66,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { ...@@ -69,7 +66,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis {
HeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); HeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockPipeline)); this.clockThread = new Thread(new RunnableStage(clockPipeline));
StageWithPort tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); HeadPipeline<?, ?> tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
} }
......
...@@ -9,7 +9,6 @@ import teetime.util.concurrent.hashmap.TraceBuffer; ...@@ -9,7 +9,6 @@ import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Cache; import teetime.variant.methodcallWithPorts.stage.Cache;
...@@ -17,6 +16,7 @@ import teetime.variant.methodcallWithPorts.stage.Clock; ...@@ -17,6 +16,7 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.InitialElementProducer;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger; import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
...@@ -52,7 +52,7 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -52,7 +52,7 @@ public class TraceReconstructionAnalysis extends Analysis {
Clock clockStage = this.buildClockPipeline(); Clock clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage)); this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort pipeline = this.buildPipeline(clockStage); HeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage);
this.workerThread = new Thread(new RunnableStage(pipeline)); this.workerThread = new Thread(new RunnableStage(pipeline));
} }
...@@ -63,10 +63,11 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -63,10 +63,11 @@ public class TraceReconstructionAnalysis extends Analysis {
return clock; return clock;
} }
private StageWithPort buildPipeline(final Clock clockStage) { private HeadPipeline<?, ?> buildPipeline(final Clock clockStage) {
this.classNameRegistryRepository = new ClassNameRegistryRepository(); this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages // create stages
InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(this.inputDir);
final Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository); final Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository);
this.recordCounter = new Counter<IMonitoringRecord>(); this.recordCounter = new Counter<IMonitoringRecord>();
final Cache<IMonitoringRecord> cache = new Cache<IMonitoringRecord>(); final Cache<IMonitoringRecord> cache = new Cache<IMonitoringRecord>();
...@@ -85,7 +86,7 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -85,7 +86,7 @@ public class TraceReconstructionAnalysis extends Analysis {
stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); stringBufferFilter.getDataTypeHandlers().add(new StringHandler());
// connect stages // connect stages
dir2RecordsFilter.getInputPort().setPipe(new SingleElementPipe<File>()); SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort());
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
...@@ -100,20 +101,9 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -100,20 +101,9 @@ public class TraceReconstructionAnalysis extends Analysis {
SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
// fill input ports
dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir);
// create and configure pipeline // create and configure pipeline
HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>(); HeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>>();
pipeline.setFirstStage(dir2RecordsFilter); pipeline.setFirstStage(initialElementProducer);
pipeline.addIntermediateStage(this.recordCounter);
pipeline.addIntermediateStage(cache);
pipeline.addIntermediateStage(stringBufferFilter);
pipeline.addIntermediateStage(instanceOfFilter);
pipeline.addIntermediateStage(this.throughputFilter);
pipeline.addIntermediateStage(traceReconstructionFilter);
pipeline.addIntermediateStage(merger);
pipeline.addIntermediateStage(this.traceCounter);
pipeline.setLastStage(collector); pipeline.setLastStage(collector);
return pipeline; return pipeline;
} }
......
...@@ -20,3 +20,5 @@ appender("CONSOLE", ConsoleAppender) { ...@@ -20,3 +20,5 @@ appender("CONSOLE", ConsoleAppender) {
} }
root ERROR, ["CONSOLE"] root ERROR, ["CONSOLE"]
logger "util", INFO
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment