diff --git a/conf/quality-config/pmd-ruleset.xml b/conf/quality-config/pmd-ruleset.xml index f7840493f84163bb1799135251c730596b9ab7e7..012fa89c12927d759d8fe0730cdeabc0ab2b6344 100644 --- a/conf/quality-config/pmd-ruleset.xml +++ b/conf/quality-config/pmd-ruleset.xml @@ -21,12 +21,12 @@ <rule ref="rulesets/java/empty.xml" /> <rule ref="rulesets/java/finalizers.xml" /> <rule ref="rulesets/java/imports.xml" /> -<!-- <rule ref="rulesets/java/j2ee.xml" /> --> -<!-- <rule ref="rulesets/java/javabeans.xml" /> --> + <!-- <rule ref="rulesets/java/j2ee.xml" /> --> + <!-- <rule ref="rulesets/java/javabeans.xml" /> --> <rule ref="rulesets/java/junit.xml" /> <rule ref="rulesets/java/logging-jakarta-commons.xml" /> <rule ref="rulesets/java/logging-java.xml" /> - + <rule ref="rulesets/java/migrating.xml" /> <rule ref="rulesets/java/naming.xml" /> <!-- <rule ref="rulesets/java/optimizations.xml" /> --> @@ -61,14 +61,15 @@ <rule ref="rulesets/java/controversial.xml"> <exclude name="AtLeastOneConstructor" /> + <exclude name="AvoidUsingVolatile" /> </rule> - + <rule ref="rulesets/java/j2ee.xml"> <exclude name="DoNotUseThreads" /> </rule> - + <rule ref="rulesets/java/javabeans.xml"> - <exclude name="BeanMembersShouldSerialize"/> + <exclude name="BeanMembersShouldSerialize" /> </rule> <rule ref="rulesets/java/naming.xml/VariableNamingConventions"> @@ -79,4 +80,5 @@ <exclude name="LocalVariableCouldBeFinal" /> </rule> + </ruleset> \ No newline at end of file diff --git a/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java index 687ab9d4a045fd6f025737ba2733c2078bd2e7b0..01b1bfadfb33c3b6f8600d69e8e9f819fe31e3ad 100644 --- a/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java +++ b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.jctools.queues; import java.util.Collection; @@ -14,6 +29,14 @@ public final class ObservableSpScArrayQueue<E> implements Queue<E> { this.queue = new SpscArrayQueue<E>(requestedCapacity); } + public long getNumPushes() { + return queue.lvProducerIndex(); + } + + public long getNumPulls() { + return queue.lvConsumerIndex(); + } + public long getProducerFrequency() { final long currentProducerIndex = queue.lvProducerIndex(); long diff = currentProducerIndex - lastProducerIndex; diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index d759ac47d7ff86a25f7561cf47b952ee7eeb8d6e..6c3e30615fb06c0ec9d80093a76232f6775ad02c 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -78,8 +78,4 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { public final void close() { isClosed = true; } - - public abstract long getPushThroughput(); - - public abstract long getPullThroughput(); } diff --git a/src/main/java/teetime/framework/MonitoringThread.java b/src/main/java/teetime/framework/MonitoringThread.java new file mode 100644 index 0000000000000000000000000000000000000000..60f343291dca9be336b353bf62ba3d09580b9ee7 --- /dev/null +++ b/src/main/java/teetime/framework/MonitoringThread.java @@ -0,0 +1,57 @@ +package teetime.framework; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.IMonitorablePipe; +import teetime.framework.pipe.IPipe; + +public class MonitoringThread extends Thread { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MonitoringThread.class); + + private final List<IMonitorablePipe> monitoredPipes = new ArrayList<IMonitorablePipe>(); + + private volatile boolean terminated; + + @Override + public void run() { + while (!terminated) { + + for (final IMonitorablePipe pipe : monitoredPipes) { + final long pushThroughput = pipe.getPushThroughput(); + final long pullThroughput = pipe.getPullThroughput(); + final double ratio = (double) pushThroughput / pullThroughput; + + LOGGER.info("pipe: " + "size=" + pipe.size() + ", " + "ratio: " + String.format("%.1f", ratio)); + LOGGER.info("pushes: " + pushThroughput); + LOGGER.info("pulls: " + pullThroughput); + } + LOGGER.info("------------------------------------"); + + try { + Thread.sleep(1000); + } catch (final InterruptedException e) { + terminated = true; + } + } + } + + public void addPipe(final IPipe pipe) { + if (!(pipe instanceof IMonitorablePipe)) { + throw new IllegalArgumentException("The given pipe does not implement IMonitorablePipe"); + } + monitoredPipes.add((IMonitorablePipe) pipe); + } + + /** + * Sets the <code>terminated</code> flag and interrupts this thread. + */ + public void terminate() { + terminated = true; + interrupt(); + } + +} diff --git a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java new file mode 100644 index 0000000000000000000000000000000000000000..5857e4bb2cd735366ed5a4bfbf116ca2743723bc --- /dev/null +++ b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java @@ -0,0 +1,14 @@ +package teetime.framework.pipe; + +public interface IMonitorablePipe { + + long getNumPushes(); + + long getNumPulls(); + + int size(); + + long getPushThroughput(); + + long getPullThroughput(); +} diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java index 767f7f3189f85aef73b522f65a324d9b006ad3ad..9afebf5f51f6fd485ec8f580dfa237b61c8e58d4 100644 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java @@ -59,14 +59,4 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe { return this.numInputObjects; } - @Override - public long getPushThroughput() { - return -1; - } - - @Override - public long getPullThroughput() { - return -1; - } - } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index f19e41ef4adf8bceb9f8041ec0a1942e9656755b..cf714c4d6d959af3893196cf6bda7f55f987f148 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -21,7 +21,7 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class SpScPipe extends AbstractInterThreadPipe { +public final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -93,4 +93,14 @@ public final class SpScPipe extends AbstractInterThreadPipe { return queue.getConsumerFrequency(); } + @Override + public long getNumPushes() { + return queue.getNumPushes(); + } + + @Override + public long getNumPulls() { + return queue.getNumPulls(); + } + } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index 207225c0cfb7b6f1e5fb68bf9c798cbd6bfc5e75..ded5c049f8ab3d3a28f1bfe07bb5d3aab5ad2658 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -61,14 +61,4 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe { return this.queue.size(); } - @Override - public long getPushThroughput() { - return -1; - } - - @Override - public long getPullThroughput() { - return -1; - } - } diff --git a/src/site/markdown/index.markdown b/src/site/markdown/index.markdown index a718c026ef724aadb2ff1f946f314f7e0ab2ee36..33964072765639fed0d020fa362852fe991788f4 100644 --- a/src/site/markdown/index.markdown +++ b/src/site/markdown/index.markdown @@ -21,17 +21,23 @@ Predefined stages are provided within the source code. You can immediatly start ## What is it? -TeeTime is a Pipes-And-Filters-Framework for Java. - -It provides various features, which will bring you to the next level of analysis programming, such as: - -- Concurrent execution of stages - -- Flexible connection between stages - -- Typed ports - -- Pre-defined stages and pipelines +TeeTime is a Pipe & Filter Framework for Java. + +It provides support for the *modeling* and the *execution* of P&F architectures. + +It features + +- a high reusability and maintainability + +- a type-safety way to develop and connect stages + +- many pre-defined ready-to-use stages + +- no or a minimal synchronization overhead + +- a hybrid stage execution model + +- and many more... ## Where to get it? diff --git a/src/site/site.xml b/src/site/site.xml index 3751706ce21202aa4aa16184e6e7c55cd0ad598b..292ac5f09e1204d18478bd24e9387248ba980f98 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -95,7 +95,7 @@ </brand> <slogan> <![CDATA[ - <span class="slogan">The next level Pipes-And-Filters-Framework for Java</span> + <span class="slogan">The next-generation Pipe-And-Filter Framework for Java</span> ]]> </slogan> <titleTemplate>%2$s | %1$s</titleTemplate>