diff --git a/src/main/java/kieker/analysis/TraceAnalysisConfiguration.java b/src/main/java/kieker/analysis/TraceAnalysisConfiguration.java index 127fe1a372cf3fd985f89be83251b095f4c48202..4a7804e1d31bd67ee5a77e6165e68fe36fb33507 100644 --- a/src/main/java/kieker/analysis/TraceAnalysisConfiguration.java +++ b/src/main/java/kieker/analysis/TraceAnalysisConfiguration.java @@ -7,6 +7,8 @@ import java.io.File; import java.util.ArrayList; import java.util.List; +import kieker.analysis.dev.DependencyCreatorStage; +import kieker.analysis.dev.DependencyStatisticsDecoratorStage; import kieker.analysis.domain.AggregatedOperationCall; import kieker.analysis.domain.AggregatedTrace; import kieker.analysis.domain.OperationCall; @@ -96,10 +98,10 @@ public class TraceAnalysisConfiguration extends Configuration { super.connectPorts(graphDistributor2.getNewOutputPort(), graphMLFileWriterComposite2.getInputPort()); super.connectPorts(graphDistributor2.getNewOutputPort(), dotTraceGraphFileWriterStage2.getInputPort()); - // DependencyCreatorStage dependencyCreatorStage = new DependencyCreatorStage(); - // DependencyStatisticsDecoratorStage dependencyStatisticsDecoratorStage = new DependencyStatisticsDecoratorStage(); - // super.connectPorts(aggregatedTraceDistributor.getNewOutputPort(), dependencyCreatorStage.getInputPort()); - // super.connectPorts(dependencyCreatorStage.getOutputPort(), dependencyStatisticsDecoratorStage.getInputPort()); + DependencyCreatorStage dependencyCreatorStage = new DependencyCreatorStage(); + DependencyStatisticsDecoratorStage dependencyStatisticsDecoratorStage = new DependencyStatisticsDecoratorStage(); + super.connectPorts(aggregatedTraceDistributor.getNewOutputPort(), dependencyCreatorStage.getInputPort()); + super.connectPorts(dependencyCreatorStage.getOutputPort(), dependencyStatisticsDecoratorStage.getInputPort()); /* * diff --git a/src/main/java/kieker/analysis/TraceImportConfiguration.java b/src/main/java/kieker/analysis/TraceImportConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..afa52e4e8660229348a4d38cfe544c68bc2260c5 --- /dev/null +++ b/src/main/java/kieker/analysis/TraceImportConfiguration.java @@ -0,0 +1,130 @@ +/** + * + */ +package kieker.analysis; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import kieker.analysis.domain.AggregatedOperationCall; +import kieker.analysis.domain.AggregatedTrace; +import kieker.analysis.domain.OperationCall; +import kieker.analysis.domain.Trace; +import kieker.analysis.stage.tracediagnosis.AllowedRecordsFilter; +import kieker.analysis.stage.tracediagnosis.BeginEndOfMonitoringDetector; +import kieker.analysis.stage.tracediagnosis.OperationCallAggregator; +import kieker.analysis.stage.tracediagnosis.OperationCallExtractor; +import kieker.analysis.stage.tracediagnosis.ReadingComposite; +import kieker.analysis.stage.tracediagnosis.TraceAggregationComposite; +import kieker.analysis.stage.tracediagnosis.TraceReconstructionComposite; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.misc.KiekerMetadataRecord; + +import teetime.framework.Configuration; +import teetime.stage.CollectorSink; +import teetime.stage.MultipleInstanceOfFilter; +import teetime.stage.basic.distributor.Distributor; +import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy; + +/** + * This {@link Configuration} imports monitoring records from a directory, + * processes them to {@link OperationCall}, {@link AggregatedOperationCall}, + * {@link Trace}, and {@link AggregatedTrace}, and collects them. Additionally, + * further statistics will be collected. + * + * @author Sören Henning + * + */ +public class TraceImportConfiguration extends Configuration { + + private final List<OperationCall> operationCalls = new ArrayList<>(1000); + private final List<AggregatedOperationCall> aggregatedOperationCalls = new ArrayList<>(1000); + private final List<Trace> traces = new ArrayList<>(1000); + private final List<AggregatedTrace> aggregatedTraces = new ArrayList<>(1000); + private final List<KiekerMetadataRecord> metadataRecords = new ArrayList<>(1000); + + private final TraceReconstructionComposite reconstruction; + private final BeginEndOfMonitoringDetector beginEndOfMonitoringDetector; + private final AllowedRecordsFilter allowedRecordsFilter; + + public TraceImportConfiguration(final File importDirectory, final boolean activateAdditionalLogChecks) { + + // Create the stages + final ReadingComposite reader = new ReadingComposite(importDirectory); + final MultipleInstanceOfFilter<IMonitoringRecord> typeFilter = new MultipleInstanceOfFilter<>(); + final Distributor<Trace> traceDistributor = new Distributor<>(new CopyByReferenceStrategy()); + final Distributor<OperationCall> callDistributor = new Distributor<>(new CopyByReferenceStrategy()); + final TraceAggregationComposite traceAggregator = new TraceAggregationComposite(); + final OperationCallExtractor callExtractor = new OperationCallExtractor(); + final OperationCallAggregator callAggregator = new OperationCallAggregator(); + + final CollectorSink<OperationCall> callCollector = new CollectorSink<>(this.operationCalls); + final CollectorSink<AggregatedOperationCall> aggrCallCollector = new CollectorSink<>(this.aggregatedOperationCalls); + final CollectorSink<Trace> traceCollector = new CollectorSink<>(this.traces); + final CollectorSink<AggregatedTrace> aggrTraceCollector = new CollectorSink<>(this.aggregatedTraces); + final CollectorSink<KiekerMetadataRecord> metadataCollector = new CollectorSink<>(this.metadataRecords); + + this.allowedRecordsFilter = new AllowedRecordsFilter(); + this.beginEndOfMonitoringDetector = new BeginEndOfMonitoringDetector(); + this.reconstruction = new TraceReconstructionComposite(activateAdditionalLogChecks); // TODO Check parameter + + // Connect the stages + super.connectPorts(reader.getOutputPort(), allowedRecordsFilter.getInputPort()); + super.connectPorts(allowedRecordsFilter.getOutputPort(), typeFilter.getInputPort()); + super.connectPorts(typeFilter.getOutputPortForType(IMonitoringRecord.class), this.beginEndOfMonitoringDetector.getInputPort()); + super.connectPorts(this.beginEndOfMonitoringDetector.getOutputPort(), this.reconstruction.getInputPort()); + super.connectPorts(this.reconstruction.getOutputPort(), traceDistributor.getInputPort()); + super.connectPorts(traceDistributor.getNewOutputPort(), traceCollector.getInputPort()); + super.connectPorts(traceDistributor.getNewOutputPort(), callExtractor.getInputPort()); + super.connectPorts(callExtractor.getOutputPort(), callDistributor.getInputPort()); + super.connectPorts(callDistributor.getNewOutputPort(), callCollector.getInputPort()); + super.connectPorts(callDistributor.getNewOutputPort(), callAggregator.getInputPort()); + super.connectPorts(callAggregator.getOutputPort(), aggrCallCollector.getInputPort()); + super.connectPorts(traceDistributor.getNewOutputPort(), traceAggregator.getInputPort()); + super.connectPorts(traceAggregator.getOutputPort(), aggrTraceCollector.getInputPort()); + super.connectPorts(typeFilter.getOutputPortForType(KiekerMetadataRecord.class), metadataCollector.getInputPort()); + + } + + public long getBeginTimestamp() { + return this.beginEndOfMonitoringDetector.getBeginTimestamp(); + } + + public long getEndTimestamp() { + return this.beginEndOfMonitoringDetector.getEndTimestamp(); + } + + public int countIncompleteTraces() { + return this.reconstruction.countIncompleteTraces(); + } + + public int countDanglingEvents() { + return this.reconstruction.countDanglingRecords(); + } + + public int countIgnoredRecords() { + return this.allowedRecordsFilter.getIgnoredRecords(); + } + + public List<KiekerMetadataRecord> getMetadataRecords() { + return this.metadataRecords; + } + + public List<OperationCall> getOperationCalls() { + return this.operationCalls; + } + + public List<AggregatedOperationCall> getAggregatedOperationCalls() { + return this.aggregatedOperationCalls; + } + + public List<Trace> getTraces() { + return traces; + } + + public List<AggregatedTrace> getAggregatedTraces() { + return aggregatedTraces; + } + +}