Skip to content
Snippets Groups Projects
Commit 36f88d59 authored by Christian Wulf's avatar Christian Wulf
Browse files

added config for TraceReconstructionAnalysis2

parent e7bdf201
No related branches found
No related tags found
No related merge requests found
......@@ -42,6 +42,8 @@ import teetime.stage.predicate.IsIMonitoringRecordInRange;
import teetime.stage.predicate.IsOperationExecutionRecordTraceIdPredicate;
import teetime.stage.predicate.PredicateFilter;
import teetime.stage.stringBuffer.StringBufferFilter;
import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler;
import teetime.stage.stringBuffer.handler.StringHandler;
import teetime.stage.util.TextLine;
/**
......@@ -56,13 +58,12 @@ public class TraceReconstructionAnalysis2 extends Analysis {
private ClassNameRegistryRepository classNameRegistryRepository;
@Override
public void init() {
super.init();
// IPipeline clockPipeline = buildClockPipeline();
// this.clockThread = new WorkerThread(clockPipeline, 1);
// Clock clockStage=(Clock) clockPipeline.getStartStages().get(0);
// IPipeline clockPipeline = buildClockPipeline();
// this.clockThread = new WorkerThread(clockPipeline, 1);
// Clock clockStage=(Clock) clockPipeline.getStartStages().get(0);
final IPipeline pipeline = this.buildPipeline();
this.workerThread = new WorkerThread(pipeline, 0);
......@@ -75,21 +76,30 @@ public class TraceReconstructionAnalysis2 extends Analysis {
private IPipeline buildPipeline() {
// predicates TODO
final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000);
final IsOperationExecutionRecordTraceIdPredicate isOperationExecutionRecordTraceIdPredicate = new IsOperationExecutionRecordTraceIdPredicate(false, null);
final IsOperationExecutionRecordTraceIdPredicate isOperationExecutionRecordTraceIdPredicate = new IsOperationExecutionRecordTraceIdPredicate(
false, null);
final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository);
final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(
this.classNameRegistryRepository);
final MonitoringLogDirectory2Files directory2FilesFilter = new MonitoringLogDirectory2Files();
final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
final Cache<TextLine> cache = new Cache<TextLine>();
final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(this.classNameRegistryRepository);
final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>();
final PredicateFilter<IMonitoringRecord> timestampFilter = new PredicateFilter<IMonitoringRecord>(isIMonitoringRecordInRange);
final PredicateFilter<OperationExecutionRecord> traceIdFilter = new PredicateFilter<OperationExecutionRecord>(isOperationExecutionRecordTraceIdPredicate);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(IFlowRecord.class);
final PredicateFilter<IMonitoringRecord> timestampFilter = new PredicateFilter<IMonitoringRecord>(
isIMonitoringRecordInRange);
final PredicateFilter<OperationExecutionRecord> traceIdFilter = new PredicateFilter<OperationExecutionRecord>(
isOperationExecutionRecordTraceIdPredicate);
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter();
final CountingFilter<TraceEventRecords> countingFilter = new CountingFilter<TraceEventRecords>();
// configure stages
stringBufferFilter.getDataTypeHandlers().add(new IMonitoringRecordHandler());
stringBufferFilter.getDataTypeHandlers().add(new StringHandler());
// add each stage to a stage list
final LinkedList<IStage> startStages = new LinkedList<IStage>();
startStages.add(classNameRegistryCreationFilter);
......@@ -109,19 +119,22 @@ public class TraceReconstructionAnalysis2 extends Analysis {
stages.add(countingFilter);
// connect pipes
QueuePipe.connect(classNameRegistryCreationFilter.filePrefixOutputPort, directory2FilesFilter.filePrefixInputPort);
QueuePipe.connect(classNameRegistryCreationFilter.relayDirectoryOutputPort, directory2FilesFilter.directoryInputPort);
QueuePipe.connect(classNameRegistryCreationFilter.filePrefixOutputPort,
directory2FilesFilter.filePrefixInputPort);
QueuePipe.connect(classNameRegistryCreationFilter.relayDirectoryOutputPort,
directory2FilesFilter.directoryInputPort);
QueuePipe.connect(directory2FilesFilter.fileOutputPort, file2TextLinesFilter.fileInputPort);
QueuePipe.connect(file2TextLinesFilter.textLineOutputPort, cache.objectInputPort);
// QueuePipe.connect(XXX, cache.sendInputPort);
// QueuePipe.connect(XXX, cache.sendInputPort);
QueuePipe.connect(cache.objectOutputPort, textLine2RecordFilter.textLineInputPort);
QueuePipe.connect(textLine2RecordFilter.recordOutputPort, stringBufferFilter.objectInputPort);
QueuePipe.connect(stringBufferFilter.objectOutputPort, timestampFilter.inputPort);
// QueuePipe.connect(timestampFilter.matchingOutputPort, traceIdFilter.inputPort);
// QueuePipe.connect(timestampFilter.matchingOutputPort, traceIdFilter.inputPort);
// QueuePipe.connect(timestampFilter.mismatchingOutputPort, YYY); // ignore this case
QueuePipe.connect(traceIdFilter.matchingOutputPort, instanceOfFilter.inputPort);
// QueuePipe.connect(traceIdFilter.mismatchingOutputPort, traceIdFilter.inputPort); // ignore this case
// QueuePipe.connect(clockStage.timestampOutputPort, traceReconstructionFilter.timestampInputPort); // ignore this case
// QueuePipe.connect(clockStage.timestampOutputPort, traceReconstructionFilter.timestampInputPort); // ignore
// this case
QueuePipe.connect(instanceOfFilter.matchingOutputPort, traceReconstructionFilter.recordInputPort);
// QueuePipe.connect(instanceOfFilter.mismatchingOutputPort, instanceOfFilter.inputPort); // ignore this case
QueuePipe.connect(traceReconstructionFilter.traceValidOutputPort, countingFilter.INPUT_OBJECT);
......
......@@ -15,8 +15,8 @@
***************************************************************************/
package teetime.stage.stringBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Context;
......@@ -39,7 +39,7 @@ public class StringBufferFilter<T> extends AbstractFilter<StringBufferFilter<T>>
// BETTER use a non shared data structure to avoid synchronization between threads
private KiekerHashMap kiekerHashMap = new KiekerHashMap();
private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new ArrayList<AbstractDataTypeHandler<?>>();
private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>();
@Override
protected boolean execute(final Context<StringBufferFilter<T>> context) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment