Skip to content
Snippets Groups Projects
Commit 47aed756 authored by Christian Wulf's avatar Christian Wulf
Browse files

moved StringBufferFilter;

declared some stages as final;
added EveryXthPrinter;
removed logging from CollectorSink;
moved id and logger from AbstractStage to Stage
parent 85284375
No related branches found
No related tags found
No related merge requests found
Showing
with 88 additions and 85 deletions
...@@ -4,10 +4,6 @@ import java.util.ArrayList; ...@@ -4,10 +4,6 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.pipe.DummyPipe; import teetime.framework.pipe.DummyPipe;
import teetime.framework.pipe.IPipe; import teetime.framework.pipe.IPipe;
...@@ -16,14 +12,6 @@ import teetime.framework.validation.InvalidPortConnection; ...@@ -16,14 +12,6 @@ import teetime.framework.validation.InvalidPortConnection;
public abstract class AbstractStage extends Stage { public abstract class AbstractStage extends Stage {
private final String id;
/**
* A unique logger instance per stage instance
*/
protected final Logger logger; // NOPMD
private Stage parentStage;
private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>(); private final List<InputPort<?>> inputPortList = new ArrayList<InputPort<?>>();
private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>(); private final List<OutputPort<?>> outputPortList = new ArrayList<OutputPort<?>>();
...@@ -35,11 +23,6 @@ public abstract class AbstractStage extends Stage { ...@@ -35,11 +23,6 @@ public abstract class AbstractStage extends Stage {
private final Set<ISignal> triggeredSignals = new HashSet<ISignal>(); private final Set<ISignal> triggeredSignals = new HashSet<ISignal>();
private boolean shouldTerminate; private boolean shouldTerminate;
public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")");
}
private void connectUnconnectedOutputPorts() { private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) { for (OutputPort<?> outputPort : this.cachedOutputPorts) {
if (null == outputPort.getPipe()) { // if port is unconnected if (null == outputPort.getPipe()) { // if port is unconnected
...@@ -57,21 +40,6 @@ public abstract class AbstractStage extends Stage { ...@@ -57,21 +40,6 @@ public abstract class AbstractStage extends Stage {
return this.cachedOutputPorts; return this.cachedOutputPorts;
} }
@Override
public Stage getParentStage() {
return this.parentStage;
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
this.parentStage = parentStage;
}
@Override
public String getId() {
return this.id;
}
/** /**
* May not be invoked outside of IPipe implementations * May not be invoked outside of IPipe implementations
*/ */
...@@ -141,11 +109,6 @@ public abstract class AbstractStage extends Stage { ...@@ -141,11 +109,6 @@ public abstract class AbstractStage extends Stage {
} }
} }
@Override
public String toString() {
return this.getClass().getName() + ": " + this.id;
}
@Override @Override
public void terminate() { public void terminate() {
this.shouldTerminate = true; this.shouldTerminate = true;
......
package teetime.framework; package teetime.framework;
import java.util.List; import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
public abstract class Stage { public abstract class Stage {
public abstract String getId(); private final String id;
/**
* A unique logger instance per stage instance
*/
protected final Logger logger; // NOPMD
protected Stage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LoggerFactory.getLogger(this.getClass().getName() + "(" + this.id + ")");
}
public String getId() {
return this.id;
}
public abstract Stage getParentStage(); @Override
public String toString() {
return this.getClass().getName() + ": " + this.getId();
}
public abstract void setParentStage(Stage parentStage, int index); // public abstract Stage getParentStage();
//
// public abstract void setParentStage(Stage parentStage, int index);
/** /**
* *
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
***************************************************************************/ ***************************************************************************/
package teetime.stage; package teetime.stage;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
...@@ -24,39 +25,24 @@ import teetime.framework.AbstractConsumerStage; ...@@ -24,39 +25,24 @@ import teetime.framework.AbstractConsumerStage;
* *
* @since 1.0 * @since 1.0
*/ */
public class CollectorSink<T> extends AbstractConsumerStage<T> { public final class CollectorSink<T> extends AbstractConsumerStage<T> {
private final List<T> elements; private final List<T> elements;
private final int threshold;
public CollectorSink(final List<T> list, final int threshold) { /**
this.elements = list; * Creates a new {@link CollectorSink} with an {@link ArrayList}.
this.threshold = threshold; */
public CollectorSink() {
this(new ArrayList<T>());
} }
public CollectorSink(final List<T> list) { public CollectorSink(final List<T> list) {
this(list, 100000); this.elements = list;
}
@Override
public void onTerminating() throws Exception {
logNumElements();
super.onTerminating();
} }
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
this.elements.add(element); this.elements.add(element);
if ((this.elements.size() % this.threshold) == 0) {
logNumElements();
}
}
private void logNumElements() {
if (logger.isInfoEnabled()) {
logger.info("size: " + this.elements.size());
}
} }
} }
...@@ -12,7 +12,7 @@ public class Counter<T> extends AbstractConsumerStage<T> { ...@@ -12,7 +12,7 @@ public class Counter<T> extends AbstractConsumerStage<T> {
@Override @Override
protected void execute(final T element) { protected void execute(final T element) {
this.numElementsPassed++; this.numElementsPassed++;
// this.logger.debug("count: " + this.numElementsPassed);
outputPort.send(element); outputPort.send(element);
} }
......
package teetime.stage;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
public class EveryXthStage<T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = createOutputPort();
private final int threshold;
private int counter;
public EveryXthStage(final int threshold) {
this.threshold = threshold;
}
@Override
protected void execute(final T element) {
counter++;
if (counter % threshold == 0) {
outputPort.send(element);
}
}
public OutputPort<T> getOutputPort() {
return outputPort;
}
}
...@@ -9,14 +9,17 @@ import teetime.framework.OutputPort; ...@@ -9,14 +9,17 @@ import teetime.framework.OutputPort;
import com.google.common.io.Files; import com.google.common.io.Files;
public class FileExtensionSwitch extends AbstractConsumerStage<File> { public final class FileExtensionSwitch extends AbstractConsumerStage<File> {
private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>(); private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
@Override @Override
protected void execute(final File file) { protected void execute(final File file) {
String fileExtension = Files.getFileExtension(file.getAbsolutePath()); String fileExtension = Files.getFileExtension(file.getAbsolutePath());
if (logger.isDebugEnabled()) {
this.logger.debug("fileExtension: " + fileExtension); this.logger.debug("fileExtension: " + fileExtension);
}
OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
outputPort.send(file); outputPort.send(file);
} }
......
...@@ -3,7 +3,7 @@ package teetime.stage; ...@@ -3,7 +3,7 @@ package teetime.stage;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> { public final class InstanceCounter<T, C extends T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
......
...@@ -7,7 +7,7 @@ import teetime.framework.OutputPort; ...@@ -7,7 +7,7 @@ import teetime.framework.OutputPort;
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf * @author Jan Waller, Nils Christian Ehmke, Christian Wulf
* *
*/ */
public class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> { public final class InstanceOfFilter<I, O> extends AbstractConsumerStage<I> {
private final OutputPort<O> outputPort = this.createOutputPort(); private final OutputPort<O> outputPort = this.createOutputPort();
......
...@@ -2,7 +2,7 @@ package teetime.stage; ...@@ -2,7 +2,7 @@ package teetime.stage;
import teetime.framework.AbstractProducerStage; import teetime.framework.AbstractProducerStage;
public class IterableProducer<O extends Iterable<T>, T> extends AbstractProducerStage<T> { public final class IterableProducer<O extends Iterable<T>, T> extends AbstractProducerStage<T> {
private O iter = null; private O iter = null;
......
...@@ -21,9 +21,9 @@ import teetime.framework.OutputPort; ...@@ -21,9 +21,9 @@ import teetime.framework.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
*/ */
public class NoopFilter<T> extends AbstractConsumerStage<T> { public final class NoopFilter<T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
......
...@@ -21,15 +21,15 @@ import teetime.util.ConstructorClosure; ...@@ -21,15 +21,15 @@ import teetime.util.ConstructorClosure;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
*/ */
public class ObjectProducer<T> extends AbstractProducerStage<T> { public final class ObjectProducer<T> extends AbstractProducerStage<T> {
private long numInputObjects; private long numInputObjects;
private ConstructorClosure<T> inputObjectCreator; private ConstructorClosure<T> inputObjectCreator;
/** /**
* @since 1.10 * @since 1.0
*/ */
public ObjectProducer(final long numInputObjects, final ConstructorClosure<T> inputObjectCreator) { public ObjectProducer(final long numInputObjects, final ConstructorClosure<T> inputObjectCreator) {
this.numInputObjects = numInputObjects; this.numInputObjects = numInputObjects;
......
package teetime.stage; package teetime.stage;
import teetime.framework.AbstractInterThreadPipe; import teetime.framework.AbstractInterThreadPipe;
import teetime.framework.InputPort;
import teetime.framework.AbstractProducerStage; import teetime.framework.AbstractProducerStage;
import teetime.framework.InputPort;
import teetime.framework.signal.TerminatingSignal; import teetime.framework.signal.TerminatingSignal;
public class Relay<T> extends AbstractProducerStage<T> { public final class Relay<T> extends AbstractProducerStage<T> {
private final InputPort<T> inputPort = this.createInputPort(); private final InputPort<T> inputPort = this.createInputPort();
......
...@@ -24,7 +24,7 @@ import teetime.util.TimestampObject; ...@@ -24,7 +24,7 @@ import teetime.util.TimestampObject;
* *
* @since 1.10 * @since 1.10
*/ */
public class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> { public final class StartTimestampFilter extends AbstractConsumerStage<TimestampObject> {
private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); private final OutputPort<TimestampObject> outputPort = this.createOutputPort();
......
...@@ -24,7 +24,7 @@ import teetime.util.TimestampObject; ...@@ -24,7 +24,7 @@ import teetime.util.TimestampObject;
* *
* @since 1.10 * @since 1.10
*/ */
public class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> { public final class StopTimestampFilter extends AbstractConsumerStage<TimestampObject> {
private final OutputPort<TimestampObject> outputPort = this.createOutputPort(); private final OutputPort<TimestampObject> outputPort = this.createOutputPort();
......
...@@ -15,7 +15,7 @@ import teetime.framework.OutputPort; ...@@ -15,7 +15,7 @@ import teetime.framework.OutputPort;
* @author Nelson Tavares de Sousa * @author Nelson Tavares de Sousa
* *
*/ */
public class ZipByteArray extends AbstractConsumerStage<byte[]> { public final class ZipByteArray extends AbstractConsumerStage<byte[]> {
private final OutputPort<byte[]> outputPort = this.createOutputPort(); private final OutputPort<byte[]> outputPort = this.createOutputPort();
private final ZipMode mode; private final ZipMode mode;
......
...@@ -7,7 +7,7 @@ import teetime.framework.AbstractStage; ...@@ -7,7 +7,7 @@ import teetime.framework.AbstractStage;
import teetime.framework.InputPort; import teetime.framework.InputPort;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
public class Delay<T> extends AbstractStage { public final class Delay<T> extends AbstractStage {
private final InputPort<T> inputPort = this.createInputPort(); private final InputPort<T> inputPort = this.createInputPort();
private final InputPort<Long> timestampTriggerInputPort = this.createInputPort(); private final InputPort<Long> timestampTriggerInputPort = this.createInputPort();
......
...@@ -2,7 +2,7 @@ package teetime.stage.basic; ...@@ -2,7 +2,7 @@ package teetime.stage.basic;
import teetime.framework.AbstractConsumerStage; import teetime.framework.AbstractConsumerStage;
public class Sink<T> extends AbstractConsumerStage<T> { public final class Sink<T> extends AbstractConsumerStage<T> {
// PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer // PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer
......
...@@ -26,8 +26,8 @@ public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> ...@@ -26,8 +26,8 @@ public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T>
@Override @Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) { public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
for (final OutputPort<T> port : outputPorts) { for (final OutputPort<T> outputPort : outputPorts) {
port.send(element); outputPort.send(element);
} }
return true; return true;
......
...@@ -22,7 +22,7 @@ import teetime.framework.OutputPort; ...@@ -22,7 +22,7 @@ import teetime.framework.OutputPort;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
* *
* @param T * @param T
* the type of the input port and the output ports * the type of the input port and the output ports
......
...@@ -38,7 +38,7 @@ import teetime.framework.signal.ISignal; ...@@ -38,7 +38,7 @@ import teetime.framework.signal.ISignal;
* @param <T> * @param <T>
* the type of both the input and output ports * the type of both the input and output ports
*/ */
public class Merger<T> extends AbstractStage { public final class Merger<T> extends AbstractStage {
private final OutputPort<T> outputPort = this.createOutputPort(); private final OutputPort<T> outputPort = this.createOutputPort();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment