diff --git a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java index 92fe284b21c74a10efe6b84be454639acbd59348..b3e47b9cf8e333a5f26cc55d70d7b7234b310e88 100644 --- a/src/main/java/teetime/framework/AbstractIntraThreadPipe.java +++ b/src/main/java/teetime/framework/AbstractIntraThreadPipe.java @@ -31,8 +31,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { } @Override + // getTargetPort is always non-null since the framework adds dummy ports if necessary public final void sendSignal(final ISignal signal) { - // getTargetPort is always non-null since the framework adds dummy ports if necessary this.cachedTargetStage.onSignal(signal, this.getTargetPort()); } diff --git a/src/main/java/teetime/framework/Analysis.java b/src/main/java/teetime/framework/Analysis.java index ea357146a290b441c56b5b2a3764e1a5b41c8b74..78375efac944c52a8983dac250f9ea3dd524154c 100644 --- a/src/main/java/teetime/framework/Analysis.java +++ b/src/main/java/teetime/framework/Analysis.java @@ -96,8 +96,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught init(); } + // BETTER validate concurrently private void validateStages() { - // BETTER validate concurrently final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); for (Stage stage : threadableStageJobs) { // // portConnectionValidator.validate(stage); @@ -186,7 +186,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught * @since 1.1 */ public void waitForTermination() { - try { for (Thread thread : this.finiteProducerThreads) { thread.join(); diff --git a/src/main/java/teetime/framework/Traversor.java b/src/main/java/teetime/framework/Traversor.java index 82ac218e31b10d93a25de53ead1f845b94a333c5..39ebed6e43ec796831c79c04b300ea5286dd7c62 100644 --- a/src/main/java/teetime/framework/Traversor.java +++ b/src/main/java/teetime/framework/Traversor.java @@ -24,14 +24,14 @@ import teetime.framework.pipe.IPipe; public class Traversor { private final IPipeVisitor pipeVisitor; - private final Set<Stage> visitedStage = new HashSet<Stage>(); + private final Set<Stage> visitedStages = new HashSet<Stage>(); public Traversor(final IPipeVisitor pipeVisitor) { this.pipeVisitor = pipeVisitor; } public void traverse(final Stage stage) { - if (!visitedStage.add(stage)) { + if (!visitedStages.add(stage)) { return; } @@ -46,6 +46,6 @@ public class Traversor { } public Set<Stage> getVisitedStage() { - return visitedStage; + return visitedStages; } } diff --git a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java index 1c75785c41ee6999a11f09aaef36dc240850b2b3..b2673d4d0df8a7510b9e029ffc3d57bd4018fb28 100644 --- a/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java +++ b/src/main/java/teetime/framework/pipe/PipeFactoryLoader.java @@ -70,7 +70,6 @@ public final class PipeFactoryLoader { } public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) { - List<URL> files = null; try { diff --git a/src/main/java/teetime/framework/pipe/SpScPipe.java b/src/main/java/teetime/framework/pipe/SpScPipe.java index 4c1e52ae7edb9fce65bfe349314afb5b379e4f34..7d68aafb9af1ffdefcb242f1eb1d042bcfe493d3 100644 --- a/src/main/java/teetime/framework/pipe/SpScPipe.java +++ b/src/main/java/teetime/framework/pipe/SpScPipe.java @@ -41,9 +41,9 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe return pipe; } + // BETTER introduce a QueueIsFullStrategy @Override public boolean add(final Object element) { - // BETTER introduce a QueueIsFullStrategy while (!this.queue.offer(element)) { // Thread.yield(); if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED) { diff --git a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java index 9baa545f3a0788ac384df2cd4ae5768cced97d55..e9ef0e9688b5969d1d43780b363059673bf2c621 100644 --- a/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java +++ b/src/main/java/teetime/framework/pipe/UnorderedGrowablePipe.java @@ -49,9 +49,6 @@ final class UnorderedGrowablePipe extends AbstractIntraThreadPipe { @Override public Object removeLast() { - // if (this.lastFreeIndex == 0) { - // return null; - // } final Object element = this.elements[--this.lastFreeIndex]; this.elements[this.lastFreeIndex] = null; // T element = this.elements.get(--this.lastFreeIndex); diff --git a/src/main/java/teetime/stage/io/ByteArrayFileWriter.java b/src/main/java/teetime/stage/io/ByteArrayFileWriter.java index 93375cd7d877cbbcbb516d7f6570776b83074efe..abebaeab55c8fd83757ec4a7903a6e5be60eeee1 100644 --- a/src/main/java/teetime/stage/io/ByteArrayFileWriter.java +++ b/src/main/java/teetime/stage/io/ByteArrayFileWriter.java @@ -40,7 +40,6 @@ public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> { @Override protected void execute(final byte[] element) { - try { fo.write(element); } catch (Exception e) { diff --git a/src/main/java/teetime/stage/io/File2ByteArray.java b/src/main/java/teetime/stage/io/File2ByteArray.java index 387a999746f6df0247be3e7ffd65775fedfa8cf1..caa51fb5f5b608ee6d3525e7d323469ee82e5eae 100644 --- a/src/main/java/teetime/stage/io/File2ByteArray.java +++ b/src/main/java/teetime/stage/io/File2ByteArray.java @@ -30,8 +30,8 @@ public final class File2ByteArray extends AbstractConsumerStage<File> { @Override protected void execute(final File element) { try { - byte[] cache = Files.toByteArray(element); - outputPort.send(cache); + byte[] fileBytes = Files.toByteArray(element); + outputPort.send(fileBytes); } catch (IOException e) { throw new IllegalStateException(e); } diff --git a/src/main/java/teetime/stage/string/buffer/StringBufferFilter.java b/src/main/java/teetime/stage/string/buffer/StringBufferFilter.java deleted file mode 100644 index cd187eff2679e60fff5a7312b88c35b585d24064..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/stage/string/buffer/StringBufferFilter.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.stage.string.buffer; - -import java.util.Collection; -import java.util.LinkedList; - -import teetime.framework.AbstractConsumerStage; -import teetime.framework.OutputPort; -import teetime.stage.string.buffer.handler.AbstractDataTypeHandler; -import teetime.stage.string.buffer.util.KiekerHashMap; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public final class StringBufferFilter<T> extends AbstractConsumerStage<T> { - - private final OutputPort<T> outputPort = this.createOutputPort(); - - // BETTER use a non shared data structure to avoid synchronization between threads - private KiekerHashMap kiekerHashMap = new KiekerHashMap(); - - private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>(); - - @Override - protected void execute(final T element) { - final T returnedElement = this.handle(element); - outputPort.send(returnedElement); - } - - @Override - public void onStarting() throws Exception { - super.onStarting(); - for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { - handler.setLogger(this.logger); - handler.setStringRepository(this.kiekerHashMap); - } - } - - private T handle(final T object) { - for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { - if (handler.canHandle(object)) { - @SuppressWarnings("unchecked") - final T returnedObject = ((AbstractDataTypeHandler<T>) handler).handle(object); - return returnedObject; - } - } - return object; // else relay given object - } - - public KiekerHashMap getKiekerHashMap() { - return this.kiekerHashMap; - } - - public void setKiekerHashMap(final KiekerHashMap kiekerHashMap) { - this.kiekerHashMap = kiekerHashMap; - } - - public Collection<AbstractDataTypeHandler<?>> getDataTypeHandlers() { - return this.dataTypeHandlers; - } - - public void setDataTypeHandlers(final Collection<AbstractDataTypeHandler<?>> dataTypeHandlers) { - this.dataTypeHandlers = dataTypeHandlers; - } - - public OutputPort<T> getOutputPort() { - return outputPort; - } - -} diff --git a/src/main/java/teetime/stage/string/buffer/handler/AbstractDataTypeHandler.java b/src/main/java/teetime/stage/string/buffer/handler/AbstractDataTypeHandler.java deleted file mode 100644 index 6f4540cdfd1040c94e2971142543a29d4f4a2a59..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/stage/string/buffer/handler/AbstractDataTypeHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.stage.string.buffer.handler; - -import org.slf4j.Logger; - -import teetime.stage.string.buffer.util.KiekerHashMap; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public abstract class AbstractDataTypeHandler<T> { - - protected Logger logger; - protected KiekerHashMap stringRepository; - - /** - * @since 1.10 - */ - public abstract boolean canHandle(Object object); - - /** - * @since 1.10 - */ - public abstract T handle(T object); - - /** - * @since 1.10 - */ - public void setLogger(final Logger logger) { - this.logger = logger; - } - - /** - * @since 1.10 - */ - public void setStringRepository(final KiekerHashMap stringRepository) { - this.stringRepository = stringRepository; - } - -} diff --git a/src/main/java/teetime/stage/string/buffer/handler/StringHandler.java b/src/main/java/teetime/stage/string/buffer/handler/StringHandler.java deleted file mode 100644 index bef40e5d09b74761661ad93725f3342d902c0edc..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/stage/string/buffer/handler/StringHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.stage.string.buffer.handler; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public class StringHandler extends AbstractDataTypeHandler<String> { - - @Override - public boolean canHandle(final Object object) { - return object instanceof String; - } - - @Override - public String handle(final String object) { - return this.stringRepository.get(object); - } - -} diff --git a/src/main/java/teetime/stage/string/buffer/util/KiekerHashMap.java b/src/main/java/teetime/stage/string/buffer/util/KiekerHashMap.java deleted file mode 100644 index 1806abe5718cf67b3c6bf714ea30c741b82348db..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/stage/string/buffer/util/KiekerHashMap.java +++ /dev/null @@ -1,291 +0,0 @@ -/** - * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package teetime.stage.string.buffer.util; - -import java.lang.ref.SoftReference; -import java.util.concurrent.locks.ReentrantLock; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public class KiekerHashMap { - - private static final int INITIAL_CAPACITY = 16; - private static final double LOAD_FACTOR = 0.75d; - private static final int CONCURRENCY_LEVEL = 16; - private static final int MAXIMUM_CAPACITY = 1 << 30; - - /** - * Mask value for indexing into segments. The upper bits of a key's hash code are used to choose the segment. - */ - private final int segmentMask; - - /** - * Shift value for indexing within segments. - */ - private final int segmentShift; - - /** - * The segments, each of which is a specialized hash table. - */ - private final Segment[] segments; - - /** - * @since 1.10 - */ - public KiekerHashMap() { - // Find power-of-two sizes best matching arguments - int sshift = 0; - int ssize = 1; - while (ssize < CONCURRENCY_LEVEL) { - ++sshift; - ssize <<= 1; - } - this.segmentShift = 32 - sshift; - this.segmentMask = ssize - 1; - this.segments = new Segment[ssize]; - int c = INITIAL_CAPACITY / ssize; - if ((c * ssize) < INITIAL_CAPACITY) { - ++c; - } - int cap = 1; - while (cap < c) { - cap <<= 1; - } - for (int i = 0; i < this.segments.length; ++i) { - this.segments[i] = new Segment(cap, LOAD_FACTOR); - } - } - - /** - * Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses - * power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits. - */ - private static final int hash(final String value) { - // Spread bits to regularize both segment and index locations, using variant of single-word Wang/Jenkins hash. - int h = value.hashCode(); - h += (h << 15) ^ 0xffffcd7d; - h ^= h >>> 10; - h += h << 3; - h ^= h >>> 6; - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); - } - - public final String get(final String value) { - final int hash = KiekerHashMap.hash(value); - Segment segment = this.segments[(hash >>> this.segmentShift) & this.segmentMask]; - return segment.get(value, hash); - } - - // ---------------- Inner Classes -------------- - - /** - * StringBuffer entry. - */ - private static final class HashEntry extends SoftReference<String> { - final int hash; - final HashEntry next; - - protected HashEntry(final String value, final int hash, final HashEntry next) { - super(value); - this.hash = hash; - this.next = next; - } - } - - /** - * Segments are specialized versions of hash tables. This subclasses from ReentrantLock opportunistically, just to simplify some locking and avoid separate - * construction. - * - * Segments maintain a table of entry lists that are ALWAYS kept in a consistent state, so can be read without locking. Next fields of nodes are immutable - * (final). All list additions are performed at the front of each bin. This makes it easy to check changes, and also fast to traverse. When nodes would - * otherwise be changed, new nodes are created to replace them. This works well for hash tables since the bin lists tend to be short. (The average length is - * less than two for the default load factor threshold.) - * - * Read operations can thus proceed without locking, but rely on selected uses of volatiles to ensure that completed write operations performed by other - * threads are noticed. For most purposes, the "count" field, tracking the number of elements, serves as that volatile variable ensuring visibility. This is - * convenient because this field needs to be read in many read operations anyway: - * - * - All (unsynchronized) read operations must first read the "count" field, and should not look at table entries if it is 0. - * - * - All (synchronized) write operations should write to the "count" field after structurally changing any bin. The operations must not take any action that - * could even momentarily cause a concurrent read operation to see inconsistent data. This is made easier by the nature of the read operations in Map. For - * example, no operation can reveal that the table has grown but the threshold has not yet been updated, so there are no atomicity requirements for this with - * respect to reads. - * - * As a guide, all critical volatile reads and writes to the count field are marked in code comments. - */ - private static final class Segment extends ReentrantLock { - - private static final long serialVersionUID = 1L; - - /** - * The number of elements in this segment's region. - */ - private volatile int count; - - /** - * The per-segment table. - */ - private HashEntry[] table; - - /** - * The table is rehashed when its size exceeds this threshold. (The value of this field is always <tt>(int)(capacity * loadFactor)</tt>.) - */ - private int threshold; - - protected Segment(final int initialCapacity, final double lf) { - this.table = new HashEntry[initialCapacity]; - this.threshold = (int) (initialCapacity * lf); - this.count = 0; - } - - protected final String get(final String value, final int hash) { - HashEntry e = null; - String cachedString; - if (this.count != 0) { // volatile read! search for entry without locking - final HashEntry[] tab = this.table; - final int index = hash & (tab.length - 1); - final HashEntry first = tab[index]; - e = first; - while (e != null) { - if (e.hash == hash) { - cachedString = e.get(); - if (value.equals(cachedString)) { - return cachedString; - } - } - e = e.next; - } - } - this.lock(); - try { - final int c = this.count + 1; - if (c >= this.threshold) { - this.cleanup(); - if (c >= this.threshold) { // if still full - this.rehash(); - } - this.count = c; // write volatile - } - final HashEntry[] tab = this.table; - final int index = hash & (tab.length - 1); - final HashEntry first = tab[index]; // the bin the value may be inside - e = first; - while (e != null) { - if (e.hash == hash) { - cachedString = e.get(); - if (value.equals(cachedString)) { - return cachedString; - } - } - e = e.next; - } - tab[index] = new HashEntry(value, hash, first); - this.count = c; // write-volatile - return value; // return now cached string - } finally { - this.unlock(); - } - } - - private final void cleanup() { - int c = this.count; - final HashEntry[] tab = this.table; - for (int index = 0; index < tab.length; index++) { - // find first remaining entry - HashEntry e = tab[index]; - while ((e != null) && (e.get() == null)) { - e = e.next; - c--; - } - if (e == null) { - tab[index] = null; - } else { - // find more existing entries and enqueue before this one - HashEntry first = new HashEntry(e.get(), e.hash, null); - e = e.next; - while (e != null) { - final String s = e.get(); - if (s != null) { - first = new HashEntry(s, e.hash, first); - } else { - c--; - } - e = e.next; - } - tab[index] = first; - } - } - c--; - this.count = c; // write-volatile - } - - /** - * Reclassify nodes in each list to new Map. Because we are using power-of-two expansion, the elements from each bin must either stay at same index, or - * move with a power of two offset. We eliminate unnecessary node creation by catching cases where old nodes can be reused because their next fields - * won't change. Statistically, at the default threshold, only about one-sixth of them need cloning when a table doubles. The nodes they replace will be - * garbage collectable as soon as they are no longer referenced by any reader thread that may be in the midst of traversing table right now. - */ - private final void rehash() { - final HashEntry[] oldTable = this.table; - final int oldCapacity = oldTable.length; - if (oldCapacity >= MAXIMUM_CAPACITY) { - return; - } - final HashEntry[] newTable = new HashEntry[oldCapacity << 1]; - this.threshold = (int) (newTable.length * LOAD_FACTOR); - final int sizeMask = newTable.length - 1; - for (int i = 0; i < oldCapacity; i++) { - // We need to guarantee that any existing reads of old Map can proceed. So we cannot yet null out each bin. - final HashEntry e = oldTable[i]; - - if (e != null) { - final HashEntry next = e.next; - final int idx = e.hash & sizeMask; - - // Single node on list - if (next == null) { - newTable[idx] = e; - } else { - // Reuse trailing consecutive sequence at same slot - HashEntry lastRun = e; - int lastIdx = idx; - for (HashEntry last = next; last != null; last = last.next) { // find end of bin - final int k = last.hash & sizeMask; - if (k != lastIdx) { // NOCS (nested if) - lastIdx = k; - lastRun = last; - } - } - newTable[lastIdx] = lastRun; - - // Clone all remaining nodes - for (HashEntry p = e; p != lastRun; p = p.next) { - final int k = p.hash & sizeMask; - final HashEntry n = newTable[k]; - newTable[k] = new HashEntry(p.get(), p.hash, n); - } - } - } - } - this.table = newTable; - } - } -}