Skip to content
Snippets Groups Projects
Commit 9e71a14c authored by Sören Henning's avatar Sören Henning
Browse files

added TraceImportConfiguration

parent 7f303a21
No related branches found
No related tags found
1 merge request!17Get impletemented stages and Java 8
Pipeline #
......@@ -7,6 +7,8 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import kieker.analysis.dev.DependencyCreatorStage;
import kieker.analysis.dev.DependencyStatisticsDecoratorStage;
import kieker.analysis.domain.AggregatedOperationCall;
import kieker.analysis.domain.AggregatedTrace;
import kieker.analysis.domain.OperationCall;
......@@ -96,10 +98,10 @@ public class TraceAnalysisConfiguration extends Configuration {
super.connectPorts(graphDistributor2.getNewOutputPort(), graphMLFileWriterComposite2.getInputPort());
super.connectPorts(graphDistributor2.getNewOutputPort(), dotTraceGraphFileWriterStage2.getInputPort());
// DependencyCreatorStage dependencyCreatorStage = new DependencyCreatorStage();
// DependencyStatisticsDecoratorStage dependencyStatisticsDecoratorStage = new DependencyStatisticsDecoratorStage();
// super.connectPorts(aggregatedTraceDistributor.getNewOutputPort(), dependencyCreatorStage.getInputPort());
// super.connectPorts(dependencyCreatorStage.getOutputPort(), dependencyStatisticsDecoratorStage.getInputPort());
DependencyCreatorStage dependencyCreatorStage = new DependencyCreatorStage();
DependencyStatisticsDecoratorStage dependencyStatisticsDecoratorStage = new DependencyStatisticsDecoratorStage();
super.connectPorts(aggregatedTraceDistributor.getNewOutputPort(), dependencyCreatorStage.getInputPort());
super.connectPorts(dependencyCreatorStage.getOutputPort(), dependencyStatisticsDecoratorStage.getInputPort());
/*
*
......
/**
*
*/
package kieker.analysis;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import kieker.analysis.domain.AggregatedOperationCall;
import kieker.analysis.domain.AggregatedTrace;
import kieker.analysis.domain.OperationCall;
import kieker.analysis.domain.Trace;
import kieker.analysis.stage.tracediagnosis.AllowedRecordsFilter;
import kieker.analysis.stage.tracediagnosis.BeginEndOfMonitoringDetector;
import kieker.analysis.stage.tracediagnosis.OperationCallAggregator;
import kieker.analysis.stage.tracediagnosis.OperationCallExtractor;
import kieker.analysis.stage.tracediagnosis.ReadingComposite;
import kieker.analysis.stage.tracediagnosis.TraceAggregationComposite;
import kieker.analysis.stage.tracediagnosis.TraceReconstructionComposite;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.KiekerMetadataRecord;
import teetime.framework.Configuration;
import teetime.stage.CollectorSink;
import teetime.stage.MultipleInstanceOfFilter;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
/**
* This {@link Configuration} imports monitoring records from a directory,
* processes them to {@link OperationCall}, {@link AggregatedOperationCall},
* {@link Trace}, and {@link AggregatedTrace}, and collects them. Additionally,
* further statistics will be collected.
*
* @author Sören Henning
*
*/
public class TraceImportConfiguration extends Configuration {
private final List<OperationCall> operationCalls = new ArrayList<>(1000);
private final List<AggregatedOperationCall> aggregatedOperationCalls = new ArrayList<>(1000);
private final List<Trace> traces = new ArrayList<>(1000);
private final List<AggregatedTrace> aggregatedTraces = new ArrayList<>(1000);
private final List<KiekerMetadataRecord> metadataRecords = new ArrayList<>(1000);
private final TraceReconstructionComposite reconstruction;
private final BeginEndOfMonitoringDetector beginEndOfMonitoringDetector;
private final AllowedRecordsFilter allowedRecordsFilter;
public TraceImportConfiguration(final File importDirectory, final boolean activateAdditionalLogChecks) {
// Create the stages
final ReadingComposite reader = new ReadingComposite(importDirectory);
final MultipleInstanceOfFilter<IMonitoringRecord> typeFilter = new MultipleInstanceOfFilter<>();
final Distributor<Trace> traceDistributor = new Distributor<>(new CopyByReferenceStrategy());
final Distributor<OperationCall> callDistributor = new Distributor<>(new CopyByReferenceStrategy());
final TraceAggregationComposite traceAggregator = new TraceAggregationComposite();
final OperationCallExtractor callExtractor = new OperationCallExtractor();
final OperationCallAggregator callAggregator = new OperationCallAggregator();
final CollectorSink<OperationCall> callCollector = new CollectorSink<>(this.operationCalls);
final CollectorSink<AggregatedOperationCall> aggrCallCollector = new CollectorSink<>(this.aggregatedOperationCalls);
final CollectorSink<Trace> traceCollector = new CollectorSink<>(this.traces);
final CollectorSink<AggregatedTrace> aggrTraceCollector = new CollectorSink<>(this.aggregatedTraces);
final CollectorSink<KiekerMetadataRecord> metadataCollector = new CollectorSink<>(this.metadataRecords);
this.allowedRecordsFilter = new AllowedRecordsFilter();
this.beginEndOfMonitoringDetector = new BeginEndOfMonitoringDetector();
this.reconstruction = new TraceReconstructionComposite(activateAdditionalLogChecks); // TODO Check parameter
// Connect the stages
super.connectPorts(reader.getOutputPort(), allowedRecordsFilter.getInputPort());
super.connectPorts(allowedRecordsFilter.getOutputPort(), typeFilter.getInputPort());
super.connectPorts(typeFilter.getOutputPortForType(IMonitoringRecord.class), this.beginEndOfMonitoringDetector.getInputPort());
super.connectPorts(this.beginEndOfMonitoringDetector.getOutputPort(), this.reconstruction.getInputPort());
super.connectPorts(this.reconstruction.getOutputPort(), traceDistributor.getInputPort());
super.connectPorts(traceDistributor.getNewOutputPort(), traceCollector.getInputPort());
super.connectPorts(traceDistributor.getNewOutputPort(), callExtractor.getInputPort());
super.connectPorts(callExtractor.getOutputPort(), callDistributor.getInputPort());
super.connectPorts(callDistributor.getNewOutputPort(), callCollector.getInputPort());
super.connectPorts(callDistributor.getNewOutputPort(), callAggregator.getInputPort());
super.connectPorts(callAggregator.getOutputPort(), aggrCallCollector.getInputPort());
super.connectPorts(traceDistributor.getNewOutputPort(), traceAggregator.getInputPort());
super.connectPorts(traceAggregator.getOutputPort(), aggrTraceCollector.getInputPort());
super.connectPorts(typeFilter.getOutputPortForType(KiekerMetadataRecord.class), metadataCollector.getInputPort());
}
public long getBeginTimestamp() {
return this.beginEndOfMonitoringDetector.getBeginTimestamp();
}
public long getEndTimestamp() {
return this.beginEndOfMonitoringDetector.getEndTimestamp();
}
public int countIncompleteTraces() {
return this.reconstruction.countIncompleteTraces();
}
public int countDanglingEvents() {
return this.reconstruction.countDanglingRecords();
}
public int countIgnoredRecords() {
return this.allowedRecordsFilter.getIgnoredRecords();
}
public List<KiekerMetadataRecord> getMetadataRecords() {
return this.metadataRecords;
}
public List<OperationCall> getOperationCalls() {
return this.operationCalls;
}
public List<AggregatedOperationCall> getAggregatedOperationCalls() {
return this.aggregatedOperationCalls;
}
public List<Trace> getTraces() {
return traces;
}
public List<AggregatedTrace> getAggregatedTraces() {
return aggregatedTraces;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment