Skip to content
Snippets Groups Projects
Commit ee0a3e8a authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

fixed some pmd issues and moved some kieker related stages to another

project
parent 11fa45a1
No related branches found
No related tags found
No related merge requests found
Showing
with 8 additions and 482 deletions
...@@ -31,8 +31,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { ...@@ -31,8 +31,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
} }
@Override @Override
// getTargetPort is always non-null since the framework adds dummy ports if necessary
public final void sendSignal(final ISignal signal) { public final void sendSignal(final ISignal signal) {
// getTargetPort is always non-null since the framework adds dummy ports if necessary
this.cachedTargetStage.onSignal(signal, this.getTargetPort()); this.cachedTargetStage.onSignal(signal, this.getTargetPort());
} }
......
...@@ -96,8 +96,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -96,8 +96,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
init(); init();
} }
// BETTER validate concurrently
private void validateStages() { private void validateStages() {
// BETTER validate concurrently
final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs) {
// // portConnectionValidator.validate(stage); // // portConnectionValidator.validate(stage);
...@@ -186,7 +186,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -186,7 +186,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
* @since 1.1 * @since 1.1
*/ */
public void waitForTermination() { public void waitForTermination() {
try { try {
for (Thread thread : this.finiteProducerThreads) { for (Thread thread : this.finiteProducerThreads) {
thread.join(); thread.join();
......
...@@ -24,14 +24,14 @@ import teetime.framework.pipe.IPipe; ...@@ -24,14 +24,14 @@ import teetime.framework.pipe.IPipe;
public class Traversor { public class Traversor {
private final IPipeVisitor pipeVisitor; private final IPipeVisitor pipeVisitor;
private final Set<Stage> visitedStage = new HashSet<Stage>(); private final Set<Stage> visitedStages = new HashSet<Stage>();
public Traversor(final IPipeVisitor pipeVisitor) { public Traversor(final IPipeVisitor pipeVisitor) {
this.pipeVisitor = pipeVisitor; this.pipeVisitor = pipeVisitor;
} }
public void traverse(final Stage stage) { public void traverse(final Stage stage) {
if (!visitedStage.add(stage)) { if (!visitedStages.add(stage)) {
return; return;
} }
...@@ -46,6 +46,6 @@ public class Traversor { ...@@ -46,6 +46,6 @@ public class Traversor {
} }
public Set<Stage> getVisitedStage() { public Set<Stage> getVisitedStage() {
return visitedStage; return visitedStages;
} }
} }
...@@ -70,7 +70,6 @@ public final class PipeFactoryLoader { ...@@ -70,7 +70,6 @@ public final class PipeFactoryLoader {
} }
public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) { public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) {
List<URL> files = null; List<URL> files = null;
try { try {
......
...@@ -41,9 +41,9 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe ...@@ -41,9 +41,9 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
return pipe; return pipe;
} }
// BETTER introduce a QueueIsFullStrategy
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
// BETTER introduce a QueueIsFullStrategy
while (!this.queue.offer(element)) { while (!this.queue.offer(element)) {
// Thread.yield(); // Thread.yield();
if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED) { if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED) {
......
...@@ -49,9 +49,6 @@ final class UnorderedGrowablePipe extends AbstractIntraThreadPipe { ...@@ -49,9 +49,6 @@ final class UnorderedGrowablePipe extends AbstractIntraThreadPipe {
@Override @Override
public Object removeLast() { public Object removeLast() {
// if (this.lastFreeIndex == 0) {
// return null;
// }
final Object element = this.elements[--this.lastFreeIndex]; final Object element = this.elements[--this.lastFreeIndex];
this.elements[this.lastFreeIndex] = null; this.elements[this.lastFreeIndex] = null;
// T element = this.elements.get(--this.lastFreeIndex); // T element = this.elements.get(--this.lastFreeIndex);
......
...@@ -40,7 +40,6 @@ public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> { ...@@ -40,7 +40,6 @@ public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> {
@Override @Override
protected void execute(final byte[] element) { protected void execute(final byte[] element) {
try { try {
fo.write(element); fo.write(element);
} catch (Exception e) { } catch (Exception e) {
......
...@@ -30,8 +30,8 @@ public final class File2ByteArray extends AbstractConsumerStage<File> { ...@@ -30,8 +30,8 @@ public final class File2ByteArray extends AbstractConsumerStage<File> {
@Override @Override
protected void execute(final File element) { protected void execute(final File element) {
try { try {
byte[] cache = Files.toByteArray(element); byte[] fileBytes = Files.toByteArray(element);
outputPort.send(cache); outputPort.send(fileBytes);
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string.buffer;
import java.util.Collection;
import java.util.LinkedList;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
import teetime.stage.string.buffer.handler.AbstractDataTypeHandler;
import teetime.stage.string.buffer.util.KiekerHashMap;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public final class StringBufferFilter<T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
// BETTER use a non shared data structure to avoid synchronization between threads
private KiekerHashMap kiekerHashMap = new KiekerHashMap();
private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>();
@Override
protected void execute(final T element) {
final T returnedElement = this.handle(element);
outputPort.send(returnedElement);
}
@Override
public void onStarting() throws Exception {
super.onStarting();
for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) {
handler.setLogger(this.logger);
handler.setStringRepository(this.kiekerHashMap);
}
}
private T handle(final T object) {
for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) {
if (handler.canHandle(object)) {
@SuppressWarnings("unchecked")
final T returnedObject = ((AbstractDataTypeHandler<T>) handler).handle(object);
return returnedObject;
}
}
return object; // else relay given object
}
public KiekerHashMap getKiekerHashMap() {
return this.kiekerHashMap;
}
public void setKiekerHashMap(final KiekerHashMap kiekerHashMap) {
this.kiekerHashMap = kiekerHashMap;
}
public Collection<AbstractDataTypeHandler<?>> getDataTypeHandlers() {
return this.dataTypeHandlers;
}
public void setDataTypeHandlers(final Collection<AbstractDataTypeHandler<?>> dataTypeHandlers) {
this.dataTypeHandlers = dataTypeHandlers;
}
public OutputPort<T> getOutputPort() {
return outputPort;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string.buffer.handler;
import org.slf4j.Logger;
import teetime.stage.string.buffer.util.KiekerHashMap;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public abstract class AbstractDataTypeHandler<T> {
protected Logger logger;
protected KiekerHashMap stringRepository;
/**
* @since 1.10
*/
public abstract boolean canHandle(Object object);
/**
* @since 1.10
*/
public abstract T handle(T object);
/**
* @since 1.10
*/
public void setLogger(final Logger logger) {
this.logger = logger;
}
/**
* @since 1.10
*/
public void setStringRepository(final KiekerHashMap stringRepository) {
this.stringRepository = stringRepository;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string.buffer.handler;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class StringHandler extends AbstractDataTypeHandler<String> {
@Override
public boolean canHandle(final Object object) {
return object instanceof String;
}
@Override
public String handle(final String object) {
return this.stringRepository.get(object);
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.string.buffer.util;
import java.lang.ref.SoftReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class KiekerHashMap {
private static final int INITIAL_CAPACITY = 16;
private static final double LOAD_FACTOR = 0.75d;
private static final int CONCURRENCY_LEVEL = 16;
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* Mask value for indexing into segments. The upper bits of a key's hash code are used to choose the segment.
*/
private final int segmentMask;
/**
* Shift value for indexing within segments.
*/
private final int segmentShift;
/**
* The segments, each of which is a specialized hash table.
*/
private final Segment[] segments;
/**
* @since 1.10
*/
public KiekerHashMap() {
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < CONCURRENCY_LEVEL) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
this.segments = new Segment[ssize];
int c = INITIAL_CAPACITY / ssize;
if ((c * ssize) < INITIAL_CAPACITY) {
++c;
}
int cap = 1;
while (cap < c) {
cap <<= 1;
}
for (int i = 0; i < this.segments.length; ++i) {
this.segments[i] = new Segment(cap, LOAD_FACTOR);
}
}
/**
* Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses
* power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.
*/
private static final int hash(final String value) {
// Spread bits to regularize both segment and index locations, using variant of single-word Wang/Jenkins hash.
int h = value.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= h >>> 10;
h += h << 3;
h ^= h >>> 6;
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
public final String get(final String value) {
final int hash = KiekerHashMap.hash(value);
Segment segment = this.segments[(hash >>> this.segmentShift) & this.segmentMask];
return segment.get(value, hash);
}
// ---------------- Inner Classes --------------
/**
* StringBuffer entry.
*/
private static final class HashEntry extends SoftReference<String> {
final int hash;
final HashEntry next;
protected HashEntry(final String value, final int hash, final HashEntry next) {
super(value);
this.hash = hash;
this.next = next;
}
}
/**
* Segments are specialized versions of hash tables. This subclasses from ReentrantLock opportunistically, just to simplify some locking and avoid separate
* construction.
*
* Segments maintain a table of entry lists that are ALWAYS kept in a consistent state, so can be read without locking. Next fields of nodes are immutable
* (final). All list additions are performed at the front of each bin. This makes it easy to check changes, and also fast to traverse. When nodes would
* otherwise be changed, new nodes are created to replace them. This works well for hash tables since the bin lists tend to be short. (The average length is
* less than two for the default load factor threshold.)
*
* Read operations can thus proceed without locking, but rely on selected uses of volatiles to ensure that completed write operations performed by other
* threads are noticed. For most purposes, the "count" field, tracking the number of elements, serves as that volatile variable ensuring visibility. This is
* convenient because this field needs to be read in many read operations anyway:
*
* - All (unsynchronized) read operations must first read the "count" field, and should not look at table entries if it is 0.
*
* - All (synchronized) write operations should write to the "count" field after structurally changing any bin. The operations must not take any action that
* could even momentarily cause a concurrent read operation to see inconsistent data. This is made easier by the nature of the read operations in Map. For
* example, no operation can reveal that the table has grown but the threshold has not yet been updated, so there are no atomicity requirements for this with
* respect to reads.
*
* As a guide, all critical volatile reads and writes to the count field are marked in code comments.
*/
private static final class Segment extends ReentrantLock {
private static final long serialVersionUID = 1L;
/**
* The number of elements in this segment's region.
*/
private volatile int count;
/**
* The per-segment table.
*/
private HashEntry[] table;
/**
* The table is rehashed when its size exceeds this threshold. (The value of this field is always <tt>(int)(capacity * loadFactor)</tt>.)
*/
private int threshold;
protected Segment(final int initialCapacity, final double lf) {
this.table = new HashEntry[initialCapacity];
this.threshold = (int) (initialCapacity * lf);
this.count = 0;
}
protected final String get(final String value, final int hash) {
HashEntry e = null;
String cachedString;
if (this.count != 0) { // volatile read! search for entry without locking
final HashEntry[] tab = this.table;
final int index = hash & (tab.length - 1);
final HashEntry first = tab[index];
e = first;
while (e != null) {
if (e.hash == hash) {
cachedString = e.get();
if (value.equals(cachedString)) {
return cachedString;
}
}
e = e.next;
}
}
this.lock();
try {
final int c = this.count + 1;
if (c >= this.threshold) {
this.cleanup();
if (c >= this.threshold) { // if still full
this.rehash();
}
this.count = c; // write volatile
}
final HashEntry[] tab = this.table;
final int index = hash & (tab.length - 1);
final HashEntry first = tab[index]; // the bin the value may be inside
e = first;
while (e != null) {
if (e.hash == hash) {
cachedString = e.get();
if (value.equals(cachedString)) {
return cachedString;
}
}
e = e.next;
}
tab[index] = new HashEntry(value, hash, first);
this.count = c; // write-volatile
return value; // return now cached string
} finally {
this.unlock();
}
}
private final void cleanup() {
int c = this.count;
final HashEntry[] tab = this.table;
for (int index = 0; index < tab.length; index++) {
// find first remaining entry
HashEntry e = tab[index];
while ((e != null) && (e.get() == null)) {
e = e.next;
c--;
}
if (e == null) {
tab[index] = null;
} else {
// find more existing entries and enqueue before this one
HashEntry first = new HashEntry(e.get(), e.hash, null);
e = e.next;
while (e != null) {
final String s = e.get();
if (s != null) {
first = new HashEntry(s, e.hash, first);
} else {
c--;
}
e = e.next;
}
tab[index] = first;
}
}
c--;
this.count = c; // write-volatile
}
/**
* Reclassify nodes in each list to new Map. Because we are using power-of-two expansion, the elements from each bin must either stay at same index, or
* move with a power of two offset. We eliminate unnecessary node creation by catching cases where old nodes can be reused because their next fields
* won't change. Statistically, at the default threshold, only about one-sixth of them need cloning when a table doubles. The nodes they replace will be
* garbage collectable as soon as they are no longer referenced by any reader thread that may be in the midst of traversing table right now.
*/
private final void rehash() {
final HashEntry[] oldTable = this.table;
final int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY) {
return;
}
final HashEntry[] newTable = new HashEntry[oldCapacity << 1];
this.threshold = (int) (newTable.length * LOAD_FACTOR);
final int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity; i++) {
// We need to guarantee that any existing reads of old Map can proceed. So we cannot yet null out each bin.
final HashEntry e = oldTable[i];
if (e != null) {
final HashEntry next = e.next;
final int idx = e.hash & sizeMask;
// Single node on list
if (next == null) {
newTable[idx] = e;
} else {
// Reuse trailing consecutive sequence at same slot
HashEntry lastRun = e;
int lastIdx = idx;
for (HashEntry last = next; last != null; last = last.next) { // find end of bin
final int k = last.hash & sizeMask;
if (k != lastIdx) { // NOCS (nested if)
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone all remaining nodes
for (HashEntry p = e; p != lastRun; p = p.next) {
final int k = p.hash & sizeMask;
final HashEntry n = newTable[k];
newTable[k] = new HashEntry(p.get(), p.hash, n);
}
}
}
}
this.table = newTable;
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment