Skip to content
Snippets Groups Projects
Commit d9cdcc95 authored by Christian Wulf's avatar Christian Wulf
Browse files

added first draft of a monitoring thread

parent e7a2885c
No related branches found
No related tags found
No related merge requests found
......@@ -21,12 +21,12 @@
<rule ref="rulesets/java/empty.xml" />
<rule ref="rulesets/java/finalizers.xml" />
<rule ref="rulesets/java/imports.xml" />
<!-- <rule ref="rulesets/java/j2ee.xml" /> -->
<!-- <rule ref="rulesets/java/javabeans.xml" /> -->
<!-- <rule ref="rulesets/java/j2ee.xml" /> -->
<!-- <rule ref="rulesets/java/javabeans.xml" /> -->
<rule ref="rulesets/java/junit.xml" />
<rule ref="rulesets/java/logging-jakarta-commons.xml" />
<rule ref="rulesets/java/logging-java.xml" />
<rule ref="rulesets/java/migrating.xml" />
<rule ref="rulesets/java/naming.xml" />
<!-- <rule ref="rulesets/java/optimizations.xml" /> -->
......@@ -61,14 +61,15 @@
<rule ref="rulesets/java/controversial.xml">
<exclude name="AtLeastOneConstructor" />
<exclude name="AvoidUsingVolatile" />
</rule>
<rule ref="rulesets/java/j2ee.xml">
<exclude name="DoNotUseThreads" />
</rule>
<rule ref="rulesets/java/javabeans.xml">
<exclude name="BeanMembersShouldSerialize"/>
<exclude name="BeanMembersShouldSerialize" />
</rule>
<rule ref="rulesets/java/naming.xml/VariableNamingConventions">
......@@ -79,4 +80,5 @@
<exclude name="LocalVariableCouldBeFinal" />
</rule>
</ruleset>
\ No newline at end of file
/**
* Copyright (C) 2015 TeeTime (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jctools.queues;
import java.util.Collection;
......@@ -14,6 +29,14 @@ public final class ObservableSpScArrayQueue<E> implements Queue<E> {
this.queue = new SpscArrayQueue<E>(requestedCapacity);
}
public long getNumPushes() {
return queue.lvProducerIndex();
}
public long getNumPulls() {
return queue.lvConsumerIndex();
}
public long getProducerFrequency() {
final long currentProducerIndex = queue.lvProducerIndex();
long diff = currentProducerIndex - lastProducerIndex;
......
......@@ -78,8 +78,4 @@ public abstract class AbstractInterThreadPipe extends AbstractPipe {
public final void close() {
isClosed = true;
}
public abstract long getPushThroughput();
public abstract long getPullThroughput();
}
package teetime.framework;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.IMonitorablePipe;
import teetime.framework.pipe.IPipe;
public class MonitoringThread extends Thread {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MonitoringThread.class);
private final List<IMonitorablePipe> monitoredPipes = new ArrayList<IMonitorablePipe>();
private volatile boolean terminated;
@Override
public void run() {
while (!terminated) {
for (final IMonitorablePipe pipe : monitoredPipes) {
final long pushThroughput = pipe.getPushThroughput();
final long pullThroughput = pipe.getPullThroughput();
final double ratio = (double) pushThroughput / pullThroughput;
LOGGER.info("pipe: " + "size=" + pipe.size() + ", " + "ratio: " + String.format("%.1f", ratio));
LOGGER.info("pushes: " + pushThroughput);
LOGGER.info("pulls: " + pullThroughput);
}
LOGGER.info("------------------------------------");
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
terminated = true;
}
}
}
public void addPipe(final IPipe pipe) {
if (!(pipe instanceof IMonitorablePipe)) {
throw new IllegalArgumentException("The given pipe does not implement IMonitorablePipe");
}
monitoredPipes.add((IMonitorablePipe) pipe);
}
/**
* Sets the <code>terminated</code> flag and interrupts this thread.
*/
public void terminate() {
terminated = true;
interrupt();
}
}
package teetime.framework.pipe;
public interface IMonitorablePipe {
long getNumPushes();
long getNumPulls();
int size();
long getPushThroughput();
long getPullThroughput();
}
......@@ -59,14 +59,4 @@ public final class RelayTestPipe<T> extends AbstractInterThreadPipe {
return this.numInputObjects;
}
@Override
public long getPushThroughput() {
return -1;
}
@Override
public long getPullThroughput() {
return -1;
}
}
......@@ -21,7 +21,7 @@ import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
public final class SpScPipe extends AbstractInterThreadPipe {
public final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
......@@ -93,4 +93,14 @@ public final class SpScPipe extends AbstractInterThreadPipe {
return queue.getConsumerFrequency();
}
@Override
public long getNumPushes() {
return queue.getNumPushes();
}
@Override
public long getNumPulls() {
return queue.getNumPulls();
}
}
......@@ -61,14 +61,4 @@ public final class UnboundedSpScPipe extends AbstractInterThreadPipe {
return this.queue.size();
}
@Override
public long getPushThroughput() {
return -1;
}
@Override
public long getPullThroughput() {
return -1;
}
}
......@@ -21,17 +21,23 @@ Predefined stages are provided within the source code. You can immediatly start
## What is it?
TeeTime is a Pipes-And-Filters-Framework for Java.
It provides various features, which will bring you to the next level of analysis programming, such as:
- Concurrent execution of stages
- Flexible connection between stages
- Typed ports
- Pre-defined stages and pipelines
TeeTime is a Pipe & Filter Framework for Java.
It provides support for the *modeling* and the *execution* of P&F architectures.
It features
- a high reusability and maintainability
- a type-safety way to develop and connect stages
- many pre-defined ready-to-use stages
- no or a minimal synchronization overhead
- a hybrid stage execution model
- and many more...
## Where to get it?
......
......@@ -95,7 +95,7 @@
</brand>
<slogan>
<![CDATA[
<span class="slogan">The next level Pipes-And-Filters-Framework for Java</span>
<span class="slogan">The next-generation Pipe-And-Filter Framework for Java</span>
]]>
</slogan>
<titleTemplate>%2$s | %1$s</titleTemplate>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment