diff --git a/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java b/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java index 0ebf4a7926e5fb4ba9d7d90627361c5eee52c977..a950bacd9c7ca109e40a213a9c5b1ff26689644b 100644 --- a/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java +++ b/src/main/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis2.java @@ -42,6 +42,8 @@ import teetime.stage.predicate.IsIMonitoringRecordInRange; import teetime.stage.predicate.IsOperationExecutionRecordTraceIdPredicate; import teetime.stage.predicate.PredicateFilter; import teetime.stage.stringBuffer.StringBufferFilter; +import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler; +import teetime.stage.stringBuffer.handler.StringHandler; import teetime.stage.util.TextLine; /** @@ -56,13 +58,12 @@ public class TraceReconstructionAnalysis2 extends Analysis { private ClassNameRegistryRepository classNameRegistryRepository; - @Override public void init() { super.init(); -// IPipeline clockPipeline = buildClockPipeline(); -// this.clockThread = new WorkerThread(clockPipeline, 1); -// Clock clockStage=(Clock) clockPipeline.getStartStages().get(0); + // IPipeline clockPipeline = buildClockPipeline(); + // this.clockThread = new WorkerThread(clockPipeline, 1); + // Clock clockStage=(Clock) clockPipeline.getStartStages().get(0); final IPipeline pipeline = this.buildPipeline(); this.workerThread = new WorkerThread(pipeline, 0); @@ -75,21 +76,30 @@ public class TraceReconstructionAnalysis2 extends Analysis { private IPipeline buildPipeline() { // predicates TODO final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000); - final IsOperationExecutionRecordTraceIdPredicate isOperationExecutionRecordTraceIdPredicate = new IsOperationExecutionRecordTraceIdPredicate(false, null); + final IsOperationExecutionRecordTraceIdPredicate isOperationExecutionRecordTraceIdPredicate = new IsOperationExecutionRecordTraceIdPredicate( + false, null); - final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository); + final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter( + this.classNameRegistryRepository); final MonitoringLogDirectory2Files directory2FilesFilter = new MonitoringLogDirectory2Files(); final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); final Cache<TextLine> cache = new Cache<TextLine>(); final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(this.classNameRegistryRepository); final StringBufferFilter<IMonitoringRecord> stringBufferFilter = new StringBufferFilter<IMonitoringRecord>(); - final PredicateFilter<IMonitoringRecord> timestampFilter = new PredicateFilter<IMonitoringRecord>(isIMonitoringRecordInRange); - final PredicateFilter<OperationExecutionRecord> traceIdFilter = new PredicateFilter<OperationExecutionRecord>(isOperationExecutionRecordTraceIdPredicate); - final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(IFlowRecord.class); + final PredicateFilter<IMonitoringRecord> timestampFilter = new PredicateFilter<IMonitoringRecord>( + isIMonitoringRecordInRange); + final PredicateFilter<OperationExecutionRecord> traceIdFilter = new PredicateFilter<OperationExecutionRecord>( + isOperationExecutionRecordTraceIdPredicate); + final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( + IFlowRecord.class); final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); final CountingFilter<TraceEventRecords> countingFilter = new CountingFilter<TraceEventRecords>(); + // configure stages + stringBufferFilter.getDataTypeHandlers().add(new IMonitoringRecordHandler()); + stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); + // add each stage to a stage list final LinkedList<IStage> startStages = new LinkedList<IStage>(); startStages.add(classNameRegistryCreationFilter); @@ -109,19 +119,22 @@ public class TraceReconstructionAnalysis2 extends Analysis { stages.add(countingFilter); // connect pipes - QueuePipe.connect(classNameRegistryCreationFilter.filePrefixOutputPort, directory2FilesFilter.filePrefixInputPort); - QueuePipe.connect(classNameRegistryCreationFilter.relayDirectoryOutputPort, directory2FilesFilter.directoryInputPort); + QueuePipe.connect(classNameRegistryCreationFilter.filePrefixOutputPort, + directory2FilesFilter.filePrefixInputPort); + QueuePipe.connect(classNameRegistryCreationFilter.relayDirectoryOutputPort, + directory2FilesFilter.directoryInputPort); QueuePipe.connect(directory2FilesFilter.fileOutputPort, file2TextLinesFilter.fileInputPort); QueuePipe.connect(file2TextLinesFilter.textLineOutputPort, cache.objectInputPort); -// QueuePipe.connect(XXX, cache.sendInputPort); + // QueuePipe.connect(XXX, cache.sendInputPort); QueuePipe.connect(cache.objectOutputPort, textLine2RecordFilter.textLineInputPort); QueuePipe.connect(textLine2RecordFilter.recordOutputPort, stringBufferFilter.objectInputPort); QueuePipe.connect(stringBufferFilter.objectOutputPort, timestampFilter.inputPort); -// QueuePipe.connect(timestampFilter.matchingOutputPort, traceIdFilter.inputPort); + // QueuePipe.connect(timestampFilter.matchingOutputPort, traceIdFilter.inputPort); // QueuePipe.connect(timestampFilter.mismatchingOutputPort, YYY); // ignore this case QueuePipe.connect(traceIdFilter.matchingOutputPort, instanceOfFilter.inputPort); // QueuePipe.connect(traceIdFilter.mismatchingOutputPort, traceIdFilter.inputPort); // ignore this case -// QueuePipe.connect(clockStage.timestampOutputPort, traceReconstructionFilter.timestampInputPort); // ignore this case + // QueuePipe.connect(clockStage.timestampOutputPort, traceReconstructionFilter.timestampInputPort); // ignore + // this case QueuePipe.connect(instanceOfFilter.matchingOutputPort, traceReconstructionFilter.recordInputPort); // QueuePipe.connect(instanceOfFilter.mismatchingOutputPort, instanceOfFilter.inputPort); // ignore this case QueuePipe.connect(traceReconstructionFilter.traceValidOutputPort, countingFilter.INPUT_OBJECT); diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java index 1147d6e6a145f40e13af56d359149c848eb82f80..0419220cf4c2e60fc011388604a33d3d7de8ff3e 100644 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java @@ -15,8 +15,8 @@ ***************************************************************************/ package teetime.stage.stringBuffer; -import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; import teetime.framework.core.AbstractFilter; import teetime.framework.core.Context; @@ -39,7 +39,7 @@ public class StringBufferFilter<T> extends AbstractFilter<StringBufferFilter<T>> // BETTER use a non shared data structure to avoid synchronization between threads private KiekerHashMap kiekerHashMap = new KiekerHashMap(); - private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new ArrayList<AbstractDataTypeHandler<?>>(); + private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>(); @Override protected boolean execute(final Context<StringBufferFilter<T>> context) {