Skip to content
Snippets Groups Projects
Commit ee219505 authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Merge remote-tracking branch 'origin/master' into site

parents 1f780213 31c4ba40
Branches
Tags
No related merge requests found
Showing
with 138 additions and 49 deletions
...@@ -21,15 +21,15 @@ import teetime.framework.OutputPort; ...@@ -21,15 +21,15 @@ import teetime.framework.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
*/ */
public class NoopFilter<T> extends AbstractConsumerStage<T> { public final class NoopFilter<T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
this.send(this.outputPort, element); outputPort.send(element);
} }
public OutputPort<T> getOutputPort() { public OutputPort<T> getOutputPort() {
......
...@@ -21,15 +21,15 @@ import teetime.util.ConstructorClosure; ...@@ -21,15 +21,15 @@ import teetime.util.ConstructorClosure;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
*/ */
public class ObjectProducer<T> extends AbstractProducerStage<T> { public final class ObjectProducer<T> extends AbstractProducerStage<T> {
private long numInputObjects; private long numInputObjects;
private ConstructorClosure<T> inputObjectCreator; private ConstructorClosure<T> inputObjectCreator;
/** /**
* @since 1.10 * @since 1.0
*/ */
public ObjectProducer(final long numInputObjects, final ConstructorClosure<T> inputObjectCreator) { public ObjectProducer(final long numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
this.numInputObjects = numInputObjects; this.numInputObjects = numInputObjects;
...@@ -57,7 +57,7 @@ public class ObjectProducer<T> extends AbstractProducerStage<T> { ...@@ -57,7 +57,7 @@ public class ObjectProducer<T> extends AbstractProducerStage<T> {
T newObject = this.inputObjectCreator.create(); T newObject = this.inputObjectCreator.create();
this.numInputObjects--; this.numInputObjects--;
this.send(this.outputPort, newObject); outputPort.send(newObject);
if (this.numInputObjects == 0) { if (this.numInputObjects == 0) {
this.terminate(); this.terminate();
......
package teetime.stage; package teetime.stage;
import teetime.framework.InputPort; import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.AbstractProducerStage; import teetime.framework.AbstractProducerStage;
import teetime.framework.pipe.AbstractInterThreadPipe; import teetime.framework.InputPort;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
public class Relay<T> extends AbstractProducerStage<T> { public final class Relay<T> extends AbstractProducerStage<T> {
private final InputPort<T> inputPort = this.createInputPort(); private final InputPort<T> inputPort = this.createInputPort();
...@@ -21,7 +21,7 @@ public class Relay<T> extends AbstractProducerStage<T> { ...@@ -21,7 +21,7 @@ public class Relay<T> extends AbstractProducerStage<T> {
Thread.yield(); Thread.yield();
return; return;
} }
this.send(this.outputPort, element); outputPort.send(element);
} }
@Override @Override
......
...@@ -24,14 +24,14 @@ import teetime.util.TimestampObject; ...@@ -24,14 +24,14 @@ import teetime.util.TimestampObject;
* *
* @since 1.10 * @since 1.10
*/ */
public class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> { public final class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> {
private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); private final OutputPort<TimestampObject> outputPort = this.createOutputPort();
@Override @Override
protected void execute(final TimestampObject element) { protected void execute(final TimestampObject element) {
element.setStartTimestamp(System.nanoTime()); element.setStartTimestamp(System.nanoTime());
this.send(this.outputPort, element); outputPort.send(element);
} }
public OutputPort<TimestampObject> getOutputPort() { public OutputPort<TimestampObject> getOutputPort() {
......
...@@ -24,14 +24,14 @@ import teetime.util.TimestampObject; ...@@ -24,14 +24,14 @@ import teetime.util.TimestampObject;
* *
* @since 1.10 * @since 1.10
*/ */
public class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> { public final class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> {
private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); private final OutputPort<TimestampObject> outputPort = this.createOutputPort();
@Override @Override
protected void execute(final TimestampObject element) { protected void execute(final TimestampObject element) {
element.setStopTimestamp(System.nanoTime()); element.setStopTimestamp(System.nanoTime());
this.send(this.outputPort, element); outputPort.send(element);
} }
public OutputPort<TimestampObject> getOutputPort() { public OutputPort<TimestampObject> getOutputPort() {
......
...@@ -15,7 +15,7 @@ import teetime.framework.OutputPort; ...@@ -15,7 +15,7 @@ import teetime.framework.OutputPort;
* @author Nelson Tavares de Sousa * @author Nelson Tavares de Sousa
* *
*/ */
public class ZipByteArray extends AbstractConsumerStage<byte[]> { public final class ZipByteArray extends AbstractConsumerStage<byte[]> {
private final OutputPort<byte[]> outputPort = this.createOutputPort(); private final OutputPort<byte[]> outputPort = this.createOutputPort();
private final ZipMode mode; private final ZipMode mode;
...@@ -40,7 +40,7 @@ public class ZipByteArray extends AbstractConsumerStage<byte[]> { ...@@ -40,7 +40,7 @@ public class ZipByteArray extends AbstractConsumerStage<byte[]> {
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
this.send(this.outputPort, cache); outputPort.send(cache);
} }
private byte[] compress(final byte[] data) throws IOException { private byte[] compress(final byte[] data) throws IOException {
......
...@@ -7,7 +7,7 @@ import teetime.framework.AbstractStage; ...@@ -7,7 +7,7 @@ import teetime.framework.AbstractStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class Delay<T> extends AbstractStage { public final class Delay<T> extends AbstractStage {
private final InputPort<T> inputPort = this.createInputPort(); private final InputPort<T> inputPort = this.createInputPort();
private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); private final InputPort<Long> timestampTriggerInputPort = this.createInputPort();
...@@ -29,7 +29,7 @@ public class Delay<T> extends AbstractStage { ...@@ -29,7 +29,7 @@ public class Delay<T> extends AbstractStage {
while (!bufferedElements.isEmpty()) { while (!bufferedElements.isEmpty()) {
element = bufferedElements.remove(0); element = bufferedElements.remove(0);
this.send(this.outputPort, element); outputPort.send(element);
} }
} }
......
...@@ -2,7 +2,7 @@ package teetime.stage.basic; ...@@ -2,7 +2,7 @@ package teetime.stage.basic;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
public class Sink<T> extends AbstractConsumerStage<T> { public final class Sink<T> extends AbstractConsumerStage<T> {
// PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer // PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer
......
...@@ -26,8 +26,8 @@ public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> ...@@ -26,8 +26,8 @@ public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T>
@Override @Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
for (final OutputPort<T> port : outputPorts) { for (final OutputPort<T> outputPort : outputPorts) {
port.send(element); outputPort.send(element);
} }
return true; return true;
......
...@@ -22,7 +22,7 @@ import teetime.framework.OutputPort; ...@@ -22,7 +22,7 @@ import teetime.framework.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
* *
* @param T * @param T
* the type of the input port and the output ports * the type of the input port and the output ports
......
...@@ -17,7 +17,9 @@ ...@@ -17,7 +17,9 @@
package teetime.stage.basic.merger; package teetime.stage.basic.merger;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import teetime.framework.AbstractStage; import teetime.framework.AbstractStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
...@@ -27,6 +29,7 @@ import teetime.framework.signal.ISignal; ...@@ -27,6 +29,7 @@ import teetime.framework.signal.ISignal;
/** /**
* *
* This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port. * This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.
* For its signal handling behavior see {@link #onSignal(ISignal, InputPort)}
* *
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -35,13 +38,13 @@ import teetime.framework.signal.ISignal; ...@@ -35,13 +38,13 @@ import teetime.framework.signal.ISignal;
* @param <T> * @param <T>
* the type of both the input and output ports * the type of both the input and output ports
*/ */
public class Merger<T> extends AbstractStage { public final class Merger<T> extends AbstractStage {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
private final Map<Class<?>, Integer> signalMap = new HashMap<Class<?>, Integer>(); private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>();
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
...@@ -50,25 +53,39 @@ public class Merger<T> extends AbstractStage { ...@@ -50,25 +53,39 @@ public class Merger<T> extends AbstractStage {
return; return;
} }
this.send(this.outputPort, token); outputPort.send(token);
} }
/**
* This method is executed, if a signal is sent to a instance of this class.
* Multiple signals of one certain type are ignored, if they are sent to same port.
* Hence a signal is only passed on, when it arrived on all input ports, regardless how often.
*
* @param signal
* Signal which is sent
*
* @param inputPort
* The port which the signal was sent to
*/
@Override @Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) { public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.logger.trace("Got signal: " + signal + " from input port: " + inputPort); this.logger.trace("Got signal: " + signal + " from input port: " + inputPort);
if (signalMap.containsKey(signal.getClass())) { if (signalMap.containsKey(signal.getClass())) {
int value = signalMap.get(signal.getClass()); Set<InputPort<?>> set = signalMap.get(signal.getClass());
value++; if (!set.add(inputPort)) {
if (value == this.getInputPorts().length) { this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
}
if (set.size() == this.getInputPorts().length) {
this.outputPort.sendSignal(signal); this.outputPort.sendSignal(signal);
signalMap.remove(signal.getClass()); signalMap.remove(signal.getClass());
} else {
signalMap.put(signal.getClass(), value);
} }
} else { } else {
signal.trigger(this); signal.trigger(this);
signalMap.put(signal.getClass(), 1); Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
tempSet.add(inputPort);
signalMap.put(signal.getClass(), tempSet);
} }
} }
......
...@@ -79,7 +79,7 @@ public class Directory2FilesFilter extends AbstractConsumerStage<File> { ...@@ -79,7 +79,7 @@ public class Directory2FilesFilter extends AbstractConsumerStage<File> {
} }
for (final File file : inputFiles) { for (final File file : inputFiles) {
this.send(this.outputPort, file); outputPort.send(file);
} }
} }
......
package teetime.stage.io;
import java.util.List;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.EveryXthStage;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor;
public final class EveryXthPrinter<T> extends Stage {
private final Distributor<T> distributor;
public EveryXthPrinter(final int threshold) {
distributor = new Distributor<T>();
EveryXthStage<T> everyXthStage = new EveryXthStage<T>(threshold);
Printer<Integer> printer = new Printer<Integer>();
IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
distributor.setStrategy(new CopyByReferenceStrategy<T>());
}
@Override
protected void executeWithPorts() {
distributor.executeWithPorts();
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
distributor.validateOutputPorts(invalidPortConnections);
}
@Override
protected void onSignal(final ISignal signal, final InputPort<?> inputPort) {
distributor.onSignal(signal, inputPort);
}
@Override
protected TerminationStrategy getTerminationStrategy() {
return distributor.getTerminationStrategy();
}
@Override
protected void terminate() {
distributor.terminate();
}
@Override
protected boolean shouldBeTerminated() {
return distributor.shouldBeTerminated();
}
public InputPort<T> getInputPort() {
return distributor.getInputPort();
}
public OutputPort<T> getNewOutputPort() {
return distributor.getNewOutputPort();
}
}
...@@ -5,11 +5,10 @@ import java.io.IOException; ...@@ -5,11 +5,10 @@ import java.io.IOException;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.IStage;
import com.google.common.io.Files; import com.google.common.io.Files;
public class File2ByteArray extends AbstractConsumerStage<File> implements IStage { public class File2ByteArray extends AbstractConsumerStage<File> {
private final OutputPort<byte[]> outputPort = this.createOutputPort(); private final OutputPort<byte[]> outputPort = this.createOutputPort();
...@@ -17,7 +16,7 @@ public class File2ByteArray extends AbstractConsumerStage<File> implements IStag ...@@ -17,7 +16,7 @@ public class File2ByteArray extends AbstractConsumerStage<File> implements IStag
protected void execute(final File element) { protected void execute(final File element) {
try { try {
byte[] cache = Files.toByteArray(element); byte[] cache = Files.toByteArray(element);
this.send(this.outputPort, cache); outputPort.send(cache);
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
......
...@@ -46,7 +46,7 @@ public class File2TextLinesFilter extends AbstractConsumerStage<File> { ...@@ -46,7 +46,7 @@ public class File2TextLinesFilter extends AbstractConsumerStage<File> {
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
line = line.trim(); line = line.trim();
if (line.length() != 0) { if (line.length() != 0) {
this.send(this.outputPort, new TextLine(textFile, line)); outputPort.send(new TextLine(textFile, line));
} // else: ignore empty line } // else: ignore empty line
} }
} catch (final FileNotFoundException e) { } catch (final FileNotFoundException e) {
......
package teetime.stage; package teetime.stage.string;
import java.util.StringTokenizer; import java.util.StringTokenizer;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class Tokenizer extends AbstractConsumerStage<String> { public final class Tokenizer extends AbstractConsumerStage<String> {
private final OutputPort<String> outputPort = this.createOutputPort(); private final OutputPort<String> outputPort = this.createOutputPort();
private final String regex; private final String regex;
...@@ -18,7 +18,7 @@ public class Tokenizer extends AbstractConsumerStage<String> { ...@@ -18,7 +18,7 @@ public class Tokenizer extends AbstractConsumerStage<String> {
protected void execute(final String element) { protected void execute(final String element) {
StringTokenizer st = new StringTokenizer(element, this.regex); StringTokenizer st = new StringTokenizer(element, this.regex);
while (st.hasMoreTokens()) { while (st.hasMoreTokens()) {
this.send(this.outputPort, st.nextToken()); outputPort.send(st.nextToken());
} }
} }
......
...@@ -13,22 +13,22 @@ ...@@ -13,22 +13,22 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package teetime.stage.stringBuffer; package teetime.stage.string.buffer;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler; import teetime.stage.string.buffer.handler.AbstractDataTypeHandler;
import teetime.stage.stringBuffer.util.KiekerHashMap; import teetime.stage.string.buffer.util.KiekerHashMap;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class StringBufferFilter<T> extends AbstractConsumerStage<T> { public final class StringBufferFilter<T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
...@@ -40,7 +40,7 @@ public class StringBufferFilter<T> extends AbstractConsumerStage<T> { ...@@ -40,7 +40,7 @@ public class StringBufferFilter<T> extends AbstractConsumerStage<T> {
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
final T returnedElement = this.handle(element); final T returnedElement = this.handle(element);
this.send(this.outputPort, returnedElement); outputPort.send(returnedElement);
} }
@Override @Override
......
...@@ -13,11 +13,11 @@ ...@@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package teetime.stage.stringBuffer.handler; package teetime.stage.string.buffer.handler;
import org.slf4j.Logger; import org.slf4j.Logger;
import teetime.stage.stringBuffer.util.KiekerHashMap; import teetime.stage.string.buffer.util.KiekerHashMap;
/** /**
* @author Christian Wulf * @author Christian Wulf
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package teetime.stage.stringBuffer.handler; package teetime.stage.string.buffer.handler;
/** /**
* @author Christian Wulf * @author Christian Wulf
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package teetime.stage.stringBuffer.util; package teetime.stage.string.buffer.util;
import java.lang.ref.SoftReference; import java.lang.ref.SoftReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment