Skip to content
Snippets Groups Projects
Commit c4ecaef5 authored by Christian Wulf's avatar Christian Wulf
Browse files

added InitialElementProducer to avoid initializing a pipe directly

parent 3ffb8b02
No related branches found
No related tags found
No related merge requests found
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
public class InitialElementProducer<T> extends ProducerStage<T> {
private final T[] elements;
public InitialElementProducer(final T... elements) {
this.elements = elements;
}
@Override
protected void execute() {
for (T e : this.elements) {
this.send(this.outputPort, e);
}
this.terminate();
}
}
...@@ -54,13 +54,9 @@ public class ObjectProducer<T> extends ProducerStage<T> { ...@@ -54,13 +54,9 @@ public class ObjectProducer<T> extends ProducerStage<T> {
@Override @Override
protected void execute() { protected void execute() {
// this.logger.debug("Executing object producer..."); T newObject = this.inputObjectCreator.create();
T newObject = null;
newObject = this.inputObjectCreator.create();
this.numInputObjects--; this.numInputObjects--;
// System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects);
this.send(this.outputPort, newObject); this.send(this.outputPort, newObject);
if (this.numInputObjects == 0) { if (this.numInputObjects == 0) {
......
...@@ -14,12 +14,12 @@ import java.util.Collection; ...@@ -14,12 +14,12 @@ import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline; import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.InitialElementProducer;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter; import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
...@@ -40,21 +40,20 @@ public class KiekerLoadDriver { ...@@ -40,21 +40,20 @@ public class KiekerLoadDriver {
this.runnableStage = new RunnableStage(producerPipeline); this.runnableStage = new RunnableStage(producerPipeline);
} }
private HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) { private HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) {
ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository(); ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages // create stages
InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(directory);
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository); Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>(); final HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(dir2RecordsFilter); pipeline.setFirstStage(initialElementProducer);
pipeline.setLastStage(collector); pipeline.setLastStage(collector);
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1)); SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort()); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
dir2RecordsFilter.getInputPort().getPipe().add(directory);
return pipeline; return pipeline;
} }
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader; import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
public class TcpTraceLogging extends Analysis { public class TcpTraceLogging extends Analysis {
...@@ -12,7 +12,7 @@ public class TcpTraceLogging extends Analysis { ...@@ -12,7 +12,7 @@ public class TcpTraceLogging extends Analysis {
@Override @Override
public void init() { public void init() {
super.init(); super.init();
StageWithPort tcpPipeline = this.buildTcpPipeline(); HeadStage tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
} }
...@@ -29,7 +29,7 @@ public class TcpTraceLogging extends Analysis { ...@@ -29,7 +29,7 @@ public class TcpTraceLogging extends Analysis {
} }
} }
private StageWithPort buildTcpPipeline() { private HeadStage buildTcpPipeline() {
// TCPReaderSink tcpReader = new TCPReaderSink(); // TCPReaderSink tcpReader = new TCPReaderSink();
TCPReader tcpReader = new TCPReader(); TCPReader tcpReader = new TCPReader();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment