From d9cdcc95c034c90cfe7dab82f28c37ddc1e44e38 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Thu, 26 Mar 2015 10:59:58 +0100 Subject: [PATCH] added first draft of a monitoring thread --- conf/quality-config/pmd-ruleset.xml | 14 +++-- .../queues/ObservableSpScArrayQueue.java | 23 ++++++++ .../framework/AbstractInterThreadPipe.java | 4 -- .../teetime/framework/MonitoringThread.java | 57 +++++++++++++++++++ .../framework/pipe/IMonitorablePipe.java | 14 +++++ .../teetime/framework/pipe/RelayTestPipe.java | 10 ---- .../java/teetime/framework/pipe/SpScPipe.java | 12 +++- .../framework/pipe/UnboundedSpScPipe.java | 10 ---- src/site/markdown/index.markdown | 28 +++++---- src/site/site.xml | 2 +- 10 files changed, 131 insertions(+), 43 deletions(-) create mode 100644 src/main/java/teetime/framework/MonitoringThread.java create mode 100644 src/main/java/teetime/framework/pipe/IMonitorablePipe.java diff --git a/conf/quality-config/pmd-ruleset.xml b/conf/quality-config/pmd-ruleset.xml index f7840493..012fa89c 100644 --- a/conf/quality-config/pmd-ruleset.xml +++ b/conf/quality-config/pmd-ruleset.xml @@ -21,12 +21,12 @@ <rule ref="rulesets/java/empty.xml" /> <rule ref="rulesets/java/finalizers.xml" /> <rule ref="rulesets/java/imports.xml" /> -<!-- <rule ref="rulesets/java/j2ee.xml" /> --> -<!-- <rule ref="rulesets/java/javabeans.xml" /> --> + <!-- <rule ref="rulesets/java/j2ee.xml" /> --> + <!-- <rule ref="rulesets/java/javabeans.xml" /> --> <rule ref="rulesets/java/junit.xml" /> <rule ref="rulesets/java/logging-jakarta-commons.xml" /> <rule ref="rulesets/java/logging-java.xml" /> - + <rule ref="rulesets/java/migrating.xml" /> <rule ref="rulesets/java/naming.xml" /> <!-- <rule ref="rulesets/java/optimizations.xml" /> --> @@ -61,14 +61,15 @@ <rule ref="rulesets/java/controversial.xml"> <exclude name="AtLeastOneConstructor" /> + <exclude name="AvoidUsingVolatile" /> </rule> - + <rule ref="rulesets/java/j2ee.xml"> <exclude name="DoNotUseThreads" /> </rule> - + <rule ref="rulesets/java/javabeans.xml"> - <exclude name="BeanMembersShouldSerialize"/> + <exclude name="BeanMembersShouldSerialize" /> </rule> <rule ref="rulesets/java/naming.xml/VariableNamingConventions"> @@ -79,4 +80,5 @@ <exclude name="LocalVariableCouldBeFinal" /> </rule> + </ruleset> \ No newline at end of file diff --git a/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java index 687ab9d4..01b1bfad 100644 --- a/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java +++ b/src/main/java/org/jctools/queues/ObservableSpScArrayQueue.java @@ -1,3 +1,18 @@ +/** + * Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.jctools.queues; import java.util.Collection; @@ -14,6 +29,14 @@ public final class ObservableSpScArrayQueue<E> implements Queue<E> { this.queue = new SpscArrayQueue<E>(requestedCapacity); } + public long getNumPushes() { + return queue.lvProducerIndex(); + } + + public long getNumPulls() { + return queue.lvConsumerIndex(); + } + public long getProducerFrequency() { final long currentProducerIndex = queue.lvProducerIndex(); long diff = currentProducerIndex - lastProducerIndex; diff --git a/src/main/java/teetime/framework/AbstractInterThreadPipe.java b/src/main/java/teetime/framework/AbstractInterThreadPipe.java index d759ac47..6c3e3061 100644 --- a/src/main/java/teetime/framework/AbstractInterThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractInterThreadPipe.java @@ -78,8 +78,4 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe { public final void close() { isClosed = true; } - - public abstract long getPushThroughput(); - - public abstract long getPullThroughput(); } diff --git a/src/main/java/teetime/framework/MonitoringThread.java b/src/main/java/teetime/framework/MonitoringThread.java new file mode 100644 index 00000000..60f34329 --- /dev/null +++ b/src/main/java/teetime/framework/MonitoringThread.java @@ -0,0 +1,57 @@ +package teetime.framework; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.LoggerFactory; + +import teetime.framework.pipe.IMonitorablePipe; +import teetime.framework.pipe.IPipe; + +public class MonitoringThread extends Thread { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MonitoringThread.class); + + private final List<IMonitorablePipe> monitoredPipes = new ArrayList<IMonitorablePipe>(); + + private volatile boolean terminated; + + @Override + public void run() { + while (!terminated) { + + for (final IMonitorablePipe pipe : monitoredPipes) { + final long pushThroughput = pipe.getPushThroughput(); + final long pullThroughput = pipe.getPullThroughput(); + final double ratio = (double) pushThroughput / pullThroughput; + + LOGGER.info("pipe: " + "size=" + pipe.size() + ", " + "ratio: " + String.format("%.1f", ratio)); + LOGGER.info("pushes: " + pushThroughput); + LOGGER.info("pulls: " + pullThroughput); + } + LOGGER.info("------------------------------------"); + + try { + Thread.sleep(1000); + } catch (final InterruptedException e) { + terminated = true; + } + } + } + + public void addPipe(final IPipe pipe) { + if (!(pipe instanceof IMonitorablePipe)) { + throw new IllegalArgumentException("The given pipe does not implement IMonitorablePipe"); + } + monitoredPipes.add((IMonitorablePipe) pipe); + } + + /** + * Sets the <code>terminated</code> flag and interrupts this thread. + */ + public void terminate() { + terminated = true; + interrupt(); + } + +} diff --git a/src/main/java/teetime/framework/pipe/IMonitorablePipe.java b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java new file mode 100644 index 00000000..5857e4bb --- /dev/null +++ b/src/main/java/teetime/framework/pipe/IMonitorablePipe.java @@ -0,0 +1,14 @@ +package teetime.framework.pipe; + +public interface IMonitorablePipe { + + long getNumPushes(); + + long getNumPulls(); + + int size(); + + long getPushThroughput(); + + long getPullThroughput(); +} diff --git a/src/main/java/teetime/framework/pipe/RelayTestPipe.java b/src/main/java/teetime/framework/pipe/RelayTestPipe.java index 767f7f31..9afebf5f 100644 --- a/src/main/java/teetime/framework/pipe/RelayTestPipe.java +++ b/src/main/java/teetime/framework/pipe/RelayTestPipe.java @@ -59,14 +59,4 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe { return this.numInputObjects; } - @Override - public long getPushThroughput() { - return -1; - } - - @Override - public long getPullThroughput() { - return -1; - } - } diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index f19e41ef..cf714c4d 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -21,7 +21,7 @@ import teetime.framework.AbstractInterThreadPipe; import teetime.framework.InputPort; import teetime.framework.OutputPort; -public final class SpScPipe extends AbstractInterThreadPipe { +public final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe { // private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class); @@ -93,4 +93,14 @@ public final class SpScPipe extends AbstractInterThreadPipe { return queue.getConsumerFrequency(); } + @Override + public long getNumPushes() { + return queue.getNumPushes(); + } + + @Override + public long getNumPulls() { + return queue.getNumPulls(); + } + } diff --git a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java index 207225c0..ded5c049 100644 --- a/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java +++ b/src/main/java/teetime/framework/pipe/UnboundedSpScPipe.java @@ -61,14 +61,4 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe { return this.queue.size(); } - @Override - public long getPushThroughput() { - return -1; - } - - @Override - public long getPullThroughput() { - return -1; - } - } diff --git a/src/site/markdown/index.markdown b/src/site/markdown/index.markdown index a718c026..33964072 100644 --- a/src/site/markdown/index.markdown +++ b/src/site/markdown/index.markdown @@ -21,17 +21,23 @@ Predefined stages are provided within the source code. You can immediatly start ## What is it? -TeeTime is a Pipes-And-Filters-Framework for Java. - -It provides various features, which will bring you to the next level of analysis programming, such as: - -- Concurrent execution of stages - -- Flexible connection between stages - -- Typed ports - -- Pre-defined stages and pipelines +TeeTime is a Pipe & Filter Framework for Java. + +It provides support for the *modeling* and the *execution* of P&F architectures. + +It features + +- a high reusability and maintainability + +- a type-safety way to develop and connect stages + +- many pre-defined ready-to-use stages + +- no or a minimal synchronization overhead + +- a hybrid stage execution model + +- and many more... ## Where to get it? diff --git a/src/site/site.xml b/src/site/site.xml index 3751706c..292ac5f0 100644 --- a/src/site/site.xml +++ b/src/site/site.xml @@ -95,7 +95,7 @@ </brand> <slogan> <![CDATA[ - <span class="slogan">The next level Pipes-And-Filters-Framework for Java</span> + <span class="slogan">The next-generation Pipe-And-Filter Framework for Java</span> ]]> </slogan> <titleTemplate>%2$s | %1$s</titleTemplate> -- GitLab