Skip to content
Snippets Groups Projects
Commit f01000b1 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into InitSignal-final

parents d3d93858 1df08c1d
No related branches found
No related tags found
No related merge requests found
Showing
with 13 additions and 728 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId>
<name>teetime</name>
<version>1.0-SNAPSHOT</version>
<url>http://maven.apache.org</url>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId>
<version>2.1</version>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>sonatype.oss.snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
...@@ -157,7 +157,7 @@ ...@@ -157,7 +157,7 @@
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>versions-maven-plugin</artifactId> <artifactId>versions-maven-plugin</artifactId>
<version>2.1</version> <version>2.2</version>
</plugin> </plugin>
<!-- goals to build a jar with binaries: jar:jar, jar:test-jar --> <!-- goals to build a jar with binaries: jar:jar, jar:test-jar -->
...@@ -433,7 +433,7 @@ ...@@ -433,7 +433,7 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId> <artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version> <version>1.6</version>
<executions> <executions>
<execution> <execution>
<id>sign-artifacts</id> <id>sign-artifacts</id>
......
...@@ -31,8 +31,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe { ...@@ -31,8 +31,8 @@ public abstract class AbstractIntraThreadPipe extends AbstractPipe {
} }
@Override @Override
// getTargetPort is always non-null since the framework adds dummy ports if necessary
public final void sendSignal(final ISignal signal) { public final void sendSignal(final ISignal signal) {
// getTargetPort is always non-null since the framework adds dummy ports if necessary
this.cachedTargetStage.onSignal(signal, this.getTargetPort()); this.cachedTargetStage.onSignal(signal, this.getTargetPort());
} }
......
...@@ -96,8 +96,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -96,8 +96,8 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
init(); init();
} }
// BETTER validate concurrently
private void validateStages() { private void validateStages() {
// BETTER validate concurrently
final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs(); final List<Stage> threadableStageJobs = this.configuration.getThreadableStageJobs();
for (Stage stage : threadableStageJobs) { for (Stage stage : threadableStageJobs) {
// // portConnectionValidator.validate(stage); // // portConnectionValidator.validate(stage);
...@@ -190,7 +190,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught ...@@ -190,7 +190,6 @@ public final class Analysis<T extends AnalysisConfiguration> implements Uncaught
* @since 1.1 * @since 1.1
*/ */
public void waitForTermination() { public void waitForTermination() {
try { try {
for (Thread thread : this.finiteProducerThreads) { for (Thread thread : this.finiteProducerThreads) {
thread.join(); thread.join();
......
...@@ -24,14 +24,14 @@ import teetime.framework.pipe.IPipe; ...@@ -24,14 +24,14 @@ import teetime.framework.pipe.IPipe;
public class Traversor { public class Traversor {
private final IPipeVisitor pipeVisitor; private final IPipeVisitor pipeVisitor;
private final Set<Stage> visitedStage = new HashSet<Stage>(); private final Set<Stage> visitedStages = new HashSet<Stage>();
public Traversor(final IPipeVisitor pipeVisitor) { public Traversor(final IPipeVisitor pipeVisitor) {
this.pipeVisitor = pipeVisitor; this.pipeVisitor = pipeVisitor;
} }
public void traverse(final Stage stage) { public void traverse(final Stage stage) {
if (!visitedStage.add(stage)) { if (!visitedStages.add(stage)) {
return; return;
} }
...@@ -46,6 +46,6 @@ public class Traversor { ...@@ -46,6 +46,6 @@ public class Traversor {
} }
public Set<Stage> getVisitedStage() { public Set<Stage> getVisitedStage() {
return visitedStage; return visitedStages;
} }
} }
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.util.list.CommittableResizableArrayQueue;
final class CommittablePipe extends AbstractIntraThreadPipe {
private final CommittableResizableArrayQueue<Object> elements = new CommittableResizableArrayQueue<Object>(null, 4);
<T> CommittablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
final IPipe pipe = new CommittablePipe(null, null);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final Object element) {
this.elements.addToTailUncommitted(element);
this.elements.commit();
return true;
}
@Override
public Object removeLast() {
final Object element = this.elements.removeFromHeadUncommitted();
this.elements.commit();
return element;
}
@Override
public boolean isEmpty() {
return this.elements.isEmpty();
}
public CommittableResizableArrayQueue<?> getElements() {
return this.elements;
}
@Override
public int size() {
return this.elements.size();
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class CommittablePipeFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 1);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new CommittablePipe(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.STACK_BASED;
}
@Override
public boolean isGrowable() {
return true;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.concurrent.ConcurrentLinkedQueue;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
final class ConcurrentBlockingIntraThreadPipe<T> extends AbstractIntraThreadPipe {
private final ConcurrentLinkedQueue<Object> queue;
ConcurrentBlockingIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
queue = new ConcurrentLinkedQueue<Object>();
}
@Override
public boolean add(final Object element) {
return queue.add(element);
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public int size() {
return queue.size();
}
@Override
public Object removeLast() {
return queue.poll();
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class ConcurrentBlockingIntraThreadPipeFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new ConcurrentBlockingIntraThreadPipe<T>(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return true;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.util.concurrent.workstealing.CircularArray;
final class OrderedGrowableArrayPipe extends AbstractIntraThreadPipe {
private final CircularArray<Object> elements;
private int head;
private int tail;
<T> OrderedGrowableArrayPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.elements = new CircularArray<Object>(capacity);
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
final IPipe pipe = new OrderedGrowableArrayPipe(sourcePort, targetPort, 4);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final Object element) {
this.elements.put(this.tail++, element);
return true;
}
@Override
public Object removeLast() {
if (this.head < this.tail) {
return this.elements.get(this.head++);
} else {
return null;
}
}
@Override
public boolean isEmpty() {
return this.size() == 0;
}
@Override
public int size() {
return this.tail - this.head;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class OrderedGrowableArrayPipeFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new OrderedGrowableArrayPipe(sourcePort, targetPort, capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return true;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.LinkedList;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
class OrderedGrowablePipe extends AbstractIntraThreadPipe {
private final LinkedList<Object> elements;
<T> OrderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.elements = new LinkedList<Object>();
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
final IPipe pipe = new OrderedGrowablePipe(null, null, 100000);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final Object element) {
return this.elements.offer(element);
}
@Override
public Object removeLast() {
return this.elements.poll();
}
@Override
public boolean isEmpty() {
return this.elements.isEmpty();
}
@Override
public int size() {
return this.elements.size();
}
}
...@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; ...@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import teetime.util.classpath.FileSearcher; import teetime.util.classpath.FileSearcher;
public final class PipeFactoryLoader { final class PipeFactoryLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryLoader.class); private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryLoader.class);
...@@ -70,7 +70,6 @@ public final class PipeFactoryLoader { ...@@ -70,7 +70,6 @@ public final class PipeFactoryLoader {
} }
public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) { public static List<IPipeFactory> loadPipeFactoriesFromClasspath(final String configFileName) {
List<URL> files = null; List<URL> files = null;
try { try {
......
...@@ -28,7 +28,9 @@ import org.slf4j.LoggerFactory; ...@@ -28,7 +28,9 @@ import org.slf4j.LoggerFactory;
* <p> * <p>
* To get a PipeFactory instance, call {@link #getPipeFactory(ThreadCommunication, PipeOrdering, boolean)}. * To get a PipeFactory instance, call {@link #getPipeFactory(ThreadCommunication, PipeOrdering, boolean)}.
* *
* @Deprecated since 1.2
*/ */
@Deprecated
public final class PipeFactoryRegistry { public final class PipeFactoryRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryRegistry.class); private static final Logger LOGGER = LoggerFactory.getLogger(PipeFactoryRegistry.class);
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.Queue;
import org.jctools.queues.QueueFactory;
import org.jctools.queues.spec.ConcurrentQueueSpec;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
/**
* Represents a less efficient implementation of an intra-thread pipe.
*
* @author Christian Wulf
*
* @param <T>
*/
final class SpScIntraThreadPipe<T> extends AbstractIntraThreadPipe {
private final Queue<Object> queue;
SpScIntraThreadPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
super(sourcePort, targetPort);
queue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedSpsc(1));
}
@Override
public boolean add(final Object element) {
return queue.offer(element);
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public int size() {
return queue.size();
}
@Override
public Object removeLast() {
return queue.poll();
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class SpScIntraThreadPipeFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new SpScIntraThreadPipe<T>(sourcePort, targetPort);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.QUEUE_BASED;
}
@Override
public boolean isGrowable() {
return false;
}
}
...@@ -41,9 +41,9 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe ...@@ -41,9 +41,9 @@ final class SpScPipe extends AbstractInterThreadPipe implements IMonitorablePipe
return pipe; return pipe;
} }
// BETTER introduce a QueueIsFullStrategy
@Override @Override
public boolean add(final Object element) { public boolean add(final Object element) {
// BETTER introduce a QueueIsFullStrategy
while (!this.queue.offer(element)) { while (!this.queue.offer(element)) {
// Thread.yield(); // Thread.yield();
if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED) { if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED) {
......
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.AbstractIntraThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
final class UnorderedGrowablePipe extends AbstractIntraThreadPipe {
private Object[] elements;
// private final ArrayWrapper2<T> elements = new ArrayWrapper2<T>(2);
private int lastFreeIndex;
<T> UnorderedGrowablePipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort);
this.elements = new Object[capacity];
}
@Deprecated
public static <T> void connect(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
final IPipe pipe = new UnorderedGrowablePipe(null, null, 4);
pipe.connectPorts(sourcePort, targetPort);
}
@Override
public boolean add(final Object element) {
if (this.lastFreeIndex == this.elements.length) {
// if (this.lastFreeIndex == this.elements.getCapacity()) {
this.elements = this.grow();
}
this.elements[this.lastFreeIndex++] = element;
// this.elements.put(this.lastFreeIndex++, element);
return true;
}
@Override
public Object removeLast() {
// if (this.lastFreeIndex == 0) {
// return null;
// }
final Object element = this.elements[--this.lastFreeIndex];
this.elements[this.lastFreeIndex] = null;
// T element = this.elements.get(--this.lastFreeIndex);
return element;
}
@Override
public boolean isEmpty() {
return this.lastFreeIndex == 0;
}
@Override
public int size() {
return this.lastFreeIndex;
}
private Object[] grow() {
final int newSize = this.elements.length * 2;
// System.out.println("growing to " + newSize);
return this.newArray(newSize);
}
// we do not support shrink since it causes too much overhead due to the capacity checks
// private T[] shrink() {
// int newSize = this.elements.length / 2;
// return this.newArray(newSize);
// }
private Object[] newArray(final int newSize) {
final Object[] newElements = new Object[newSize];
System.arraycopy(this.elements, 0, newElements, 0, this.elements.length);
return newElements;
}
}
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
public final class UnorderedGrowablePipeFactory implements IPipeFactory {
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
return this.create(sourcePort, targetPort, 4);
}
@Override
public <T> IPipe create(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
return new UnorderedGrowablePipe(sourcePort, targetPort, capacity);
}
@Override
public ThreadCommunication getThreadCommunication() {
return ThreadCommunication.INTRA;
}
@Override
public PipeOrdering getOrdering() {
return PipeOrdering.STACK_BASED;
}
@Override
public boolean isGrowable() {
return true;
}
}
...@@ -82,8 +82,8 @@ public final class CipherStage extends AbstractConsumerStage<byte[]> { ...@@ -82,8 +82,8 @@ public final class CipherStage extends AbstractConsumerStage<byte[]> {
@Override @Override
protected void execute(final byte[] element) { protected void execute(final byte[] element) {
try { try {
byte[] output = this.cipher.doFinal(element); byte[] outputBytes = this.cipher.doFinal(element);
this.outputPort.send(output); this.outputPort.send(outputBytes);
} catch (IllegalBlockSizeException e) { } catch (IllegalBlockSizeException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} catch (BadPaddingException e) { } catch (BadPaddingException e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment