diff --git a/.settings/edu.umd.cs.findbugs.core.prefs b/.settings/edu.umd.cs.findbugs.core.prefs index 32039c99ca8ea732a40a45c99121341402f02a45..3859298be5bcb28e45aa0ee74a54927731d8e2fd 100644 --- a/.settings/edu.umd.cs.findbugs.core.prefs +++ b/.settings/edu.umd.cs.findbugs.core.prefs @@ -1,5 +1,5 @@ #FindBugs User Preferences -#Wed Dec 17 07:37:54 CET 2014 +#Fri Dec 19 13:43:52 CET 2014 detector_threshold=3 effort=max excludefilter0=.fbExcludeFilterFile|true diff --git a/src/main/java/teetime/framework/AbstractConsumerStage.java b/src/main/java/teetime/framework/AbstractConsumerStage.java index 66333405a17928b2faddcc948affe6e146c4e17a..1cc3bdb0a9fbd55ecb791d603f2ca87ea20abb43 100644 --- a/src/main/java/teetime/framework/AbstractConsumerStage.java +++ b/src/main/java/teetime/framework/AbstractConsumerStage.java @@ -1,20 +1,35 @@ package teetime.framework; +import teetime.framework.idle.IdleStrategy; +import teetime.framework.idle.YieldStrategy; + public abstract class AbstractConsumerStage<I> extends AbstractStage { protected final InputPort<I> inputPort = this.createInputPort(); + private IdleStrategy idleStrategy = new YieldStrategy(); // FIXME remove this word-around + public final InputPort<I> getInputPort() { return this.inputPort; } @Override - public void executeWithPorts() { + public final void executeWithPorts() { final I element = this.getInputPort().receive(); + if (null == element) { + returnNoElement(); + } this.execute(element); } protected abstract void execute(I element); + public IdleStrategy getIdleStrategy() { + return idleStrategy; + } + + public void setIdleStrategy(final IdleStrategy idleStrategy) { + this.idleStrategy = idleStrategy; + } } diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index 980210899354588747cd2339406db559a0be709b..5471576ba9def3c1a59caa923df4237d2495559b 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -1,5 +1,6 @@ package teetime.framework; +import java.lang.Thread.State; import java.util.Queue; import org.jctools.queues.QueueFactory; @@ -20,6 +21,15 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { @Override public void sendSignal(final ISignal signal) { this.signalQueue.offer(signal); + + Thread owningThread = cachedTargetStage.getOwningThread(); + if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance + owningThread.interrupt(); + } + } + + protected boolean isThreadWaiting(final Thread thread) { + return thread.getState() == State.WAITING || thread.getState() == State.TIMED_WAITING; } /** diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java index bc6a244aa1f329b878b5dfe68e4dc40fb7690804..6dc10ce0509efdc690dd58a16d8fa064e78b64e4 100644 --- a/src/main/java/teetime/framework/AbstractStage.java +++ b/src/main/java/teetime/framework/AbstractStage.java @@ -16,27 +16,22 @@ public abstract class AbstractStage extends Stage { private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); /** A cached instance of <code>inputPortList</code> to avoid creating an iterator each time iterating it */ - protected InputPort<?>[] cachedInputPorts; + protected InputPort<?>[] cachedInputPorts = new InputPort[0]; /** A cached instance of <code>outputPortList</code> to avoid creating an iterator each time iterating it */ protected OutputPort<?>[] cachedOutputPorts; private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); + // BETTER aggregate both states in an enum private boolean shouldTerminate; - - private void connectUnconnectedOutputPorts() { - for (OutputPort<?> outputPort : this.cachedOutputPorts) { - if (null == outputPort.getPipe()) { // if port is unconnected - this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); - outputPort.setPipe(new DummyPipe()); - } - } - } + private boolean started; /** * @return the stage's input ports */ - protected InputPort<?>[] getInputPorts() { - return this.cachedInputPorts; + @Override + public InputPort<?>[] getInputPorts() { + // return this.cachedInputPorts; + return inputPortList.toArray(new InputPort<?>[0]); // FIXME remove work-around } /** @@ -49,6 +44,7 @@ public abstract class AbstractStage extends Stage { /** * May not be invoked outside of IPipe implementations */ + @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { if (!this.signalAlreadyReceived(signal, inputPort)) { @@ -60,6 +56,11 @@ public abstract class AbstractStage extends Stage { } } + @Override + public boolean isStarted() { + return started; + } + /** * @param signal * arriving signal @@ -87,6 +88,18 @@ public abstract class AbstractStage extends Stage { this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.connectUnconnectedOutputPorts(); + started = true; + logger.debug("Started."); + } + + @SuppressWarnings("PMD.DataflowAnomalyAnalysis") + private void connectUnconnectedOutputPorts() { + for (OutputPort<?> outputPort : this.cachedOutputPorts) { + if (null == outputPort.getPipe()) { // if port is unconnected + this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); + outputPort.setPipe(new DummyPipe()); + } + } } public void onTerminating() throws Exception { @@ -117,9 +130,11 @@ public abstract class AbstractStage extends Stage { return outputPort; } + @SuppressWarnings("PMD.DataflowAnomalyAnalysis") @Override public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { - for (OutputPort<?> outputPort : this.getOutputPorts()) { + // for (OutputPort<?> outputPort : this.getOutputPorts()) { + for (OutputPort<?> outputPort : this.outputPortList) { final IPipe pipe = outputPort.getPipe(); if (null != pipe) { // if output port is connected with another one final Class<?> sourcePortType = outputPort.getType(); diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index 778cbae553b678a0eebf3688fb17cdea18685e4b..75485f6a0b678fa1018697dc8598de4d91dbe643 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -9,6 +9,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.framework.signal.ValidatingSignal; +import teetime.framework.validation.AnalysisNotValidException; import teetime.util.Pair; /** @@ -30,16 +32,35 @@ public class Analysis implements UncaughtExceptionHandler { private final Collection<Pair<Thread, Throwable>> exceptions = new ConcurrentLinkedQueue<Pair<Thread, Throwable>>(); + /** + * Creates a new {@link Analysis} that skips validating the port connections. + * + * @param configuration + * to be used for the analysis + */ public Analysis(final AnalysisConfiguration configuration) { + this(configuration, false); + } + + public Analysis(final AnalysisConfiguration configuration, final boolean validationEnabled) { this.configuration = configuration; - validateStages(); + if (validationEnabled) { + validateStages(); + } } private void validateStages() { // BETTER validate concurrently final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); for (Stage stage : threadableStageJobs) { - // portConnectionValidator.validate(stage); + // // portConnectionValidator.validate(stage); + // } + + final ValidatingSignal validatingSignal = new ValidatingSignal(); + stage.onSignal(validatingSignal, null); + if (validatingSignal.getInvalidPortConnections().size() > 0) { + throw new AnalysisNotValidException(validatingSignal.getInvalidPortConnections()); + } } } @@ -49,17 +70,31 @@ public class Analysis implements UncaughtExceptionHandler { public void init() { final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); for (Stage stage : threadableStageJobs) { - final Thread thread = new Thread(new RunnableStage(stage)); switch (stage.getTerminationStrategy()) { - case BY_SIGNAL: + case BY_SIGNAL: { + RunnableConsumerStage runnable; + if (stage instanceof AbstractConsumerStage<?>) { + runnable = new RunnableConsumerStage(stage, ((AbstractConsumerStage<?>) stage).getIdleStrategy()); // FIXME remove this word-around + } else { + runnable = new RunnableConsumerStage(stage); + } + final Thread thread = new Thread(runnable); + stage.setOwningThread(thread); this.consumerThreads.add(thread); break; - case BY_SELF_DECISION: + } + case BY_SELF_DECISION: { + final Thread thread = new Thread(new RunnableProducerStage(stage)); + stage.setOwningThread(thread); this.finiteProducerThreads.add(thread); break; - case BY_INTERRUPT: + } + case BY_INTERRUPT: { + final Thread thread = new Thread(new RunnableProducerStage(stage)); + stage.setOwningThread(thread); this.infiniteProducerThreads.add(thread); break; + } default: break; } diff --git a/src/main/java/teetime/framework/AnalysisConfiguration.java b/src/main/java/teetime/framework/AnalysisConfiguration.java index 5c699b53089e07350e4a63a2dcfc7c34507c6b25..89876f0f9c539a032e91d13a26a348ce28a461aa 100644 --- a/src/main/java/teetime/framework/AnalysisConfiguration.java +++ b/src/main/java/teetime/framework/AnalysisConfiguration.java @@ -14,9 +14,7 @@ public class AnalysisConfiguration { protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; private final List<Stage> threadableStageJobs = new LinkedList<Stage>(); - public AnalysisConfiguration() {} - - List<Stage> getThreadableStageJobs() { + List<Stage> getThreadableStageJobs() { return this.threadableStageJobs; } diff --git a/src/main/java/teetime/framework/NotEnoughInputException.java b/src/main/java/teetime/framework/NotEnoughInputException.java new file mode 100644 index 0000000000000000000000000000000000000000..dc241b8212c8fb16ffc05532f3672cb843e482b0 --- /dev/null +++ b/src/main/java/teetime/framework/NotEnoughInputException.java @@ -0,0 +1,13 @@ +package teetime.framework; + +public final class NotEnoughInputException extends RuntimeException { + + private static final long serialVersionUID = -2517233596919204396L; + + @SuppressWarnings("PMD.AvoidSynchronizedAtMethodLevel") + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + +} diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java new file mode 100644 index 0000000000000000000000000000000000000000..655e0f82f95214cdb6d4c0aff27309e3d08fd687 --- /dev/null +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -0,0 +1,78 @@ +package teetime.framework; + +import java.util.Arrays; + +import teetime.framework.idle.IdleStrategy; +import teetime.framework.idle.YieldStrategy; +import teetime.framework.pipe.IPipe; +import teetime.framework.signal.ISignal; + +final class RunnableConsumerStage extends RunnableStage { + + private final IdleStrategy idleStrategy; + + public RunnableConsumerStage(final Stage stage) { + this(stage, new YieldStrategy()); + } + + public RunnableConsumerStage(final Stage stage, final IdleStrategy idleStrategy) { + super(stage); + this.idleStrategy = idleStrategy; + } + + @Override + protected void beforeStageExecution() { + logger.trace("ENTRY beforeStageExecution"); + + do { + checkforSignals(); + Thread.yield(); + } while (!stage.isStarted()); + + logger.trace("EXIT beforeStageExecution"); + } + + @Override + protected void executeStage() { + try { + this.stage.executeWithPorts(); + } catch (NotEnoughInputException e) { + checkforSignals(); // check for termination + executeIdleStrategy(); + } + } + + private void executeIdleStrategy() { + if (stage.shouldBeTerminated()) { + return; + } + try { + idleStrategy.execute(); + } catch (InterruptedException e) { + // checkforSignals(); // check for termination + } + } + + @SuppressWarnings("PMD.DataflowAnomalyAnalysis") + private void checkforSignals() { + // FIXME should getInputPorts() really be defined in Stage? + InputPort<?>[] inputPorts = stage.getInputPorts(); + logger.debug("Checking signals for: " + Arrays.toString(inputPorts)); + for (InputPort<?> inputPort : inputPorts) { + IPipe pipe = inputPort.getPipe(); + if (pipe instanceof AbstractInterThreadPipe) { + AbstractInterThreadPipe intraThreadPipe = (AbstractInterThreadPipe) pipe; + ISignal signal = intraThreadPipe.getSignal(); + if (null != signal) { + stage.onSignal(signal, inputPort); + } + } + } + } + + @Override + protected void afterStageExecution() { + // do nothing + } + +} diff --git a/src/main/java/teetime/framework/RunnableProducerStage.java b/src/main/java/teetime/framework/RunnableProducerStage.java new file mode 100644 index 0000000000000000000000000000000000000000..a2941b066f0fd794781a07e99d5f6f6820366bc9 --- /dev/null +++ b/src/main/java/teetime/framework/RunnableProducerStage.java @@ -0,0 +1,29 @@ +package teetime.framework; + +import teetime.framework.signal.StartingSignal; +import teetime.framework.signal.TerminatingSignal; + +public final class RunnableProducerStage extends RunnableStage { + + public RunnableProducerStage(final Stage stage) { + super(stage); + } + + @Override + protected void beforeStageExecution() { + final StartingSignal startingSignal = new StartingSignal(); + this.stage.onSignal(startingSignal, null); + } + + @Override + protected void executeStage() { + this.stage.executeWithPorts(); + } + + @Override + protected void afterStageExecution() { + final TerminatingSignal terminatingSignal = new TerminatingSignal(); + this.stage.onSignal(terminatingSignal, null); + } + +} diff --git a/src/main/java/teetime/framework/RunnableStage.java b/src/main/java/teetime/framework/RunnableStage.java index 3c1ccbe388830931426533e8f76aeb0106ccbee6..b79b8d2da10441038fec0618aa4c60977e111f8d 100644 --- a/src/main/java/teetime/framework/RunnableStage.java +++ b/src/main/java/teetime/framework/RunnableStage.java @@ -3,17 +3,11 @@ package teetime.framework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import teetime.framework.signal.StartingSignal; -import teetime.framework.signal.TerminatingSignal; -import teetime.framework.signal.ValidatingSignal; -import teetime.framework.validation.AnalysisNotValidException; +abstract class RunnableStage implements Runnable { -public class RunnableStage implements Runnable { - - private final Stage stage; + protected final Stage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") - private final Logger logger; - private boolean validationEnabled; + protected final Logger logger; public RunnableStage(final Stage stage) { this.stage = stage; @@ -21,44 +15,32 @@ public class RunnableStage implements Runnable { } @Override - public void run() { + public final void run() { this.logger.debug("Executing runnable stage..."); - if (this.validationEnabled) { - final ValidatingSignal validatingSignal = new ValidatingSignal(); - this.stage.onSignal(validatingSignal, null); - if (validatingSignal.getInvalidPortConnections().size() > 0) { - throw new AnalysisNotValidException(validatingSignal.getInvalidPortConnections()); - } - } - try { - final StartingSignal startingSignal = new StartingSignal(); - this.stage.onSignal(startingSignal, null); + beforeStageExecution(); do { - this.stage.executeWithPorts(); + executeStage(); } while (!this.stage.shouldBeTerminated()); - final TerminatingSignal terminatingSignal = new TerminatingSignal(); - this.stage.onSignal(terminatingSignal, null); + afterStageExecution(); } catch (Error e) { this.logger.error("Terminating thread due to the following exception: ", e); throw e; - } // catch (RuntimeException e) { - // this.logger.error("Terminating thread due to the following exception: ", e); - // throw e; - // } + } catch (RuntimeException e) { + this.logger.error("Terminating thread due to the following exception: ", e); + throw e; + } this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); } - public boolean isValidationEnabled() { - return this.validationEnabled; - } + protected abstract void beforeStageExecution(); - public void setValidationEnabled(final boolean validationEnabled) { - this.validationEnabled = validationEnabled; - } + protected abstract void executeStage(); + + protected abstract void afterStageExecution(); } diff --git a/src/main/java/teetime/framework/Stage.java b/src/main/java/teetime/framework/Stage.java index 561f76238327c2eb162014c3d3446f3533ec0f56..6c13ebdda92f37f4d70076ed2fa6e8f981cf0fd0 100644 --- a/src/main/java/teetime/framework/Stage.java +++ b/src/main/java/teetime/framework/Stage.java @@ -18,6 +18,7 @@ import teetime.framework.validation.InvalidPortConnection; public abstract class Stage { private static final ConcurrentMap<String, Integer> INSTANCES_COUNTER = new ConcurrentHashMap<String, Integer>(); + private static final NotEnoughInputException NOT_ENOUGH_INPUT_EXCEPTION = new NotEnoughInputException(); private final String id; /** @@ -26,9 +27,11 @@ public abstract class Stage { @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; + private Thread owningThread; + protected Stage() { this.id = this.createId(); - this.logger = LoggerFactory.getLogger(this.id); + this.logger = LoggerFactory.getLogger(this.getClass().getCanonicalName() + ":" + id); } /** @@ -65,6 +68,10 @@ public abstract class Stage { // // public abstract void setParentStage(Stage parentStage, int index); + protected final void returnNoElement() { + throw NOT_ENOUGH_INPUT_EXCEPTION; + } + /** * This should check, if the OutputPorts are connected correctly. This is needed to avoid NullPointerExceptions and other errors. * @@ -82,4 +89,16 @@ public abstract class Stage { protected abstract void terminate(); protected abstract boolean shouldBeTerminated(); + + public Thread getOwningThread() { + return owningThread; + } + + public void setOwningThread(final Thread owningThread) { + this.owningThread = owningThread; + } + + protected abstract InputPort<?>[] getInputPorts(); + + protected abstract boolean isStarted(); } diff --git a/src/main/java/teetime/framework/idle/IdleStrategy.java b/src/main/java/teetime/framework/idle/IdleStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..b9dc35247a1c7fe86c3fa1d693e6fbfec3491f20 --- /dev/null +++ b/src/main/java/teetime/framework/idle/IdleStrategy.java @@ -0,0 +1,6 @@ +package teetime.framework.idle; + +public interface IdleStrategy { + + void execute() throws InterruptedException; +} diff --git a/src/main/java/teetime/framework/idle/SleepStrategy.java b/src/main/java/teetime/framework/idle/SleepStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..89796a01e29b6f6b3671f4e6bdafbcf09d80d233 --- /dev/null +++ b/src/main/java/teetime/framework/idle/SleepStrategy.java @@ -0,0 +1,17 @@ +package teetime.framework.idle; + +public final class SleepStrategy implements IdleStrategy { + + private final long timeoutInMs; + + public SleepStrategy(final long timeoutInMs) { + super(); + this.timeoutInMs = timeoutInMs; + } + + @Override + public void execute() throws InterruptedException { + Thread.sleep(timeoutInMs); + } + +} diff --git a/src/main/java/teetime/framework/idle/WaitStrategy.java b/src/main/java/teetime/framework/idle/WaitStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..6a4c5e7f115d98ae212b87326b2e160fcc45937c --- /dev/null +++ b/src/main/java/teetime/framework/idle/WaitStrategy.java @@ -0,0 +1,21 @@ +package teetime.framework.idle; + +import teetime.framework.Stage; + +public final class WaitStrategy implements IdleStrategy { + + private final Stage stage; + + public WaitStrategy(final Stage stage) { + super(); + this.stage = stage; + } + + @Override + public void execute() throws InterruptedException { + synchronized (stage) { + stage.wait(); + } + } + +} diff --git a/src/main/java/teetime/framework/idle/YieldStrategy.java b/src/main/java/teetime/framework/idle/YieldStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..c6b09d959f26f4093e96d0e97e4dd631ddf51219 --- /dev/null +++ b/src/main/java/teetime/framework/idle/YieldStrategy.java @@ -0,0 +1,10 @@ +package teetime.framework.idle; + +public final class YieldStrategy implements IdleStrategy { + + @Override + public void execute() throws InterruptedException { + Thread.yield(); + } + +} diff --git a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java index 6fcbda81d8ff073ca27a9d2863b4ec02ad6f4688..1335c51a97bd15eeece300f8e045d8950ea52c40 100644 --- a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java +++ b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java @@ -38,7 +38,7 @@ public final class PipeFactoryLoader { pipeFactories.add(pipeFactory); } } catch (ClassNotFoundException e) { - LOGGER.warn("Could not find class: " + line, e); + LOGGER.warn("Could not find class: " + line, e); // NOMPD (PMD.GuardLogStatement) } catch (InstantiationException e) { LOGGER.warn("Could not instantiate pipe factory", e); } catch (IllegalAccessException e) { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 8facbf44a72c4aa96087f8fe2372d54a6c7d7a7f..c4b474640eae022cbd2da54be1e4b628b848ae2b 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -13,6 +13,8 @@ import teetime.framework.OutputPort; public final class SpScPipe extends AbstractInterThreadPipe { + // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); + private final Queue<Object> queue; // statistics private int numWaits; @@ -37,6 +39,14 @@ public final class SpScPipe extends AbstractInterThreadPipe { Thread.yield(); } + Thread owningThread = cachedTargetStage.getOwningThread(); + if (null != owningThread && isThreadWaiting(owningThread)) { // FIXME remove the null check for performance + synchronized (cachedTargetStage) { + cachedTargetStage.notify(); + // LOGGER.trace("Notified: " + cachedTargetStage); + } + } + return true; } diff --git a/src/main/java/teetime/framework/signal/StartingSignal.java b/src/main/java/teetime/framework/signal/StartingSignal.java index da156138db66ad0eb938483fd5cd1611bf76f77b..6b260fe6da68e9616011caf2db14ef26800b4878 100644 --- a/src/main/java/teetime/framework/signal/StartingSignal.java +++ b/src/main/java/teetime/framework/signal/StartingSignal.java @@ -19,7 +19,6 @@ public class StartingSignal implements ISignal { public void trigger(final AbstractStage stage) { try { stage.onStarting(); - LOGGER.info(stage + " started."); } catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception) this.catchedExceptions.add(e); LOGGER.error("Exception while sending the start signal", e); diff --git a/src/main/java/teetime/framework/validation/InvalidPortConnection.java b/src/main/java/teetime/framework/validation/InvalidPortConnection.java index dbe36d53bca2c90a1d85c651e58c7ac13b3858fe..19ea3a4526242234ed87655e735cbe9dd24177f3 100644 --- a/src/main/java/teetime/framework/validation/InvalidPortConnection.java +++ b/src/main/java/teetime/framework/validation/InvalidPortConnection.java @@ -24,9 +24,9 @@ public class InvalidPortConnection { @Override public String toString() { - final String sourcePortTypeName = (this.sourcePort.getType() == null) ? null : this.sourcePort.getType().getName(); - final String targetPortTypeName = (this.targetPort.getType() == null) ? null : this.targetPort.getType().getName(); - return sourcePortTypeName + " != " + targetPortTypeName; + final String sourcePortTypeName = (this.sourcePort.getType() == null) ? "null" : this.sourcePort.getType().getName(); + final String targetPortTypeName = (this.targetPort.getType() == null) ? "null" : this.targetPort.getType().getName(); + return "Source port type does not match target port type: " + sourcePortTypeName + " != " + targetPortTypeName; } } diff --git a/src/main/java/teetime/stage/Relay.java b/src/main/java/teetime/stage/Relay.java index 96e303348613094e57ddec5cd177c22bdf4ecdde..ae98cdaf652eeae180bafdc1d3970af077b219f3 100644 --- a/src/main/java/teetime/stage/Relay.java +++ b/src/main/java/teetime/stage/Relay.java @@ -1,36 +1,37 @@ package teetime.stage; -import teetime.framework.AbstractInterThreadPipe; -import teetime.framework.AbstractProducerStage; -import teetime.framework.InputPort; -import teetime.framework.signal.TerminatingSignal; +import teetime.framework.AbstractConsumerStage; +import teetime.framework.OutputPort; -public final class Relay<T> extends AbstractProducerStage<T> { +public final class Relay<T> extends AbstractConsumerStage<T> { - private final InputPort<T> inputPort = this.createInputPort(); + // private final InputPort<T> inputPort = this.createInputPort(); + private final OutputPort<T> outputPort = this.createOutputPort(); - private AbstractInterThreadPipe cachedCastedInputPipe; + // private AbstractInterThreadPipe cachedCastedInputPipe; @Override - public void execute() { - T element = this.inputPort.receive(); + protected void execute(final T element) { if (null == element) { - if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { - this.terminate(); - } - Thread.yield(); - return; + // if (this.cachedCastedInputPipe.getSignal() instanceof TerminatingSignal) { + // this.terminate(); + // } + // Thread.yield(); + // return; + logger.trace("relay: returnNoElement"); + returnNoElement(); } outputPort.send(element); } - @Override - public void onStarting() throws Exception { - super.onStarting(); - this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); - } + // @Override + // public void onStarting() throws Exception { + // super.onStarting(); + // this.cachedCastedInputPipe = (AbstractInterThreadPipe) this.inputPort.getPipe(); + // } - public InputPort<T> getInputPort() { - return this.inputPort; + public OutputPort<T> getOutputPort() { + return outputPort; } + } diff --git a/src/main/java/teetime/stage/basic/Delay.java b/src/main/java/teetime/stage/basic/Delay.java index 29f152d0fd8589b1d5e75da901f3269baef878c1..aab87bd3c9accb1af824689b277de8371f46b95f 100644 --- a/src/main/java/teetime/stage/basic/Delay.java +++ b/src/main/java/teetime/stage/basic/Delay.java @@ -24,20 +24,34 @@ public final class Delay<T> extends AbstractStage { Long timestampTrigger = this.timestampTriggerInputPort.receive(); if (null == timestampTrigger) { - return; + return; // BETTER use returnNoElement(). so far, RunnableProducerStages cannot handle the NOT_ENOUGH__INPUT_EXCEPTION } + sendAllBufferedEllements(); + } + + private void sendAllBufferedEllements() { while (!bufferedElements.isEmpty()) { - element = bufferedElements.remove(0); + T element = bufferedElements.remove(0); outputPort.send(element); + logger.trace("Sent buffered element: " + element); } } @Override public void onTerminating() throws Exception { - while (!this.inputPort.getPipe().isEmpty()) { - this.executeWithPorts(); + while (null == timestampTriggerInputPort.receive()) { + // wait for the next trigger } + + sendAllBufferedEllements(); + + T element; + while (null != (element = inputPort.receive())) { + outputPort.send(element); + logger.trace("Sent element: " + element); + } + super.onTerminating(); } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index aa84f0e34b7479ef6db5ffaad55c16fef2ef25fc..d03fcdab9cb27b01685b606b7e56293f339b2ee9 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -58,7 +58,7 @@ public final class Merger<T> extends AbstractStage { public void executeWithPorts() { final T token = this.strategy.getNextInput(this); if (token == null) { - return; + returnNoElement(); } outputPort.send(token); diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java index aacc5936acd18870ab2215ffe859ac6398e5139d..0d686f107701ee7ebe19183281b813f98f5ec4f6 100644 --- a/src/main/java/teetime/stage/io/EveryXthPrinter.java +++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java @@ -70,4 +70,14 @@ public final class EveryXthPrinter<T> extends Stage { return distributor.getNewOutputPort(); } + @Override + protected InputPort<?>[] getInputPorts() { + return distributor.getInputPorts(); + } + + @Override + protected boolean isStarted() { + return distributor.isStarted(); + } + } diff --git a/src/performancetest/java/teetime/examples/ChwHomeComparisonMethodcallWithPorts.java b/src/performancetest/java/teetime/examples/ChwHomeComparisonMethodcallWithPorts.java index 5f13c9b9d442310cdfeb3447a7403da6a95ad3e5..42c1732643de5af600c7433f60ae3ba3e8f14bb7 100644 --- a/src/performancetest/java/teetime/examples/ChwHomeComparisonMethodcallWithPorts.java +++ b/src/performancetest/java/teetime/examples/ChwHomeComparisonMethodcallWithPorts.java @@ -33,8 +33,6 @@ public class ChwHomeComparisonMethodcallWithPorts extends AbstractProfiledPerfor .get("testWithManyObjectsAnd2Threads(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); PerformanceResult test16c = performanceResults .get("testWithManyObjectsAnd4Threads(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); - PerformanceResult test17 = performanceResults - .get("testWithManyObjects(teetime.examples.experiment17.MethodCallThoughputTimestampAnalysis17Test)"); PerformanceResult test19a = performanceResults .get("testWithManyObjectsAnd1Thread(teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test)"); PerformanceResult test19b = performanceResults @@ -43,10 +41,8 @@ public class ChwHomeComparisonMethodcallWithPorts extends AbstractProfiledPerfor .get("testWithManyObjectsAnd4Threads(teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test)"); double value15 = (double) test15.quantiles.get(0.5) / test1.quantiles.get(0.5); - double value17 = (double) test17.quantiles.get(0.5) / test1.quantiles.get(0.5); System.out.println("value15: " + value15); - System.out.println("value17: " + value17); // until 25.06.2014 (incl.) // assertEquals(44, (double) test15.quantiles.get(0.5) / test1.quantiles.get(0.5), 4.1); @@ -74,7 +70,7 @@ public class ChwHomeComparisonMethodcallWithPorts extends AbstractProfiledPerfor // since 13.12.2014 (incl.) assertEquals(40, value15, 4.1); // -28 - assertEquals(43, value17, 4.1); // -35 + // assertEquals(43, value17, 4.1); // -35 // below results vary too much, possibly due to the OS' scheduler // assertEquals(RESULT_TESTS_16, (double) test16a.quantiles.get(0.5) / test1.quantiles.get(0.5), 5.1); diff --git a/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java b/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java index 553fad83b0117985e919044ac73639652b1d032b..cf0344ccc32806c7eba6a2c15f8d5c53ee188926 100644 --- a/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java +++ b/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java @@ -5,9 +5,9 @@ import static org.junit.Assert.assertEquals; import java.util.Map; import java.util.Map.Entry; +import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceResult; import util.test.PerformanceTest; -import util.test.AbstractProfiledPerformanceAssertion; public class ChwWorkComparisonMethodcallWithPorts extends AbstractProfiledPerformanceAssertion { @@ -27,8 +27,6 @@ public class ChwWorkComparisonMethodcallWithPorts extends AbstractProfiledPerfor .get("testWithManyObjects(teetime.examples.experiment01.MethodCallThoughputTimestampAnalysis1Test)"); PerformanceResult test15 = performanceResults .get("testWithManyObjects(teetime.examples.experiment15.MethodCallThoughputTimestampAnalysis15Test)"); - PerformanceResult test17 = performanceResults - .get("testWithManyObjects(teetime.examples.experiment17.MethodCallThoughputTimestampAnalysis17Test)"); PerformanceResult test19a = performanceResults .get("testWithManyObjectsAnd1Thread(teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test)"); PerformanceResult test19b = performanceResults @@ -37,10 +35,8 @@ public class ChwWorkComparisonMethodcallWithPorts extends AbstractProfiledPerfor .get("testWithManyObjectsAnd4Threads(teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test)"); double value15 = (double) test15.quantiles.get(0.5) / test1.quantiles.get(0.5); - double value17 = (double) test17.quantiles.get(0.5) / test1.quantiles.get(0.5); System.out.println("value15: " + value15); - System.out.println("value17: " + value17); // until 25.06.2014 (incl.) // assertEquals(44, (double) test15.quantiles.get(0.5) / test1.quantiles.get(0.5), 4.1); @@ -60,7 +56,7 @@ public class ChwWorkComparisonMethodcallWithPorts extends AbstractProfiledPerfor // since 14.10.2014 (incl.) assertEquals(36, value15, 4.1); // -8 - assertEquals(46, value17, 4.1); // -7 + // assertEquals(46, value17, 4.1); // -7 // below results vary too much, possibly due to the OS' scheduler // assertEquals(RESULT_TESTS_16, (double) test16a.quantiles.get(0.5) / test1.quantiles.get(0.5), 5.1); diff --git a/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java b/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java index 3d45a40e9e8e9e2c99223cccb775ec4bdd294ef3..bdfea3bef4b890ccd1da937b093dfa943afafafa 100644 --- a/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java +++ b/src/performancetest/java/teetime/examples/ComparisonMethodcallWithPorts.java @@ -13,7 +13,6 @@ import teetime.examples.experiment11.MethodCallThoughputTimestampAnalysis11Test; import teetime.examples.experiment14.MethodCallThoughputTimestampAnalysis14Test; import teetime.examples.experiment15.MethodCallThoughputTimestampAnalysis15Test; import teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test; -import teetime.examples.experiment17.MethodCallThoughputTimestampAnalysis17Test; import teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test; import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceCheckProfileRepository; @@ -27,7 +26,6 @@ import util.test.PerformanceCheckProfileRepository; MethodCallThoughputTimestampAnalysis14Test.class, MethodCallThoughputTimestampAnalysis15Test.class, MethodCallThoughputTimestampAnalysis16Test.class, - MethodCallThoughputTimestampAnalysis17Test.class, MethodCallThoughputTimestampAnalysis19Test.class, }) public class ComparisonMethodcallWithPorts { diff --git a/src/performancetest/java/teetime/examples/NieWorkComparisonMethodcallWithPorts.java b/src/performancetest/java/teetime/examples/NieWorkComparisonMethodcallWithPorts.java index fc57df9bcfa6b9123e695b2b8ae3cfe990049d27..48589829e47e278ea578c5a736680f8ee98b8835 100644 --- a/src/performancetest/java/teetime/examples/NieWorkComparisonMethodcallWithPorts.java +++ b/src/performancetest/java/teetime/examples/NieWorkComparisonMethodcallWithPorts.java @@ -5,9 +5,9 @@ import static org.junit.Assert.assertEquals; import java.util.Map; import java.util.Map.Entry; +import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceResult; import util.test.PerformanceTest; -import util.test.AbstractProfiledPerformanceAssertion; public class NieWorkComparisonMethodcallWithPorts extends AbstractProfiledPerformanceAssertion { @@ -41,8 +41,6 @@ public class NieWorkComparisonMethodcallWithPorts extends AbstractProfiledPerfor .get("testWithManyObjectsAnd2Threads(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); PerformanceResult test16c = performanceResults .get("testWithManyObjectsAnd4Threads(teetime.examples.experiment16.MethodCallThoughputTimestampAnalysis16Test)"); - PerformanceResult test17 = performanceResults - .get("testWithManyObjects(teetime.examples.experiment17.MethodCallThoughputTimestampAnalysis17Test)"); PerformanceResult test19a = performanceResults .get("testWithManyObjectsAnd1Thread(teetime.examples.experiment19.MethodCallThoughputTimestampAnalysis19Test)"); PerformanceResult test19b = performanceResults @@ -65,7 +63,7 @@ public class NieWorkComparisonMethodcallWithPorts extends AbstractProfiledPerfor // assertEquals(RESULT_TESTS_19, (double) test19b.quantiles.get(0.5) / test1.quantiles.get(0.5), 5.1); // assertEquals(RESULT_TESTS_19, (double) test19c.quantiles.get(0.5) / test1.quantiles.get(0.5), 5.1); - assertEquals(56, (double) test17.quantiles.get(0.5) / test1.quantiles.get(0.5), 4.1); + // assertEquals(56, (double) test17.quantiles.get(0.5) / test1.quantiles.get(0.5), 4.1); // check speedup assertEquals(2, (double) test16a.overallDurationInNs / test16b.overallDurationInNs, 0.2); diff --git a/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java index ba4eda095004196eb3167016292893ac4af4b888..5359eddcfda3a73fafe83f18b61089b770152033 100644 --- a/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment09/ChwWorkPerformanceCheck.java @@ -26,6 +26,8 @@ class ChwWorkPerformanceCheck extends AbstractPerformanceCheck { // since 27.08.2014 (incl.) // assertEquals(77, value9, 2.1); // +35 // since 14.10.2014 (incl.) - assertEquals(67, medianSpeedup, 3.1); // -10 + // assertEquals(67, medianSpeedup, 3.1); // -10 + // since 19.12.2014 (incl.) + assertEquals(53, medianSpeedup, 3.1); // -14 } } diff --git a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java index e5e38cab4f897c29787c68774b51c24d8f2d5b58..9f96c90e0a321afbe7e69a2b69e81f27d3f50e1b 100644 --- a/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java +++ b/src/performancetest/java/teetime/examples/experiment09/MethodCallThroughputAnalysis9.java @@ -19,7 +19,7 @@ import java.util.List; import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; +import teetime.framework.RunnableProducerStage; import teetime.framework.pipe.CommittablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -44,7 +44,7 @@ public class MethodCallThroughputAnalysis9 { public void init() { Stage pipeline = this.buildPipeline(); - this.runnable = new RunnableStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline); } /** diff --git a/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java b/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java index 6fac51591452feca40044f9f7ca6d77b5023b8b7..70b476ffc2cea8ae1e15a3b66456ffd40dee650c 100644 --- a/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java +++ b/src/performancetest/java/teetime/examples/experiment10/MethodCallThroughputAnalysis10.java @@ -18,7 +18,7 @@ package teetime.examples.experiment10; import java.util.List; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; +import teetime.framework.RunnableProducerStage; import teetime.framework.pipe.SingleElementPipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -73,7 +73,7 @@ public class MethodCallThroughputAnalysis10 { SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); - return new RunnableStage(pipeline); + return new RunnableProducerStage(pipeline); } public void start() { diff --git a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java index ac70216dab166fb436ea93937ff58ba5f9a035f7..22618b7f33853f89f9e6db544bd58c24168a2c6c 100644 --- a/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java +++ b/src/performancetest/java/teetime/examples/experiment11/MethodCallThroughputAnalysis11.java @@ -19,7 +19,7 @@ import java.util.List; import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; +import teetime.framework.RunnableProducerStage; import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -44,7 +44,7 @@ public class MethodCallThroughputAnalysis11 { public void init() { Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); - this.runnable = new RunnableStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline); } private OldHeadPipeline<ObjectProducer<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final long numInputObjects, diff --git a/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java index b8ce7c10b4e4348406cb7eef983b8fa45b4c76ce..b2d4deed66518910d7475f1dea702e425c491dbd 100644 --- a/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment14/ChwWorkPerformanceCheck.java @@ -26,6 +26,8 @@ class ChwWorkPerformanceCheck extends AbstractPerformanceCheck { // since 27.08.2014 (incl.) // assertEquals(102, medianSpeedup, 5.1); // +16 // since 14.10.2014 (incl.) - assertEquals(81, medianSpeedup, 5.1); // -21 + // assertEquals(81, medianSpeedup, 5.1); // -21 + // since 19.12.2014 (incl.) + assertEquals(56, medianSpeedup, 5.1); // -25 } } diff --git a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java index 658ce7a02980b7127307373fc08df924317b01fe..2f871761f6fbe15881927fdf22359e3ef25fc0a6 100644 --- a/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java +++ b/src/performancetest/java/teetime/examples/experiment14/MethodCallThroughputAnalysis14.java @@ -19,7 +19,7 @@ import java.util.List; import teetime.framework.Stage; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; +import teetime.framework.RunnableProducerStage; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; @@ -48,7 +48,7 @@ public class MethodCallThroughputAnalysis14 { public void init() { Stage pipeline = this.buildPipeline(); - this.runnable = new RunnableStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline); } /** diff --git a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java index f6c65eed58ff0f2c0910271db603c0b0be886636..fcde43930f195403b1e9f759733e706de6857dd4 100644 --- a/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java +++ b/src/performancetest/java/teetime/examples/experiment15/MethodCallThroughputAnalysis15.java @@ -17,11 +17,14 @@ package teetime.examples.experiment15; import java.util.List; -import teetime.framework.Stage; +import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; +import teetime.framework.RunnableProducerStage; +import teetime.framework.Stage; +import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.OrderedGrowableArrayPipe; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.Clock; import teetime.stage.CollectorSink; @@ -39,11 +42,13 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis15 { +public class MethodCallThroughputAnalysis15 extends AnalysisConfiguration { // FIXME this analysis sometimes runs infinitely private static final int SPSC_INITIAL_CAPACITY = 4; + private final IPipeFactory intraThreadPipeFactory; + private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; private int numNoopFilters; @@ -53,13 +58,16 @@ public class MethodCallThroughputAnalysis15 { private Runnable runnable; private Clock clock; - public void init() { + public MethodCallThroughputAnalysis15() { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + } + public void init() { OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); - this.clockRunnable = new RunnableStage(clockPipeline); + this.clockRunnable = new RunnableProducerStage(clockPipeline); Stage pipeline = this.buildPipeline(this.clock); - this.runnable = new RunnableStage(pipeline); + this.runnable = new RunnableProducerStage(pipeline); } private OldHeadPipeline<Clock, Sink<Long>> buildClockPipeline() { @@ -99,15 +107,15 @@ public class MethodCallThroughputAnalysis15 { SpScPipe.connect(clock.getOutputPort(), delay.getTimestampTriggerInputPort(), SPSC_INITIAL_CAPACITY); - SingleElementPipe.connect(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); - SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + intraThreadPipeFactory.create(objectProducer.getOutputPort(), startTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), delay.getInputPort()); - SingleElementPipe.connect(delay.getOutputPort(), collectorSink.getInputPort()); + intraThreadPipeFactory.create(delay.getOutputPort(), collectorSink.getInputPort()); return pipeline; } diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java similarity index 74% rename from src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java rename to src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java index b48c2378c5a20ad410dd6975e8737f8f7d3697d5..f341bc27065a689a7daba8870cfd0e9a23888422 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThroughputAnalysis16.java +++ b/src/performancetest/java/teetime/examples/experiment16/AnalysisConfiguration16.java @@ -19,9 +19,11 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; -import teetime.framework.pipe.SingleElementPipe; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.SpScPipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; @@ -30,6 +32,7 @@ import teetime.stage.Relay; import teetime.stage.StartTimestampFilter; import teetime.stage.StopTimestampFilter; import teetime.stage.basic.distributor.Distributor; +import teetime.stage.io.EveryXthPrinter; import teetime.util.ConstructorClosure; import teetime.util.TimestampObject; @@ -38,37 +41,40 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis16 { +class AnalysisConfiguration16 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); + private final IPipeFactory intraThreadPipeFactory; + private int numInputObjects; private ConstructorClosure<TimestampObject> inputObjectCreator; - private int numNoopFilters; + private final int numNoopFilters; private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Thread producerThread; - - private Thread[] workerThreads; - private int numWorkerThreads; - public void init() { + public AnalysisConfiguration16(final int numWorkerThreads, final int numNoopFilters) { + this.numWorkerThreads = numWorkerThreads; + this.numNoopFilters = numNoopFilters; + this.intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + } + + public void build() { OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); - this.producerThread = new Thread(new RunnableStage(producerPipeline)); + addThreadableStage(producerPipeline); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; - for (int i = 0; i < this.workerThreads.length; i++) { + for (int i = 0; i < numWorkerThreads; i++) { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> workerPipeline = this.buildPipeline(producerPipeline, resultList); - this.workerThreads[i] = new Thread(new RunnableStage(workerPipeline)); + addThreadableStage(workerPipeline); } } @@ -81,7 +87,7 @@ public class MethodCallThroughputAnalysis16 { pipeline.setFirstStage(objectProducer); pipeline.setLastStage(distributor); - SingleElementPipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); + intraThreadPipeFactory.create(objectProducer.getOutputPort(), distributor.getInputPort()); return pipeline; } @@ -102,6 +108,7 @@ public class MethodCallThroughputAnalysis16 { noopFilters[i] = new NoopFilter<TimestampObject>(); } final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); @@ -110,43 +117,19 @@ public class MethodCallThroughputAnalysis16 { SpScPipe.connect(previousStage.getLastStage().getNewOutputPort(), relay.getInputPort(), SPSC_INITIAL_CAPACITY); - SingleElementPipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), startTimestampFilter.getInputPort()); - SingleElementPipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); + intraThreadPipeFactory.create(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); for (int i = 0; i < noopFilters.length - 1; i++) { - SingleElementPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); + intraThreadPipeFactory.create(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } - SingleElementPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - SingleElementPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + intraThreadPipeFactory.create(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); + intraThreadPipeFactory.create(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); + intraThreadPipeFactory.create(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); return pipeline; } - public void start() { - - this.producerThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.producerThread.join(); - } catch (InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - - try { - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; @@ -156,10 +139,6 @@ public class MethodCallThroughputAnalysis16 { return this.numNoopFilters; } - public void setNumNoopFilters(final int numNoopFilters) { - this.numNoopFilters = numNoopFilters; - } - public List<List<TimestampObject>> getTimestampObjectsList() { return this.timestampObjectsList; } @@ -168,8 +147,4 @@ public class MethodCallThroughputAnalysis16 { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - } diff --git a/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java index c95c13d1dc5dd434d9d22c24b62c88e64417566d..797f2fef78d5ea8a83fc4174979731dbed008ce0 100644 --- a/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java @@ -1,9 +1,9 @@ package teetime.examples.experiment16; import static org.junit.Assert.assertEquals; +import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceResult; import util.test.PerformanceTest; -import util.test.AbstractProfiledPerformanceAssertion; class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion { @@ -23,7 +23,9 @@ class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion { System.out.println("speedupC: " + speedupC); assertEquals(2, speedupB, 0.3); - assertEquals(2.5, speedupC, 0.3); + // assertEquals(2.5, speedupC, 0.3); + // since 19.12.2014 + assertEquals(2.0, speedupC, 0.3); } @Override diff --git a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java index f6b61ff996bb0c27eeac2da4683ba3813b3cd586..73cdeb5275bef1e9ff98bb5043c25793e38e113d 100644 --- a/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java +++ b/src/performancetest/java/teetime/examples/experiment16/MethodCallThoughputTimestampAnalysis16Test.java @@ -21,6 +21,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import teetime.framework.Analysis; import teetime.util.ConstructorClosure; import teetime.util.ListUtil; import teetime.util.TimestampObject; @@ -81,15 +82,16 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + NUM_NOOP_FILTERS + "..."); - final MethodCallThroughputAnalysis16 analysis = new MethodCallThroughputAnalysis16(); - analysis.setNumWorkerThreads(numThreads); - analysis.setNumNoopFilters(NUM_NOOP_FILTERS); - analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { + final AnalysisConfiguration16 configuration = new AnalysisConfiguration16(numThreads, NUM_NOOP_FILTERS); + configuration.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { @Override public TimestampObject create() { return new TimestampObject(); } }); + configuration.build(); + + final Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -99,7 +101,7 @@ public class MethodCallThoughputTimestampAnalysis16Test extends PerformanceTest this.stopWatch.end(); } - this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); + this.timestampObjects = ListUtil.merge(configuration.getTimestampObjectsList()); } } diff --git a/src/performancetest/java/teetime/examples/experiment17/MethodCallThoughputTimestampAnalysis17Test.java b/src/performancetest/java/teetime/examples/experiment17/MethodCallThoughputTimestampAnalysis17Test.java deleted file mode 100644 index 650d11706482de244efd7db554a15041c0731391..0000000000000000000000000000000000000000 --- a/src/performancetest/java/teetime/examples/experiment17/MethodCallThoughputTimestampAnalysis17Test.java +++ /dev/null @@ -1,60 +0,0 @@ -/*************************************************************************** - * Copyright 2014 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ -package teetime.examples.experiment17; - -import org.junit.Test; - -import teetime.util.ConstructorClosure; -import teetime.util.ListUtil; -import teetime.util.TimestampObject; -import util.test.PerformanceTest; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public class MethodCallThoughputTimestampAnalysis17Test extends PerformanceTest { - - @Test - public void testWithManyObjects() { - System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" - + NUM_NOOP_FILTERS + "..."); - - // int count = 10; - // while (count-- > 0) { - final MethodCallThroughputAnalysis17 analysis = new MethodCallThroughputAnalysis17(); - analysis.setNumNoopFilters(NUM_NOOP_FILTERS); - analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { - @Override - public TimestampObject create() { - return new TimestampObject(); - } - }); - analysis.init(); - - System.out.println("starting"); - this.stopWatch.start(); - try { - analysis.start(); - } finally { - this.stopWatch.end(); - } - - this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); - // } - } -} diff --git a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java b/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java deleted file mode 100644 index 60baf45d3935d88312bbc7ba788e351e9ba5981e..0000000000000000000000000000000000000000 --- a/src/performancetest/java/teetime/examples/experiment17/MethodCallThroughputAnalysis17.java +++ /dev/null @@ -1,202 +0,0 @@ -/*************************************************************************** - * Copyright 2014 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ -package teetime.examples.experiment17; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; -import teetime.framework.Stage; -import teetime.framework.pipe.DummyPipe; -import teetime.framework.pipe.IPipe; -import teetime.framework.pipe.PipeFactoryRegistry; -import teetime.framework.pipe.RelayTestPipe; -import teetime.framework.pipe.UnorderedGrowablePipe; -import teetime.framework.signal.TerminatingSignal; -import teetime.stage.CollectorSink; -import teetime.stage.NoopFilter; -import teetime.stage.ObjectProducer; -import teetime.stage.Relay; -import teetime.stage.StartTimestampFilter; -import teetime.stage.StopTimestampFilter; -import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.EveryXthPrinter; -import teetime.util.ConstructorClosure; -import teetime.util.TimestampObject; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public class MethodCallThroughputAnalysis17 { - - private static final int SPSC_INITIAL_CAPACITY = 100100; - private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); - - private int numInputObjects; - private ConstructorClosure<TimestampObject> inputObjectCreator; - private int numNoopFilters; - - private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE; - private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - - private Thread producerThread; - private Thread[] workerThreads; - - public void init() { - OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, - this.inputObjectCreator); - this.producerThread = new Thread(new RunnableStage(producerPipeline)); - - int numWorkerThreads = Math.min(NUM_WORKER_THREADS, 1); // only for testing purpose - - this.workerThreads = new Thread[numWorkerThreads]; - for (int i = 0; i < this.workerThreads.length; i++) { - List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); - this.timestampObjectsList.add(resultList); - - OldHeadPipeline<?, ?> pipeline = this.buildPipeline(null, resultList); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); - } - - // this.producerThread = new Thread(new Runnable() { - // @Override - // public void run() { - // TimestampObject ts; - // try { - // ts = MethodCallThroughputAnalysis17.this.inputObjectCreator.call(); - // System.out.println("test" + producerPipeline + ", # filters: " + MethodCallThroughputAnalysis17.this.numNoopFilters + ", ts: " - // + ts); - // MethodCallThroughputAnalysis17.this.numInputObjects++; - // System.out.println("numInputObjects: " + MethodCallThroughputAnalysis17.this.numInputObjects); - // MethodCallThroughputAnalysis17.this.numInputObjects--; - // } catch (Exception e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } - // System.out.println("run end"); - // } - // }); - - // this.producerThread.start(); - // this.producerThread.run(); - new RunnableStage(producerPipeline).run(); - - // try { - // this.producerThread.join(); - // } catch (InterruptedException e1) { - // // TODO Auto-generated catch block - // e1.printStackTrace(); - // } - - } - - @SuppressWarnings("unchecked") - private OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, - final ConstructorClosure<TimestampObject> inputObjectCreator) { - final ObjectProducer<TimestampObject> objectProducer = new ObjectProducer<TimestampObject>(numInputObjects, inputObjectCreator); - Distributor<TimestampObject> distributor = new Distributor<TimestampObject>(); - - // UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), sink.getInputPort()); - // objectProducer.getOutputPort().pipe = new UnorderedGrowablePipe<TimestampObject>(); - - UnorderedGrowablePipe.connect(objectProducer.getOutputPort(), distributor.getInputPort()); - distributor.getNewOutputPort().setPipe(new DummyPipe()); - - final OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> pipeline = new OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>>(); - pipeline.setFirstStage(objectProducer); - // pipeline.setFirstStage(sink); - // pipeline.setFirstStage(endStage); - pipeline.setLastStage(distributor); - // pipeline.setLastStage(sink); - // pipeline.setLastStage(new EndStage<TimestampObject>()); - return pipeline; - } - - /** - * @param numNoopFilters - * @since 1.10 - */ - private OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> buildPipeline(final Stage previousStage, - final List<TimestampObject> timestampObjects) { - // create stages - Relay<TimestampObject> relay = new Relay<TimestampObject>(); - final StartTimestampFilter startTimestampFilter = new StartTimestampFilter(); - @SuppressWarnings("unchecked") - final NoopFilter<TimestampObject>[] noopFilters = new NoopFilter[this.numNoopFilters]; - for (int i = 0; i < noopFilters.length; i++) { - noopFilters[i] = new NoopFilter<TimestampObject>(); - } - final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); - EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000); - final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); - - IPipe startPipe = new RelayTestPipe<TimestampObject>(this.numInputObjects, this.inputObjectCreator); - startPipe.sendSignal(new TerminatingSignal()); - - relay.getInputPort().setPipe(startPipe); - UnorderedGrowablePipe.connect(relay.getOutputPort(), startTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(startTimestampFilter.getOutputPort(), noopFilters[0].getInputPort()); - for (int i = 0; i < noopFilters.length - 1; i++) { - UnorderedGrowablePipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); - } - UnorderedGrowablePipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - UnorderedGrowablePipe.connect(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); - UnorderedGrowablePipe.connect(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); - - final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); - pipeline.setFirstStage(relay); - pipeline.setLastStage(collectorSink); - return pipeline; - } - - public void start() { - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { - this.numInputObjects = numInputObjects; - this.inputObjectCreator = inputObjectCreator; - } - - public int getNumNoopFilters() { - return this.numNoopFilters; - } - - public void setNumNoopFilters(final int numNoopFilters) { - this.numNoopFilters = numNoopFilters; - } - - public List<List<TimestampObject>> getTimestampObjectsList() { - return this.timestampObjectsList; - } - -} diff --git a/src/performancetest/java/teetime/examples/experiment19/MethodCallThoughputTimestampAnalysis19Test.java b/src/performancetest/java/teetime/examples/experiment19/MethodCallThoughputTimestampAnalysis19Test.java index 59d13f1d98a19fde419451f63662eabdc5fa4b0d..89565bdd44d3ca4a0b542b2575a77dbed70e3176 100644 --- a/src/performancetest/java/teetime/examples/experiment19/MethodCallThoughputTimestampAnalysis19Test.java +++ b/src/performancetest/java/teetime/examples/experiment19/MethodCallThoughputTimestampAnalysis19Test.java @@ -19,6 +19,7 @@ import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; +import teetime.framework.Analysis; import teetime.util.ConstructorClosure; import teetime.util.ListUtil; import teetime.util.TimestampObject; @@ -66,15 +67,16 @@ public class MethodCallThoughputTimestampAnalysis19Test extends PerformanceTest System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS=" + NUM_NOOP_FILTERS + "..."); - final MethodCallThroughputAnalysis19 analysis = new MethodCallThroughputAnalysis19(); - analysis.setNumWorkerThreads(numThreads); - analysis.setNumNoopFilters(NUM_NOOP_FILTERS); - analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { + final MethodCallThroughputAnalysis19 configuration = new MethodCallThroughputAnalysis19(numThreads, NUM_NOOP_FILTERS); + configuration.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { @Override public TimestampObject create() { return new TimestampObject(); } }); + configuration.build(); + + final Analysis analysis = new Analysis(configuration); analysis.init(); this.stopWatch.start(); @@ -84,7 +86,7 @@ public class MethodCallThoughputTimestampAnalysis19Test extends PerformanceTest this.stopWatch.end(); } - this.timestampObjects = ListUtil.merge(analysis.getTimestampObjectsList()); + this.timestampObjects = ListUtil.merge(configuration.getTimestampObjectsList()); } } diff --git a/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java b/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java index f38312f5d2c1e3a69a68eb6325057db434fea19a..a8fe5578e8ba755eaafb5bd46b6f9ae0bc65a8d9 100644 --- a/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java +++ b/src/performancetest/java/teetime/examples/experiment19/MethodCallThroughputAnalysis19.java @@ -19,8 +19,8 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import teetime.framework.AnalysisConfiguration; import teetime.framework.OldHeadPipeline; -import teetime.framework.RunnableStage; import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.SpScPipe; import teetime.stage.CollectorSink; @@ -30,6 +30,7 @@ import teetime.stage.Relay; import teetime.stage.StartTimestampFilter; import teetime.stage.StopTimestampFilter; import teetime.stage.basic.distributor.Distributor; +import teetime.stage.io.EveryXthPrinter; import teetime.util.ConstructorClosure; import teetime.util.TimestampObject; @@ -38,39 +39,37 @@ import teetime.util.TimestampObject; * * @since 1.10 */ -public class MethodCallThroughputAnalysis19 { +public class MethodCallThroughputAnalysis19 extends AnalysisConfiguration { private static final int SPSC_INITIAL_CAPACITY = 100100; private static final int NUM_WORKER_THREADS = Runtime.getRuntime().availableProcessors(); - private int numInputObjects; - private ConstructorClosure<TimestampObject> inputObjectCreator; - private int numNoopFilters; - private final List<List<TimestampObject>> timestampObjectsList = new LinkedList<List<TimestampObject>>(); - private Thread producerThread; - - private Thread[] workerThreads; - + private int numInputObjects; + private ConstructorClosure<TimestampObject> inputObjectCreator; + private final int numNoopFilters; private int numWorkerThreads; - public void init() { + public MethodCallThroughputAnalysis19(final int numWorkerThreads, final int numNoopFilters) { + this.numWorkerThreads = numWorkerThreads; + this.numNoopFilters = numNoopFilters; + } + + public void build() { OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); - this.producerThread = new Thread(new RunnableStage(producerPipeline)); + addThreadableStage(producerPipeline); this.numWorkerThreads = Math.min(NUM_WORKER_THREADS, this.numWorkerThreads); - this.workerThreads = new Thread[this.numWorkerThreads]; - for (int i = 0; i < this.workerThreads.length; i++) { + for (int i = 0; i < numWorkerThreads; i++) { List<TimestampObject> resultList = new ArrayList<TimestampObject>(this.numInputObjects); this.timestampObjectsList.add(resultList); OldHeadPipeline<?, ?> pipeline = this.buildPipeline(producerPipeline.getLastStage(), resultList); - this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + addThreadableStage(pipeline); } - } private OldHeadPipeline<ObjectProducer<TimestampObject>, Distributor<TimestampObject>> buildProducerPipeline(final int numInputObjects, @@ -97,6 +96,7 @@ public class MethodCallThroughputAnalysis19 { noopFilters[i] = new NoopFilter<TimestampObject>(); } final StopTimestampFilter stopTimestampFilter = new StopTimestampFilter(); + EveryXthPrinter<TimestampObject> everyXthPrinter = new EveryXthPrinter<TimestampObject>(100000); final CollectorSink<TimestampObject> collectorSink = new CollectorSink<TimestampObject>(timestampObjects); final OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>> pipeline = new OldHeadPipeline<Relay<TimestampObject>, CollectorSink<TimestampObject>>(); @@ -112,36 +112,12 @@ public class MethodCallThroughputAnalysis19 { OrderedGrowableArrayPipe.connect(noopFilters[i].getOutputPort(), noopFilters[i + 1].getInputPort()); } OrderedGrowableArrayPipe.connect(noopFilters[noopFilters.length - 1].getOutputPort(), stopTimestampFilter.getInputPort()); - OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), collectorSink.getInputPort()); + OrderedGrowableArrayPipe.connect(stopTimestampFilter.getOutputPort(), everyXthPrinter.getInputPort()); + OrderedGrowableArrayPipe.connect(everyXthPrinter.getNewOutputPort(), collectorSink.getInputPort()); return pipeline; } - public void start() { - - this.producerThread.start(); - - for (Thread workerThread : this.workerThreads) { - workerThread.start(); - } - - try { - this.producerThread.join(); - } catch (InterruptedException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); - } - - try { - for (Thread workerThread : this.workerThreads) { - workerThread.join(); - } - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { this.numInputObjects = numInputObjects; this.inputObjectCreator = inputObjectCreator; @@ -151,10 +127,6 @@ public class MethodCallThroughputAnalysis19 { return this.numNoopFilters; } - public void setNumNoopFilters(final int numNoopFilters) { - this.numNoopFilters = numNoopFilters; - } - public List<List<TimestampObject>> getTimestampObjectsList() { return this.timestampObjectsList; } @@ -163,8 +135,4 @@ public class MethodCallThroughputAnalysis19 { return this.numWorkerThreads; } - public void setNumWorkerThreads(final int numWorkerThreads) { - this.numWorkerThreads = numWorkerThreads; - } - } diff --git a/src/performancetest/java/teetime/framework/OldHeadPipeline.java b/src/performancetest/java/teetime/framework/OldHeadPipeline.java index eb61c5d100a01a68939eaddd377b2d6f39f9c679..46e009995d7d9a44e74283f9677aa1d50e48c7fc 100644 --- a/src/performancetest/java/teetime/framework/OldHeadPipeline.java +++ b/src/performancetest/java/teetime/framework/OldHeadPipeline.java @@ -7,13 +7,4 @@ public final class OldHeadPipeline<FirstStage extends Stage, LastStage extends S public OldHeadPipeline(final String name) {} - @Override - public boolean shouldBeTerminated() { - return this.firstStage.shouldBeTerminated(); - } - - @Override - public void terminate() { - this.firstStage.terminate(); - } } diff --git a/src/performancetest/java/teetime/framework/OldPipeline.java b/src/performancetest/java/teetime/framework/OldPipeline.java index 56c8fffd59bc22e7b2a9efec73b9c83095ed502b..3ee0a72a15b0adf7dae5839e77602e16c6792189 100644 --- a/src/performancetest/java/teetime/framework/OldPipeline.java +++ b/src/performancetest/java/teetime/framework/OldPipeline.java @@ -42,11 +42,6 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte this.lastStage.validateOutputPorts(invalidPortConnections); } - @Override - public TerminationStrategy getTerminationStrategy() { - return firstStage.getTerminationStrategy(); - } - @Override public void terminate() { firstStage.terminate(); @@ -57,4 +52,29 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> exte return firstStage.shouldBeTerminated(); } + @Override + protected InputPort<?>[] getInputPorts() { + return firstStage.getInputPorts(); + } + + @Override + public void setOwningThread(final Thread owningThread) { + firstStage.setOwningThread(owningThread); + } + + @Override + public Thread getOwningThread() { + return firstStage.getOwningThread(); + } + + @Override + public TerminationStrategy getTerminationStrategy() { + return firstStage.getTerminationStrategy(); + } + + @Override + protected boolean isStarted() { + return firstStage.isStarted(); + } + } diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTest.java b/src/test/java/teetime/framework/RunnableConsumerStageTest.java new file mode 100644 index 0000000000000000000000000000000000000000..02335317b1ea182bebb49e4bd00697cf2ba5082c --- /dev/null +++ b/src/test/java/teetime/framework/RunnableConsumerStageTest.java @@ -0,0 +1,79 @@ +package teetime.framework; + +import static org.junit.Assert.assertEquals; + +import java.lang.Thread.State; +import java.util.Collection; + +import org.junit.Test; + +import teetime.util.Pair; + +import com.google.common.base.Joiner; + +public class RunnableConsumerStageTest { + + @Test + public void testWaitingInfinitely() throws Exception { + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); // FIXME react on exceptions + } + }); + thread.start(); + + Thread.sleep(200); + + assertEquals(State.WAITING, thread.getState()); + assertEquals(0, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + @Test + public void testWaitingFinitely() throws Exception { + WaitStrategyConfiguration waitStrategyConfiguration = new WaitStrategyConfiguration(300, 42); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + start(analysis); // FIXME react on exceptions + } + }); + thread.start(); + + Thread.sleep(400); + + assertEquals(State.TERMINATED, thread.getState()); + assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + @Test + public void testYieldRun() throws Exception { + YieldStrategyConfiguration waitStrategyConfiguration = new YieldStrategyConfiguration(42); + + final Analysis analysis = new Analysis(waitStrategyConfiguration); + analysis.init(); + + start(analysis); + + assertEquals(42, waitStrategyConfiguration.getCollectorSink().getElements().get(0)); + assertEquals(1, waitStrategyConfiguration.getCollectorSink().getElements().size()); + } + + private void start(final Analysis analysis) { + Collection<Pair<Thread, Throwable>> exceptions = analysis.start(); + for (Pair<Thread, Throwable> pair : exceptions) { + System.err.println(pair.getSecond()); + System.err.println(Joiner.on("\n").join(pair.getSecond().getStackTrace())); + throw new RuntimeException(pair.getSecond()); + } + assertEquals(0, exceptions.size()); + } +} diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..62f9ebff91d56ceb71732836e3649e8e4613a2b0 --- /dev/null +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -0,0 +1,70 @@ +package teetime.framework; + +import teetime.framework.idle.WaitStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.Clock; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.stage.Relay; +import teetime.stage.basic.Delay; + +class WaitStrategyConfiguration extends AnalysisConfiguration { + + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + private Delay<Object> delay; + private CollectorSink<Object> collectorSink; + + public WaitStrategyConfiguration(final long initialDelayInMs, final Object... elements) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + + Stage producer = buildProducer(elements); + addThreadableStage(producer); + + Stage consumer = buildConsumer(delay); + addThreadableStage(consumer); + + Clock clock = buildClock(initialDelayInMs, delay); + addThreadableStage(clock); + } + + private Clock buildClock(final long initialDelayInMs, final Delay<Object> delay) { + Clock clock = new Clock(); + clock.setInitialDelayInMs(initialDelayInMs); + + interThreadPipeFactory.create(clock.getOutputPort(), delay.getTimestampTriggerInputPort()); + + return clock; + } + + private Stage buildProducer(final Object... elements) { + InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); + delay = new Delay<Object>(); + + intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), delay.getInputPort()); + + return initialElementProducer; + } + + private Relay<Object> buildConsumer(final Delay<Object> delay) { + Relay<Object> relay = new Relay<Object>(); + CollectorSink<Object> collectorSink = new CollectorSink<Object>(); + + relay.setIdleStrategy(new WaitStrategy(relay)); + + interThreadPipeFactory.create(delay.getOutputPort(), relay.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + + return relay; + } + + public CollectorSink<Object> getCollectorSink() { + return collectorSink; + } +} diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..57212fd9de8a48e7360ae62e1caa734757e2b178 --- /dev/null +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -0,0 +1,51 @@ +package teetime.framework; + +import teetime.framework.idle.YieldStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.stage.CollectorSink; +import teetime.stage.InitialElementProducer; +import teetime.stage.Relay; + +class YieldStrategyConfiguration extends AnalysisConfiguration { + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + + private CollectorSink<Object> collectorSink; + + public YieldStrategyConfiguration(final Object... elements) { + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + + InitialElementProducer<Object> producer = buildProducer(elements); + addThreadableStage(producer); + + Stage consumer = buildConsumer(producer); + addThreadableStage(consumer); + } + + private InitialElementProducer<Object> buildProducer(final Object... elements) { + InitialElementProducer<Object> initialElementProducer = new InitialElementProducer<Object>(elements); + + return initialElementProducer; + } + + private Relay<Object> buildConsumer(final InitialElementProducer<Object> producer) { + Relay<Object> relay = new Relay<Object>(); + CollectorSink<Object> collectorSink = new CollectorSink<Object>(); + + relay.setIdleStrategy(new YieldStrategy()); + + interThreadPipeFactory.create(producer.getOutputPort(), relay.getInputPort()); + intraThreadPipeFactory.create(relay.getOutputPort(), collectorSink.getInputPort()); + + this.collectorSink = collectorSink; + + return relay; + } + + public CollectorSink<Object> getCollectorSink() { + return collectorSink; + } +} diff --git a/src/test/java/teetime/framework/pipe/SpScPipeTest.java b/src/test/java/teetime/framework/pipe/SpScPipeTest.java index 20d366d6aea0079c67138c645e373f0f5a4f71e2..78313ed5fe4a5e289e15b81cd7bb70ac7fec59a3 100644 --- a/src/test/java/teetime/framework/pipe/SpScPipeTest.java +++ b/src/test/java/teetime/framework/pipe/SpScPipeTest.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import teetime.framework.AbstractInterThreadPipe; @@ -16,9 +17,11 @@ import teetime.framework.signal.ValidatingSignal; public class SpScPipeTest { + @Ignore + // ignore as long as this test passes null ports to SpScPipe @Test public void testSignalOrdering() throws Exception { - OutputPort<? extends Object> sourcePort = null; + OutputPort<Object> sourcePort = null; InputPort<Object> targetPort = null; AbstractInterThreadPipe pipe = new SpScPipe(sourcePort, targetPort, 1); // IPipe does not provide getSignal method diff --git a/src/test/resources/logback.xml b/src/test/resources/logback-test.xml similarity index 83% rename from src/test/resources/logback.xml rename to src/test/resources/logback-test.xml index 0c6041095f144317b7397cdd45a4bd82c385a8d1..5f80ba17fdeb66f9b262a429f33e45cad15b1e69 100644 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback-test.xml @@ -20,10 +20,12 @@ </encoder> </appender> - <logger name="teetime.stage" level="INFO" /> + <logger name="teetime.framework" level="TRACE" /> + <logger name="teetime.stage" level="TRACE" /> + <logger name="teetime" level="INFO" /> <logger name="util" level="INFO" /> - <root level="ERROR"> + <root level="WARN"> <appender-ref ref="CONSOLE" /> </root> </configuration> \ No newline at end of file