From 9df0e15176cee600b0edbc5d6d8ac7d927c8f703 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Sun, 31 Aug 2014 16:39:31 +0200 Subject: [PATCH] fixed all compiler errors --- .../framework/core/AbstractStage.java | 31 ++++++++++++------- .../pipe/CouldNotFindPipeImplException.java | 11 +++++++ .../framework/core/pipe/PipeFactory.java | 3 ++ .../framework/core/pipe/SpScPipe.java | 2 +- .../PerformanceCheckProfileRepository.java | 6 ++++ .../ComparisonMethodcallWithPorts.java | 1 + .../MethodCallThroughputAnalysis9.java | 12 +++---- .../MethodCallThroughputAnalysis10.java | 5 +-- .../MethodCallThroughputAnalysis11.java | 13 +++----- .../MethodCallThroughputAnalysis14.java | 10 +++--- .../MethodCallThroughputAnalysis15.java | 17 +++++----- .../experiment16/ChwHomePerformanceCheck.java | 2 +- .../MethodCallThroughputAnalysis16.java | 5 +-- .../MethodCallThroughputAnalysis17.java | 27 +++++++++------- .../MethodCallThroughputAnalysis18.java | 8 ++--- .../MethodCallThroughputAnalysis19.java | 13 +++----- .../RecordReaderConfiguration.java | 21 +++++++------ .../TcpTraceLoggingExtAnalysis.java | 7 ++--- .../TraceReconstructionAnalysis.java | 24 +++++--------- src/test/resources/logback-test.groovy | 2 ++ 20 files changed, 109 insertions(+), 111 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CouldNotFindPipeImplException.java diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 3eff9cfc..83c2a52f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -1,7 +1,9 @@ package teetime.variant.methodcallWithPorts.framework.core; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.slf4j.Logger; @@ -22,8 +24,6 @@ public abstract class AbstractStage implements StageWithPort { private StageWithPort parentStage; - private boolean reschedulable; - private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); @@ -32,6 +32,8 @@ public abstract class AbstractStage implements StageWithPort { /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ protected OutputPort<?>[] cachedOutputPorts; + private final Map<Signal, Void> visited = new HashMap<Signal, Void>(); + public AbstractStage() { this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")"); @@ -50,12 +52,6 @@ public abstract class AbstractStage implements StageWithPort { outputPort.reportNewElement(); - // StageWithPort next = outputPort.getCachedTargetStage(); - // - // do { - // next.executeWithPorts(); // PERFORMANCE use the return value as indicator for re-schedulability instead - // } while (next.isReschedulable()); - return true; } @@ -97,12 +93,23 @@ public abstract class AbstractStage implements StageWithPort { */ @Override public void onSignal(final Signal signal, final InputPort<?> inputPort) { - this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); + if (!this.alreadyVisited(signal, inputPort)) { + signal.trigger(this); - signal.trigger(this); + for (OutputPort<?> outputPort : this.outputPortList) { + outputPort.sendSignal(signal); + } + } + } - for (OutputPort<?> outputPort : this.outputPortList) { - outputPort.sendSignal(signal); + protected boolean alreadyVisited(final Signal signal, final InputPort<?> inputPort) { + if (this.visited.containsKey(signal)) { + this.logger.trace("Got signal: " + signal + " again from input port: " + inputPort); + return true; + } else { + this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); + this.visited.put(signal, null); + return false; } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CouldNotFindPipeImplException.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CouldNotFindPipeImplException.java new file mode 100644 index 00000000..4cc05179 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/CouldNotFindPipeImplException.java @@ -0,0 +1,11 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +public class CouldNotFindPipeImplException extends RuntimeException { + + private static final long serialVersionUID = 5242260988104493402L; + + public CouldNotFindPipeImplException(final String key) { + super("Could not find any pipe implementation that conforms to the key: " + key); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java index fae524cf..a68dbb2b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/PipeFactory.java @@ -55,6 +55,9 @@ public class PipeFactory { public <T> IPipe<T> create(final ThreadCommunication tc, final PipeOrdering ordering, final boolean growable, final int capacity) { String key = this.buildKey(tc, ordering, growable); IPipeFactory pipeClass = this.pipeFactories.get(key); + if (null == pipeClass) { + throw new CouldNotFindPipeImplException(key); + } return pipeClass.create(capacity); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index 3e6f4694..9153edaf 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -19,7 +19,7 @@ public class SpScPipe<T> extends AbstractPipe<T> { // statistics private int numWaits; - public SpScPipe(final int capacity) { + SpScPipe(final int capacity) { ConcurrentQueueSpec concurrentQueueSpec = new ConcurrentQueueSpec(1, 1, capacity, Ordering.FIFO, Preference.THROUGHPUT); this.queue = QueueFactory.newQueue(concurrentQueueSpec); } diff --git a/src/main/java/util/PerformanceCheckProfileRepository.java b/src/main/java/util/PerformanceCheckProfileRepository.java index d5941015..066fdeb6 100644 --- a/src/main/java/util/PerformanceCheckProfileRepository.java +++ b/src/main/java/util/PerformanceCheckProfileRepository.java @@ -3,8 +3,13 @@ package util; import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class PerformanceCheckProfileRepository { + private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceCheckProfileRepository.class); + public static final PerformanceCheckProfileRepository INSTANCE = new PerformanceCheckProfileRepository(); private final Map<Class<?>, PerformanceCheckProfile> performanceCheckProfiles = new HashMap<Class<?>, PerformanceCheckProfile>(); @@ -13,6 +18,7 @@ public class PerformanceCheckProfileRepository { public PerformanceCheckProfileRepository() { this.currentProfile = System.getProperty("TestProfile", "ChwWork"); + LOGGER.info("Using test profile '" + this.currentProfile + "'"); } public void setCurrentProfile(final String currentProfile) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/ComparisonMethodcallWithPorts.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/ComparisonMethodcallWithPorts.java index c2a142e7..c06482ba 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/ComparisonMethodcallWithPorts.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/ComparisonMethodcallWithPorts.java @@ -34,6 +34,7 @@ public class ComparisonMethodcallWithPorts { @BeforeClass public static void beforeClass() { + System.setProperty("logback.configurationFile", "src/test/resources/logback-test.groovy"); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwWorkComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new ChwHomeComparisonMethodcallWithPorts()); PerformanceCheckProfileRepository.INSTANCE.register(ComparisonMethodcallWithPorts.class, new NieWorkComparisonMethodcallWithPorts()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java index bd2ae16e..f55156f5 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.Pipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; @@ -32,7 +32,7 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis9 extends Analysis { @@ -46,15 +46,16 @@ public class MethodCallThroughputAnalysis9 extends Analysis { @Override public void init() { super.init(); - StageWithPort pipeline = this.buildPipeline(); + HeadStage pipeline = this.buildPipeline(); this.runnable = new RunnableStage(pipeline); } /** * @param numNoopFilters + * @return * @since 1.10 */ - private StageWithPort buildPipeline() { + private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline() { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -68,9 +69,6 @@ public class MethodCallThroughputAnalysis9 extends Analysis { final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); Pipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java index bd08ab4a..3267dc93 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment10/MethodCallThroughputAnalysis10.java @@ -31,7 +31,7 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis10 extends Analysis { @@ -66,9 +66,6 @@ public class MethodCallThroughputAnalysis10 extends Analysis { final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java index ce10958f..f7cba876 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.NoopFilter; @@ -32,7 +32,7 @@ import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis11 extends Analysis { @@ -46,11 +46,12 @@ public class MethodCallThroughputAnalysis11 extends Analysis { @Override public void init() { super.init(); - StageWithPort pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); + HeadStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); this.runnable = new RunnableStage(pipeline); } - private StageWithPort buildPipeline(final long numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { + private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects, + final ConstructorClosure<TimestampObject> inputObjectCreator) { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -66,10 +67,6 @@ public class MethodCallThroughputAnalysis11 extends Analysis { final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - // pipeline.addIntermediateStage(relayFake); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java index 1cd3f963..176e6757 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; @@ -48,15 +48,16 @@ public class MethodCallThroughputAnalysis14 extends Analysis { @Override public void init() { super.init(); - StageWithPort pipeline = this.buildPipeline(); + HeadStage pipeline = this.buildPipeline(); this.runnable = new RunnableStage(pipeline); } /** * @param numNoopFilters + * @return * @since 1.10 */ - private StageWithPort buildPipeline() { + private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline() { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -70,9 +71,6 @@ public class MethodCallThroughputAnalysis14 extends Analysis { final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); PipeFactory pipeFactory = new PipeFactory(); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java index 702c3cc4..ba2ade3a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -21,8 +21,8 @@ import teetime.util.ConstructorClosure; import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; +import teetime.variant.methodcallWithPorts.framework.core.HeadStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; @@ -37,7 +37,7 @@ import teetime.variant.methodcallWithPorts.stage.basic.Sink; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis15 extends Analysis { @@ -58,14 +58,14 @@ public class MethodCallThroughputAnalysis15 extends Analysis { public void init() { super.init(); - StageWithPort clockPipeline = this.buildClockPipeline(); + HeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); this.clockRunnable = new RunnableStage(clockPipeline); - StageWithPort pipeline = this.buildPipeline(this.clock); + HeadStage pipeline = this.buildPipeline(this.clock); this.runnable = new RunnableStage(pipeline); } - private StageWithPort buildClockPipeline() { + private HeadPipeline<Clock, Sink<Long>> buildClockPipeline() { this.clock = new Clock(); this.clock.setInitialDelayInMs(100); @@ -80,9 +80,10 @@ public class MethodCallThroughputAnalysis15 extends Analysis { /** * @param numNoopFilters + * @return * @since 1.10 */ - private StageWithPort buildPipeline(final Clock clock) { + private HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final Clock clock) { @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; // create stages @@ -97,10 +98,6 @@ public class MethodCallThroughputAnalysis15 extends Analysis { final HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(objectProducer); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); - pipeline.addIntermediateStage(delay); pipeline.setLastStage(collectorSink); SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java index bf966214..ce47462c 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/ChwHomePerformanceCheck.java @@ -28,6 +28,6 @@ public class ChwHomePerformanceCheck implements PerformanceCheckProfile { System.out.println("speedupC: " + speedupC); assertEquals(2, speedupB, 0.3); - assertEquals(3, speedupC, 0.3); + assertEquals(4, speedupC, 0.3); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java index ba7cd435..1230451f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment16/MethodCallThroughputAnalysis16.java @@ -36,7 +36,7 @@ import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis16 extends Analysis { @@ -109,9 +109,6 @@ public class MethodCallThroughputAnalysis16 extends Analysis { final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java index a5ac5b39..1cf3daf7 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment17/MethodCallThroughputAnalysis17.java @@ -26,7 +26,9 @@ import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; +import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.framework.core.signal.TerminatingSignal; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -52,6 +54,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { private ConstructorClosure<TimestampObject> inputObjectCreator; private int numNoopFilters; + private final PipeFactory pipeFactory = new PipeFactory(); private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); private Thread producerThread; @@ -59,7 +62,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { @Override public void init() { - final StageWithPort producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); + HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, + this.inputObjectCreator); this.producerThread = new Thread(new RunnableStage(producerPipeline)); int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose @@ -69,8 +73,8 @@ public class MethodCallThroughputAnalysis17 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Runnable workerRunnable = this.buildPipeline(null, resultList); - this.workerThreads[i] = new Thread(workerRunnable); + HeadPipeline<?, ?> pipeline = this.buildPipeline(null, resultList); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } // this.producerThread = new Thread(new Runnable() { @@ -106,7 +110,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { super.init(); } - private StageWithPort buildProducerPipeline(final int numInputObjects, + private HeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); @@ -135,9 +139,10 @@ public class MethodCallThroughputAnalysis17 extends Analysis { * @param numNoopFilters * @since 1.10 */ - private Runnable buildPipeline(final StageWithPort previousStage, final List<TimestampObject> timestampObjects) { - Relay<TimestampObject> relay = new Relay<TimestampObject>(); + private HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final StageWithPort previousStage, + final List<TimestampObject> timestampObjects) { // create stages + Relay<TimestampObject> relay = new Relay<TimestampObject>(); final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -149,12 +154,10 @@ public class MethodCallThroughputAnalysis17 extends Analysis { final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); - relay.getInputPort().setPipe(new SpScPipe<TimestampObject>(SPSC_INITIAL_CAPACITY)); + IPipe<TimestampObject> pipe = this.pipeFactory.create(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false, SPSC_INITIAL_CAPACITY); + relay.getInputPort().setPipe(pipe); IPipe<TimestampObject> startPipe = relay.getInputPort().getPipe(); for (int i = 0; i < this.numInputObjects; i++) { startPipe.add(this.inputObjectCreator.create()); @@ -171,7 +174,7 @@ public class MethodCallThroughputAnalysis17 extends Analysis { UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - return new RunnableStage(pipeline); + return pipeline; } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java index 5307b5bf..6a264973 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment18/MethodCallThroughputAnalysis18.java @@ -24,7 +24,6 @@ import teetime.variant.explicitScheduling.examples.throughput.TimestampObject; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; @@ -37,7 +36,7 @@ import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis18 extends Analysis { @@ -71,7 +70,7 @@ public class MethodCallThroughputAnalysis18 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - StageWithPort pipeline = this.buildPipeline(producerPipeline, resultList); + HeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline, resultList); this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } @@ -110,9 +109,6 @@ public class MethodCallThroughputAnalysis18 extends Analysis { final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java index 9d16a88b..b7e01fe0 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/experiment19/MethodCallThroughputAnalysis19.java @@ -36,7 +36,7 @@ import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor; /** * @author Christian Wulf - * + * * @since 1.10 */ public class MethodCallThroughputAnalysis19 extends Analysis { @@ -70,8 +70,8 @@ public class MethodCallThroughputAnalysis19 extends Analysis { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); - Runnable workerRunnable = this.buildPipeline(producerPipeline.getLastStage(), resultList); - this.workerThreads[i] = new Thread(workerRunnable); + HeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline.getLastStage(), resultList); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); } } @@ -90,7 +90,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis { return pipeline; } - private Runnable buildPipeline(final Distributor<TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { + private HeadPipeline<?, ?> buildPipeline(final Distributor<TimestampObject> previousStage, final List<TimestampObject> timestampObjects) { Relay<TimestampObject> relay = new Relay<TimestampObject>(); @SuppressWarnings("unchecked") final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; @@ -104,9 +104,6 @@ public class MethodCallThroughputAnalysis19 extends Analysis { final HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new HeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(startTimestampFilter); - pipeline.addIntermediateStages(noopFilters); - pipeline.addIntermediateStage(stopTimestampFilter); pipeline.setLastStage(collectorSink); SpScPipe.connect(previousStage.getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); @@ -120,7 +117,7 @@ public class MethodCallThroughputAnalysis19 extends Analysis { OrderedGrowableArrayPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - return new RunnableStage(pipeline); + return pipeline; } @Override diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java index 6b530cc0..57570a8a 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/recordReader/RecordReaderConfiguration.java @@ -21,13 +21,12 @@ import java.util.List; import teetime.variant.methodcallWithPorts.framework.core.Configuration; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.PipeOrdering; import teetime.variant.methodcallWithPorts.framework.core.pipe.PipeFactory.ThreadCommunication; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.CollectorSink; +import teetime.variant.methodcallWithPorts.stage.InitialElementProducer; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; @@ -48,25 +47,27 @@ public class RecordReaderConfiguration extends Configuration { } public void buildConfiguration() { - StageWithPort producerPipeline = this.buildProducerPipeline(); + HeadPipeline<?, ?> producerPipeline = this.buildProducerPipeline(); this.getFiniteProducerStages().add(producerPipeline); } - private StageWithPort buildProducerPipeline() { + private HeadPipeline<?, ?> buildProducerPipeline() { ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); + File logDir = new File("src/test/data/bookstore-logs"); // create stages + InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(logDir); Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); - final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); - pipeline.setFirstStage(dir2RecordsFilter); + final HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>(); + pipeline.setFirstStage(initialElementProducer); pipeline.setLastStage(collector); - IPipe<IMonitoringRecord> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); - pipe.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); + IPipe<File> pipe = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); + pipe.connectPorts(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); - dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); - dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs")); + IPipe<IMonitoringRecord> pipe1 = this.pipeFactory.create(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false, 1); + pipe1.connectPorts(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); return pipeline; } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java index 2d459fd6..580cc59f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -5,7 +5,6 @@ import java.util.List; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; @@ -40,7 +39,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { return pipeline; } - private StageWithPort buildTcpPipeline(final Distributor<Long> previousClockStage) { + private HeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) { TCPReader tcpReader = new TCPReader(); this.recordCounter = new Counter<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); @@ -56,8 +55,6 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { // create and configure pipeline HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); - pipeline.addIntermediateStage(this.recordCounter); - // pipeline.addIntermediateStage(this.recordThroughputStage); pipeline.setLastStage(endStage); return pipeline; } @@ -69,7 +66,7 @@ public class TcpTraceLoggingExtAnalysis extends Analysis { HeadPipeline<Clock, Distributor<Long>> clockPipeline = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockPipeline)); - StageWithPort tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); + HeadPipeline<?, ?> tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 9ff72902..d51eec21 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -9,7 +9,6 @@ import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; -import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Cache; @@ -17,6 +16,7 @@ import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.Counter; import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage; +import teetime.variant.methodcallWithPorts.stage.InitialElementProducer; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; @@ -52,7 +52,7 @@ public class TraceReconstructionAnalysis extends Analysis { Clock clockStage = this.buildClockPipeline(); this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort pipeline = this.buildPipeline(clockStage); + HeadPipeline<?, ?> pipeline = this.buildPipeline(clockStage); this.workerThread = new Thread(new RunnableStage(pipeline)); } @@ -63,10 +63,11 @@ public class TraceReconstructionAnalysis extends Analysis { return clock; } - private StageWithPort buildPipeline(final Clock clockStage) { + private HeadPipeline<?, ?> buildPipeline(final Clock clockStage) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // create stages + InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(this.inputDir); final Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository); this.recordCounter = new Counter<IMonitoringRecord>(); final Cache<IMonitoringRecord> cache = new Cache<IMonitoringRecord>(); @@ -85,7 +86,7 @@ public class TraceReconstructionAnalysis extends Analysis { stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); // connect stages - dir2RecordsFilter.getInputPort().setPipe(new SingleElementPipe<File>()); + SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort()); SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); @@ -100,20 +101,9 @@ public class TraceReconstructionAnalysis extends Analysis { SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); - // fill input ports - dir2RecordsFilter.getInputPort().getPipe().add(this.inputDir); - // create and configure pipeline - HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<TraceEventRecords>>(); - pipeline.setFirstStage(dir2RecordsFilter); - pipeline.addIntermediateStage(this.recordCounter); - pipeline.addIntermediateStage(cache); - pipeline.addIntermediateStage(stringBufferFilter); - pipeline.addIntermediateStage(instanceOfFilter); - pipeline.addIntermediateStage(this.throughputFilter); - pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(merger); - pipeline.addIntermediateStage(this.traceCounter); + HeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<TraceEventRecords>>(); + pipeline.setFirstStage(initialElementProducer); pipeline.setLastStage(collector); return pipeline; } diff --git a/src/test/resources/logback-test.groovy b/src/test/resources/logback-test.groovy index a9a3608d..23fb52cc 100644 --- a/src/test/resources/logback-test.groovy +++ b/src/test/resources/logback-test.groovy @@ -20,3 +20,5 @@ appender("CONSOLE", ConsoleAppender) { } root ERROR, ["CONSOLE"] + +logger "util", INFO \ No newline at end of file -- GitLab