Skip to content
Snippets Groups Projects
Commit 87314fb9 authored by Christian Wulf's avatar Christian Wulf
Browse files

added InitialElementProducer to avoid initializing a pipe directly

parent 35174f2d
No related branches found
No related tags found
No related merge requests found
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
public class InitialElementProducer<T> extends ProducerStage<T> {
private final T[] elements;
public InitialElementProducer(final T... elements) {
this.elements = elements;
}
@Override
protected void execute() {
for (T e : this.elements) {
this.send(this.outputPort, e);
}
this.terminate();
}
}
......@@ -54,13 +54,9 @@ public class ObjectProducer<T> extends ProducerStage<T> {
@Override
protected void execute() {
// this.logger.debug("Executing object producer...");
T newObject = null;
newObject = this.inputObjectCreator.create();
T newObject = this.inputObjectCreator.create();
this.numInputObjects--;
// System.out.println(this.getClass().getSimpleName() + ": sending " + this.numInputObjects);
this.send(this.outputPort, newObject);
if (this.numInputObjects == 0) {
......
......@@ -14,12 +14,12 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.HeadPipeline;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.InitialElementProducer;
import teetime.variant.methodcallWithPorts.stage.kieker.Dir2RecordsFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
......@@ -40,21 +40,20 @@ public class KiekerLoadDriver {
this.runnableStage = new RunnableStage(producerPipeline);
}
private HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) {
private HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> buildProducerPipeline(final File directory) {
ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
InitialElementProducer<File> initialElementProducer = new InitialElementProducer<File>(directory);
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(dir2RecordsFilter);
final HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>> pipeline = new HeadPipeline<InitialElementProducer<File>, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(initialElementProducer);
pipeline.setLastStage(collector);
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
dir2RecordsFilter.getInputPort().getPipe().add(directory);
return pipeline;
}
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.HeadStage;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
public class TcpTraceLogging extends Analysis {
......@@ -12,7 +12,7 @@ public class TcpTraceLogging extends Analysis {
@Override
public void init() {
super.init();
StageWithPort tcpPipeline = this.buildTcpPipeline();
HeadStage tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
}
......@@ -29,7 +29,7 @@ public class TcpTraceLogging extends Analysis {
}
}
private StageWithPort buildTcpPipeline() {
private HeadStage buildTcpPipeline() {
// TCPReaderSink tcpReader = new TCPReaderSink();
TCPReader tcpReader = new TCPReader();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment