diff --git a/conf/quality-config/pmd-ruleset.xml b/conf/quality-config/pmd-ruleset.xml index f7840493f84163bb1799135251c730596b9ab7e7..012fa89c12927d759d8fe0730cdeabc0ab2b6344 100644 --- a/conf/quality-config/pmd-ruleset.xml +++ b/conf/quality-config/pmd-ruleset.xml @@ -21,12 +21,12 @@ <rule ref="rulesets/java/empty.xml" /> <rule ref="rulesets/java/finalizers.xml" /> <rule ref="rulesets/java/imports.xml" /> -<!-- <rule ref="rulesets/java/j2ee.xml" /> --> -<!-- <rule ref="rulesets/java/javabeans.xml" /> --> + <!-- <rule ref="rulesets/java/j2ee.xml" /> --> + <!-- <rule ref="rulesets/java/javabeans.xml" /> --> <rule ref="rulesets/java/junit.xml" /> <rule ref="rulesets/java/logging-jakarta-commons.xml" /> <rule ref="rulesets/java/logging-java.xml" /> - + <rule ref="rulesets/java/migrating.xml" /> <rule ref="rulesets/java/naming.xml" /> <!-- <rule ref="rulesets/java/optimizations.xml" /> --> @@ -61,14 +61,15 @@ <rule ref="rulesets/java/controversial.xml"> <exclude name="AtLeastOneConstructor" /> + <exclude name="AvoidUsingVolatile" /> </rule> - + <rule ref="rulesets/java/j2ee.xml"> <exclude name="DoNotUseThreads" /> </rule> - + <rule ref="rulesets/java/javabeans.xml"> - <exclude name="BeanMembersShouldSerialize"/> + <exclude name="BeanMembersShouldSerialize" /> </rule> <rule ref="rulesets/java/naming.xml/VariableNamingConventions"> @@ -79,4 +80,5 @@ <exclude name="LocalVariableCouldBeFinal" /> </rule> + </ruleset> \ No newline at end of file diff --git a/run-configurations/teetime-mvn-site.launch b/run-configurations/teetime-mvn-site.launch index 9d851888e2fb95ea1f4c63eec326212bedf6a130..cdc79de27db69688f37bd7a1e9fb352689142809 100644 --- a/run-configurations/teetime-mvn-site.launch +++ b/run-configurations/teetime-mvn-site.launch @@ -3,11 +3,11 @@ <booleanAttribute key="M2_DEBUG_OUTPUT" value="false"/> <stringAttribute key="M2_GOALS" value="site"/> <booleanAttribute key="M2_NON_RECURSIVE" value="false"/> -<booleanAttribute key="M2_OFFLINE" value="false"/> +<booleanAttribute key="M2_OFFLINE" value="true"/> <stringAttribute key="M2_PROFILES" value=""/> <listAttribute key="M2_PROPERTIES"/> <stringAttribute key="M2_RUNTIME" value="EMBEDDED"/> -<booleanAttribute key="M2_SKIP_TESTS" value="false"/> +<booleanAttribute key="M2_SKIP_TESTS" value="true"/> <intAttribute key="M2_THREADS" value="1"/> <booleanAttribute key="M2_UPDATE_SNAPSHOTS" value="false"/> <stringAttribute key="M2_USER_SETTINGS" value=""/> diff --git a/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..01b1bfadfb33c3b6f8600d69e8e9f819fe31e3ad --- /dev/null +++ b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java @@ -0,0 +1,162 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jctools.queues; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; + +public final class ObservableSpScArrayQueue<E> implements Queue<E> { + + private final SpscArrayQueue<E> queue; + + private transient long lastProducerIndex, lastConsumerIndex; + + public ObservableSpScArrayQueue(final int requestedCapacity) { + this.queue = new SpscArrayQueue<E>(requestedCapacity); + } + + public long getNumPushes() { + return queue.lvProducerIndex(); + } + + public long getNumPulls() { + return queue.lvConsumerIndex(); + } + + public long getProducerFrequency() { + final long currentProducerIndex = queue.lvProducerIndex(); + long diff = currentProducerIndex - lastProducerIndex; + lastProducerIndex = currentProducerIndex; + return diff; + } + + public long getConsumerFrequency() { + final long currentConsumerIndex = queue.lvConsumerIndex(); + long diff = currentConsumerIndex - lastConsumerIndex; + lastConsumerIndex = currentConsumerIndex; + return diff; + } + + @Override + public int hashCode() { + return queue.hashCode(); + } + + @Override + public boolean add(final E e) { + return queue.add(e); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public boolean contains(final Object o) { + return queue.contains(o); + } + + @Override + public E remove() { + return queue.remove(); + } + + @Override + public Object[] toArray() { + return queue.toArray(); + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof ObservableSpScArrayQueue) { + return queue.equals(((ObservableSpScArrayQueue<?>) obj).queue); + } + return false; + } + + @Override + public E element() { + return queue.element(); + } + + @Override + public boolean offer(final E e) { + return queue.offer(e); + } + + @Override + public <T> T[] toArray(final T[] a) { + return queue.toArray(a); + } + + @Override + public boolean addAll(final Collection<? extends E> c) { + return queue.addAll(c); + } + + @Override + public E poll() { + return queue.poll(); + } + + @Override + public E peek() { + return queue.peek(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public Iterator<E> iterator() { + return queue.iterator(); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public boolean remove(final Object o) { + return queue.remove(o); + } + + @Override + public boolean containsAll(final Collection<?> c) { + return queue.containsAll(c); + } + + @Override + public boolean removeAll(final Collection<?> c) { + return queue.removeAll(c); + } + + @Override + public boolean retainAll(final Collection<?> c) { + return queue.retainAll(c); + } + + @Override + public String toString() { + return queue.toString(); + } + +} diff --git a/src/main/java/teetime/framework/AbstractRunnableStage.java b/src/main/java/teetime/framework/AbstractRunnableStage.java index eb5fed55563d5d43022d2ab2719ec950b59e58eb..3e496643b303d56b576f729922b516d7d8f91e93 100644 --- a/src/main/java/teetime/framework/AbstractRunnableStage.java +++ b/src/main/java/teetime/framework/AbstractRunnableStage.java @@ -12,6 +12,8 @@ abstract class AbstractRunnableStage implements Runnable { private final StageExceptionHandler exceptionHandler; + private static final String TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION = "Terminating thread due to the following exception: "; + private final Stage stage; @SuppressWarnings("PMD.LoggerIsNotStaticFinal") protected final Logger logger; @@ -43,14 +45,11 @@ abstract class AbstractRunnableStage implements Runnable { afterStageExecution(stage); - } catch (Error e) { - this.logger.error("Terminating thread due to the following exception: ", e); - throw e; } catch (RuntimeException e) { - this.logger.error("Terminating thread due to the following exception: ", e); + this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e); throw e; } catch (InterruptedException e) { - this.logger.error("Terminating thread due to the following exception: ", e); + this.logger.error(TERMINATING_THREAD_DUE_TO_THE_FOLLOWING_EXCEPTION, e); } this.logger.debug("Finished runnable stage. (" + this.stage.getId() + ")"); diff --git a/src/main/java/teetime/framework/MonitoringThread.java b/src/main/java/teetime/framework/MonitoringThread.java new file mode 100644 index 0000000000000000000000000000000000000000..5ca6287907ddd8b26581ff16b5b4e8db26ff4b68 --- /dev/null +++ b/src/main/java/teetime/framework/MonitoringThread.java @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.IMonitorablePipe; +import teetime.framework.pipe.IPipe; + +public class MonitoringThread extends Thread { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MonitoringThread.class); + + private final List<IMonitorablePipe> monitoredPipes = new ArrayList<IMonitorablePipe>(); + + private volatile boolean terminated; + + @Override + public void run() { + while (!terminated) { + + for (final IMonitorablePipe pipe : monitoredPipes) { + final long pushThroughput = pipe.getPushThroughput(); + final long pullThroughput = pipe.getPullThroughput(); + final double ratio = (double) pushThroughput / pullThroughput; + + LOGGER.info("pipe: " + "size=" + pipe.size() + ", " + "ratio: " + String.format("%.1f", ratio)); + LOGGER.info("pushes: " + pushThroughput); + LOGGER.info("pulls: " + pullThroughput); + } + LOGGER.info("------------------------------------"); + + try { + Thread.sleep(1000); + } catch (final InterruptedException e) { + terminated = true; + } + } + } + + public void addPipe(final IPipe pipe) { + if (!(pipe instanceof IMonitorablePipe)) { + throw new IllegalArgumentException("The given pipe does not implement IMonitorablePipe"); + } + monitoredPipes.add((IMonitorablePipe) pipe); + } + + /** + * Sets the <code>terminated</code> flag and interrupts this thread. + */ + public void terminate() { + terminated = true; + interrupt(); + } + +} diff --git a/src/main/java/teetime/framework/RunnableConsumerStage.java b/src/main/java/teetime/framework/RunnableConsumerStage.java index 72631c122b7de7fc9042efac0c80f114fe1eeaa1..9f7ac0c88521dabf21fcbccf2dd5503149d74652 100644 --- a/src/main/java/teetime/framework/RunnableConsumerStage.java +++ b/src/main/java/teetime/framework/RunnableConsumerStage.java @@ -41,19 +41,14 @@ final class RunnableConsumerStage extends AbstractRunnableStage { this.inputPorts = stage.getInputPorts(); // FIXME should getInputPorts() really be defined in Stage? } + @SuppressWarnings("PMD.GuardLogStatement") @Override protected void beforeStageExecution(final Stage stage) throws InterruptedException { - logger.trace("ENTRY beforeStageExecution"); - logger.trace("Waiting for start signals..." + inputPorts); for (InputPort<?> inputPort : inputPorts) { inputPort.waitForStartSignal(); } logger.trace("Starting..." + stage); - - // stage.onSignal(signal, inputPort); - - logger.trace("EXIT beforeStageExecution"); } @Override diff --git a/src/main/java/teetime/framework/idle/IdleStrategy.java b/src/main/java/teetime/framework/idle/IdleStrategy.java index b54002037ea4ff7da39e1bb90b3ca1d90a14985c..10acced5a3bd152addb774b971c5e2229bee823c 100644 --- a/src/main/java/teetime/framework/idle/IdleStrategy.java +++ b/src/main/java/teetime/framework/idle/IdleStrategy.java @@ -15,6 +15,13 @@ */ package teetime.framework.idle; +/** + * + * @author Christian Wulf + * + * @deprecated since 1.1 + */ +@Deprecated public interface IdleStrategy { void execute() throws InterruptedException; diff --git a/src/main/java/teetime/framework/idle/YieldStrategy.java b/src/main/java/teetime/framework/idle/YieldStrategy.java index 9bb649436e470cfe17956f8d5407d79cdc845c61..a23a99697f70737d1d29eb3c6118d2ce8e0e5b32 100644 --- a/src/main/java/teetime/framework/idle/YieldStrategy.java +++ b/src/main/java/teetime/framework/idle/YieldStrategy.java @@ -15,6 +15,10 @@ */ package teetime.framework.idle; +/** + * @deprecated since 1.1 + */ +@Deprecated public final class YieldStrategy implements IdleStrategy { @Override diff --git a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java new file mode 100644 index 0000000000000000000000000000000000000000..546e64ddca86531179bbda5c5ff964e21302e60d --- /dev/null +++ b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.framework.pipe; + +public interface IMonitorablePipe { + + long getNumPushes(); + + long getNumPulls(); + + int size(); + + long getPushThroughput(); + + long getPullThroughput(); +} diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 106506de63287c84e55649fb665c472531943805..cf714c4d6d959af3893196cf6bda7f55f987f148 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -15,26 +15,23 @@ */ package teetime.framework.pipe; -import java.util.Queue; - -import org.jctools.queues.QueueFactory; -import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.ObservableSpScArrayQueue; import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class SpScPipe extends AbstractInterThreadPipe { +public final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); - private final Queue<Object> queue; + private final ObservableSpScArrayQueue<Object> queue; // statistics private int numWaits; <T> SpScPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { super(sourcePort, targetPort); - this.queue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedSpsc(capacity)); + this.queue = new ObservableSpScArrayQueue<Object>(capacity); } @Deprecated @@ -86,4 +83,24 @@ public final class SpScPipe extends AbstractInterThreadPipe { return this.numWaits; } + @Override + public long getPushThroughput() { + return queue.getProducerFrequency(); + } + + @Override + public long getPullThroughput() { + return queue.getConsumerFrequency(); + } + + @Override + public long getNumPushes() { + return queue.getNumPushes(); + } + + @Override + public long getNumPulls() { + return queue.getNumPulls(); + } + } diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java index f162b2d598e0d96113798e0bb916b1d61b14cb20..52ef50be6df93ccbc3ca20fc4aedf7af65a4c3de 100644 --- a/src/main/java/teetime/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/stage/basic/merger/Merger.java @@ -24,6 +24,8 @@ import teetime.framework.AbstractStage; import teetime.framework.InputPort; import teetime.framework.OutputPort; import teetime.framework.signal.ISignal; +import teetime.framework.signal.StartingSignal; +import teetime.framework.signal.TerminatingSignal; /** * @@ -59,7 +61,6 @@ public final class Merger<T> extends AbstractStage { if (token == null) { returnNoElement(); } - outputPort.send(token); } @@ -88,19 +89,27 @@ public final class Merger<T> extends AbstractStage { if (!set.add(inputPort)) { this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort); } - + if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) { + signal.trigger(this); + sendSignalToOutputPorts(signal); + signalMap.remove(signalClass); + } } else { Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>(); tempSet.add(inputPort); signalMap.put((Class<ISignal>) signalClass, tempSet); + if (signalClass == StartingSignal.class) { + signal.trigger(this); + sendSignalToOutputPorts(signal); + } } - if (signalMap.get(signalClass).size() == this.getInputPorts().length) { - signal.trigger(this); - this.outputPort.sendSignal(signal); - signalMap.remove(signalClass); - } + } + private void sendSignalToOutputPorts(final ISignal signal) { + for (OutputPort<?> outputPort : getOutputPorts()) { + outputPort.sendSignal(signal); + } } public IMergerStrategy getMergerStrategy() { diff --git a/src/main/java/teetime/stage/io/File2Lines.java b/src/main/java/teetime/stage/io/File2Lines.java index e19f9c6dc6bdbee14a2308344f8d04dead46d2b6..e4118b4769a6c4f268db199e99fbd1ff1cf38b73 100644 --- a/src/main/java/teetime/stage/io/File2Lines.java +++ b/src/main/java/teetime/stage/io/File2Lines.java @@ -28,12 +28,33 @@ import teetime.framework.OutputPort; /** * @author Christian Wulf * + * @since 1.1 + * */ public final class File2Lines extends AbstractConsumerStage<File> { private final OutputPort<String> outputPort = this.createOutputPort(); - private String charset = "UTF-8"; + private final String charset; + + /** + * <ol> + * <li>charset = UTF-8 + * </ol> + */ + public File2Lines() { + this("UTF-8"); + } + + /** + * + * @param charset + * to be used when interpreting text files + */ + public File2Lines(final String charset) { + super(); + this.charset = charset; + } @Override protected void execute(final File textFile) { @@ -66,10 +87,6 @@ public final class File2Lines extends AbstractConsumerStage<File> { return this.charset; } - public void setCharset(final String charset) { - this.charset = charset; - } - public OutputPort<String> getOutputPort() { return outputPort; } diff --git a/src/main/java/teetime/stage/io/File2SeqOfWords.java b/src/main/java/teetime/stage/io/File2SeqOfWords.java new file mode 100644 index 0000000000000000000000000000000000000000..d09589a66c86e07544ff00ce41733ad8f8144529 --- /dev/null +++ b/src/main/java/teetime/stage/io/File2SeqOfWords.java @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package teetime.stage.io; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.CharBuffer; + +import teetime.framework.AbstractConsumerStage; +import teetime.framework.OutputPort; + +/** + * @author Christian Wulf + * + */ +public final class File2SeqOfWords extends AbstractConsumerStage<File> { + + private final OutputPort<String> outputPort = this.createOutputPort(); + + private final String charset; + private final int bufferCapacity; + + /** + * <ol> + * <li>charset = UTF-8 + * <li>bufferCapacity = 1024 + * </ol> + */ + public File2SeqOfWords() { + this("UTF-8", 1024); + } + + public File2SeqOfWords(final String charset, final int bufferCapacity) { + super(); + this.charset = charset; + this.bufferCapacity = bufferCapacity; + } + + @Override + protected void execute(final File textFile) { + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(textFile), this.charset)); + CharBuffer charBuffer = CharBuffer.allocate(bufferCapacity); + while (reader.read(charBuffer) != -1) { + final int position = getPreviousWhitespacePosition(charBuffer); + if (-1 == position) { + if (logger.isErrorEnabled()) { + logger.error("A word in the following text file is bigger than the buffer's capacity: " + textFile.getAbsolutePath()); + return; + } + } + final int limit = charBuffer.limit(); + + charBuffer.limit(position); + charBuffer.rewind(); + outputPort.send(charBuffer.toString()); // from position to limit-1 + + charBuffer.limit(limit); + charBuffer.position(position); + charBuffer.compact(); + } + } catch (final FileNotFoundException e) { + this.logger.error("", e); + } catch (final IOException e) { + this.logger.error("", e); + } finally { + try { + if (reader != null) { + reader.close(); + } + } catch (final IOException e) { + this.logger.warn("", e); + } + } + } + + private int getPreviousWhitespacePosition(final CharBuffer charBuffer) { + char[] characters = charBuffer.array(); + int index = charBuffer.arrayOffset() + charBuffer.position() - 1; + + while (index >= 0) { + switch (characters[index]) { + case ' ': + case '\n': + case '\r': + case '\t': + return index - charBuffer.arrayOffset(); + default: + index--; + } + } + return -1; + } + + public String getCharset() { + return this.charset; + } + + public int getBufferCapacity() { + return bufferCapacity; + } + + public OutputPort<String> getOutputPort() { + return outputPort; + } + +} diff --git a/src/main/java/teetime/stage/io/File2TextLinesFilter.java b/src/main/java/teetime/stage/io/File2TextLinesFilter.java index 977fef59d491ea0a2663171537a25d33209ce1cf..e4861a77a29d1bd2d481aaeab2b50574447926cd 100644 --- a/src/main/java/teetime/stage/io/File2TextLinesFilter.java +++ b/src/main/java/teetime/stage/io/File2TextLinesFilter.java @@ -29,12 +29,37 @@ import teetime.stage.util.TextLine; /** * @author Christian Wulf * + * @since 1.0 + * */ public final class File2TextLinesFilter extends AbstractConsumerStage<File> { private final OutputPort<TextLine> outputPort = this.createOutputPort(); - private String charset = "UTF-8"; + private final String charset; + + /** + * <ol> + * <li>charset = UTF-8 + * </ol> + * + * @since 1.1 + */ + public File2TextLinesFilter() { + this("UTF-8"); + } + + /** + * + * @param charset + * to be used when interpreting text files + * + * @since 1.1 + */ + public File2TextLinesFilter(final String charset) { + super(); + this.charset = charset; + } @Override protected void execute(final File textFile) { @@ -67,10 +92,6 @@ public final class File2TextLinesFilter extends AbstractConsumerStage<File> { return this.charset; } - public void setCharset(final String charset) { - this.charset = charset; - } - public OutputPort<TextLine> getOutputPort() { return outputPort; } diff --git a/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java b/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java index d65e58f59c433bd527fe18a637dc50b0fc9c6f3b..ac34f1b42d9b141eda700c4219ceb06f8b81eea7 100644 --- a/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java +++ b/src/performancetest/java/teetime/examples/ChwWorkComparisonMethodcallWithPorts.java @@ -28,7 +28,7 @@ public class ChwWorkComparisonMethodcallWithPorts extends AbstractProfiledPerfor @Override public String getCorrespondingPerformanceProfile() { - return "ChwWork"; + return HostName.CHW_WORK.toString(); } @Override diff --git a/src/performancetest/java/teetime/examples/experiment01/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment01/ChwWorkPerformanceCheck.java index 6c7a37bb07ea43b72abfce361e74eedf3398b964..d7762b8c40ea5beaa6d925b5935c009e6833d443 100644 --- a/src/performancetest/java/teetime/examples/experiment01/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment01/ChwWorkPerformanceCheck.java @@ -16,15 +16,16 @@ package teetime.examples.experiment01; import static org.junit.Assert.assertEquals; +import teetime.examples.HostName; import teetime.util.test.eval.PerformanceResult; -import util.test.PerformanceTest; import util.test.AbstractProfiledPerformanceAssertion; +import util.test.PerformanceTest; class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion { @Override public String getCorrespondingPerformanceProfile() { - return "ChwWork"; + return HostName.CHW_WORK.toString(); } @Override diff --git a/src/performancetest/java/teetime/examples/experiment09pipeimpls/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment09pipeimpls/ChwWorkPerformanceCheck.java index 384049059db2a2bb7a45ca4929a7e30e8c3f9da0..cc670a79a849d0b9b8ce76be1dcef445da89c1ca 100644 --- a/src/performancetest/java/teetime/examples/experiment09pipeimpls/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment09pipeimpls/ChwWorkPerformanceCheck.java @@ -16,12 +16,13 @@ package teetime.examples.experiment09pipeimpls; import static org.junit.Assert.assertEquals; +import teetime.examples.HostName; class ChwWorkPerformanceCheck extends AbstractPerformanceCheck { @Override public String getCorrespondingPerformanceProfile() { - return "ChwWork"; + return HostName.CHW_WORK.toString(); } @Override diff --git a/src/performancetest/java/teetime/examples/experiment11/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment11/ChwWorkPerformanceCheck.java index f20539b6e24a0e36dad90d6d7a9cd07425fad3e9..ea4c302b269369302e0fe415e80db707ddf5602a 100644 --- a/src/performancetest/java/teetime/examples/experiment11/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment11/ChwWorkPerformanceCheck.java @@ -16,15 +16,16 @@ package teetime.examples.experiment11; import static org.junit.Assert.assertEquals; +import teetime.examples.HostName; import teetime.util.test.eval.PerformanceResult; -import util.test.PerformanceTest; import util.test.AbstractProfiledPerformanceAssertion; +import util.test.PerformanceTest; class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion { @Override public String getCorrespondingPerformanceProfile() { - return "ChwWork"; + return HostName.CHW_WORK.toString(); } @Override diff --git a/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java b/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java index c9c07e70add55ee979112f01846f5991eda82b7a..210b13f6ffa9a3978da77dbf5d49124351c71272 100644 --- a/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java +++ b/src/performancetest/java/teetime/examples/experiment16/ChwWorkPerformanceCheck.java @@ -16,12 +16,18 @@ package teetime.examples.experiment16; import static org.junit.Assert.assertEquals; +import teetime.examples.HostName; import teetime.util.test.eval.PerformanceResult; import util.test.AbstractProfiledPerformanceAssertion; import util.test.PerformanceTest; class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion { + @Override + public String getCorrespondingPerformanceProfile() { + return HostName.CHW_WORK.toString(); + } + @Override public void check() { PerformanceResult test16a = PerformanceTest.measurementRepository.performanceResults @@ -43,8 +49,4 @@ class ChwWorkPerformanceCheck extends AbstractProfiledPerformanceAssertion { assertEquals(2.0, speedupC, 0.3); } - @Override - public String getCorrespondingPerformanceProfile() { - return "ChwWork"; - } } diff --git a/src/site/markdown/index.markdown b/src/site/markdown/index.markdown index a718c026ef724aadb2ff1f946f314f7e0ab2ee36..eaae70ae49d27b34e8c0b94744c48720f8118784 100644 --- a/src/site/markdown/index.markdown +++ b/src/site/markdown/index.markdown @@ -21,17 +21,22 @@ Predefined stages are provided within the source code. You can immediatly start ## What is it? -TeeTime is a Pipes-And-Filters-Framework for Java. - -It provides various features, which will bring you to the next level of analysis programming, such as: - -- Concurrent execution of stages - -- Flexible connection between stages - -- Typed ports - -- Pre-defined stages and pipelines +TeeTime is a Pipe-and-Filter Framework for Java. + +It provides support for the *modeling* and the *execution* of P&F architectures. +In particular, it features... + +- many primitive and composite ready-to-use stages + +- a type-safety way to develop and connect stages + +- no(!) single-threaded overhead + +- only a minimal multi-threaded overhead + +- threads can be assigned to stages arbitrarily + +- and many more... ## Where to get it? diff --git a/src/site/site.xml b/src/site/site.xml index 3751706ce21202aa4aa16184e6e7c55cd0ad598b..25d1340c5b72b72385c394707af69c49a6566b2d 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -95,7 +95,7 @@ </brand> <slogan> <![CDATA[ - <span class="slogan">The next level Pipes-And-Filters-Framework for Java</span> + <span class="slogan">The next-generation Pipe-and-Filter Framework for Java</span> ]]> </slogan> <titleTemplate>%2$s | %1$s</titleTemplate> diff --git a/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java b/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java index 597cee060ae699ad1d4a2ad78deb6802b8843bc1..f4deb2bc2553ecb911ab49c33e46d1379896e670 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java +++ b/src/test/java/teetime/stage/basic/merger/MergerSignalTest.java @@ -15,7 +15,9 @@ */ package teetime.stage.basic.merger; -import org.junit.Assert; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import org.junit.Test; import teetime.framework.InputPort; @@ -43,59 +45,67 @@ public class MergerSignalTest { public void testSameSignal() { this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); - + assertTrue(testPipe.startSent()); + testPipe.reset(); merger.onSignal(new StartingSignal(), secondPort); - Assert.assertTrue(testPipe.startSent()); + assertFalse(testPipe.startSent()); } @Test public void testDifferentSignals() { this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); + assertTrue(testPipe.startSent()); + testPipe.reset(); merger.onSignal(new TerminatingSignal(), secondPort); - Assert.assertFalse(testPipe.startSent()); + assertFalse(testPipe.startSent()); } @Test public void testInterleavedSignals() { this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); - Assert.assertFalse(testPipe.terminateSent()); + assertTrue(testPipe.startSent()); + assertFalse(testPipe.terminateSent()); + testPipe.reset(); merger.onSignal(new TerminatingSignal(), secondPort); - Assert.assertFalse(testPipe.startSent()); - Assert.assertFalse(testPipe.terminateSent()); + assertFalse(testPipe.startSent()); + assertFalse(testPipe.terminateSent()); + testPipe.reset(); merger.onSignal(new TerminatingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); - Assert.assertTrue(testPipe.terminateSent()); + assertFalse(testPipe.startSent()); + assertTrue(testPipe.terminateSent()); + testPipe.reset(); merger.onSignal(new TerminatingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); - Assert.assertTrue(testPipe.terminateSent()); + assertFalse(testPipe.startSent()); + assertFalse(testPipe.terminateSent()); + testPipe.reset(); merger.onSignal(new StartingSignal(), secondPort); - Assert.assertTrue(testPipe.startSent()); - Assert.assertTrue(testPipe.terminateSent()); + assertFalse(testPipe.startSent()); + assertFalse(testPipe.terminateSent()); } @Test public void testMultipleSignals() { this.beforeSignalTesting(); merger.onSignal(new StartingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); + assertTrue(testPipe.startSent()); + testPipe.reset(); merger.onSignal(new StartingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); + assertFalse(testPipe.startSent()); + testPipe.reset(); merger.onSignal(new StartingSignal(), firstPort); - Assert.assertFalse(testPipe.startSent()); + assertFalse(testPipe.startSent()); + testPipe.reset(); merger.onSignal(new StartingSignal(), secondPort); - Assert.assertTrue(testPipe.startSent()); + assertFalse(testPipe.startSent()); } } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTest.java b/src/test/java/teetime/stage/basic/merger/MergerTest.java index 234537e01dca9f28c400ed90c2043d6085013f45..7a8546090157a8195799a53abfd11861109ed605 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTest.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTest.java @@ -16,7 +16,15 @@ package teetime.stage.basic.merger; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; +import static teetime.framework.test.StageTester.test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -25,6 +33,7 @@ import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.SingleElementPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; +import teetime.util.Pair; /** * @author Nils Christian Ehmke @@ -72,4 +81,19 @@ public class MergerTest { assertThat(this.collector.getElements(), contains(1, 2, 3)); } + @Test + public void roundRobinShouldWork2() { + mergerUnderTest = new Merger<Integer>(new RoundRobinStrategy()); + + List<Integer> outputList = new ArrayList<Integer>(); + Collection<Pair<Thread, Throwable>> exceptions = test(mergerUnderTest) + .and().send(1, 2, 3).to(mergerUnderTest.getNewInputPort()) + .and().send(4, 5, 6).to(mergerUnderTest.getNewInputPort()) + .and().receive(outputList).from(mergerUnderTest.getOutputPort()) + .start(); + + assertThat(exceptions, is(empty())); + assertThat(outputList, is(not(empty()))); + assertThat(outputList, contains(1, 4, 2, 5, 3, 6)); + } } diff --git a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java index 9b0e3904c1ef14a643ab5620fb6df67ce4f03747..9defb4511cadc946b212a1f857f3ffb2c464a712 100644 --- a/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java +++ b/src/test/java/teetime/stage/basic/merger/MergerTestingPipe.java @@ -46,6 +46,11 @@ public class MergerTestingPipe implements IPipe { return this.terminateSent; } + public void reset() { + this.startSent = false; + this.terminateSent = false; + } + @Override public boolean add(final Object element) { return false;