diff --git a/src/main/java/kieker/gui/common/model/importer/ImportAnalysisConfiguration.java b/src/main/java/kieker/gui/common/model/importer/ImportAnalysisConfiguration.java index f99f60151288785cc7b563bf4075c2e911582f6c..2a74026b6d45a26431efb007c05e8f3585f492c5 100644 --- a/src/main/java/kieker/gui/common/model/importer/ImportAnalysisConfiguration.java +++ b/src/main/java/kieker/gui/common/model/importer/ImportAnalysisConfiguration.java @@ -26,24 +26,18 @@ import kieker.common.record.misc.KiekerMetadataRecord; import kieker.gui.common.domain.AggregatedExecution; import kieker.gui.common.domain.Execution; import kieker.gui.common.domain.Record; -import kieker.gui.common.model.importer.stages.FailedAggregatedTraceFilter; -import kieker.gui.common.model.importer.stages.FailedTraceFilter; -import kieker.gui.common.model.importer.stages.FailureContainingAggregatedTraceFilter; -import kieker.gui.common.model.importer.stages.FailureContainingTraceFilter; -import kieker.gui.common.model.importer.stages.RecordSimplificator; -import kieker.gui.common.model.importer.stages.TraceAggregator; -import kieker.gui.common.model.importer.stages.TraceReconstructor; +import kieker.gui.common.model.importer.stages.ReadingComposite; +import kieker.gui.common.model.importer.stages.RecordSimplificatorComposite; +import kieker.gui.common.model.importer.stages.TraceAggregationComposite; +import kieker.gui.common.model.importer.stages.TraceReconstructionComposite; import teetime.framework.AnalysisConfiguration; import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.CollectorSink; -import teetime.stage.InitialElementProducer; import teetime.stage.MultipleInstanceOfFilter; import teetime.stage.basic.distributor.CopyByReferenceStrategy; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.className.ClassNameRegistryRepository; -import teetime.stage.io.filesystem.Dir2RecordsFilter; /** * A configuration for the import and analysis of monitoring logs. @@ -52,88 +46,64 @@ import teetime.stage.io.filesystem.Dir2RecordsFilter; */ public final class ImportAnalysisConfiguration extends AnalysisConfiguration { - private final List<Record> recordsList = new Vector<>(1000); - private final List<Execution> failedTracesList = new Vector<>(1000); - private final List<Execution> failureContainingTracesList = new Vector<>(1000); - private final List<Execution> tracesList = new Vector<>(1000); + private final List<Record> records = new Vector<>(1000); + + private final List<Execution> traces = new Vector<>(1000); + private final List<Execution> failedTraces = new Vector<>(1000); + private final List<Execution> failureContainingTraces = new Vector<>(1000); + private final List<AggregatedExecution> aggregatedTraces = new Vector<>(1000); - private final List<AggregatedExecution> failedAggregatedTracesList = new Vector<>(1000); - private final List<AggregatedExecution> failureContainingAggregatedTracesList = new Vector<>(1000); + private final List<AggregatedExecution> failedAggregatedTraces = new Vector<>(1000); + private final List<AggregatedExecution> failureContainingAggregatedTraces = new Vector<>(1000); + private final List<KiekerMetadataRecord> metadataRecords = new Vector<>(1000); public ImportAnalysisConfiguration(final File importDirectory) { // Create the stages - final InitialElementProducer<File> producer = new InitialElementProducer<>(importDirectory); - final Dir2RecordsFilter reader = new Dir2RecordsFilter(new ClassNameRegistryRepository()); + final ReadingComposite reader = new ReadingComposite(importDirectory); final MultipleInstanceOfFilter<IMonitoringRecord> typeFilter = new MultipleInstanceOfFilter<>(); - final Distributor<IFlowRecord> fstDistributor = new Distributor<>(new CopyByReferenceStrategy()); - final RecordSimplificator recordSimplificator = new RecordSimplificator(); - final CollectorSink<Record> recordCollector = new CollectorSink<>(this.recordsList); - final TraceReconstructor traceReconstructor = new TraceReconstructor(); - final Distributor<Execution> sndDistributor = new Distributor<>(new CopyByReferenceStrategy()); - final CollectorSink<Execution> traceCollector = new CollectorSink<>(this.tracesList); - final FailedTraceFilter failedTraceFilter = new FailedTraceFilter(); - final CollectorSink<Execution> failedTraceCollector = new CollectorSink<>(this.failedTracesList); - final FailureContainingTraceFilter failureContainingTraceFilter = new FailureContainingTraceFilter(); - final CollectorSink<Execution> failureContainingTraceCollector = new CollectorSink<>(this.failureContainingTracesList); - final TraceAggregator traceAggregator = new TraceAggregator(); - final CollectorSink<AggregatedExecution> aggregatedTraceCollector = new CollectorSink<>(this.aggregatedTraces); + final Distributor<IFlowRecord> distributor = new Distributor<>(new CopyByReferenceStrategy()); + final RecordSimplificatorComposite recordSimplificator = new RecordSimplificatorComposite(this.records); + final TraceReconstructionComposite traceReconstruction = new TraceReconstructionComposite(this.traces, this.failedTraces, this.failureContainingTraces); + final TraceAggregationComposite traceAggregation = new TraceAggregationComposite(this.aggregatedTraces, this.failedAggregatedTraces, this.failureContainingAggregatedTraces); + final CollectorSink<KiekerMetadataRecord> metadataCollector = new CollectorSink<>(this.metadataRecords); - final FailedAggregatedTraceFilter failedAggregatedTraceFilter = new FailedAggregatedTraceFilter(); - final Distributor<AggregatedExecution> thrdDistributor = new Distributor<>(new CopyByReferenceStrategy()); - final CollectorSink<AggregatedExecution> failedAggregatedTraceCollector = new CollectorSink<>(this.failedAggregatedTracesList); - final FailureContainingAggregatedTraceFilter failureContainingAggregatedTraceFilter = new FailureContainingAggregatedTraceFilter(); - final CollectorSink<AggregatedExecution> failureContainingAggregatedTraceCollector = new CollectorSink<>(this.failureContainingAggregatedTracesList); // Connect the stages final IPipeFactory pipeFactory = AnalysisConfiguration.PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); - pipeFactory.create(producer.getOutputPort(), reader.getInputPort()); pipeFactory.create(reader.getOutputPort(), typeFilter.getInputPort()); - pipeFactory.create(typeFilter.getOutputPortForType(IFlowRecord.class), fstDistributor.getInputPort()); - pipeFactory.create(fstDistributor.getNewOutputPort(), recordSimplificator.getInputPort()); - pipeFactory.create(recordSimplificator.getOutputPort(), recordCollector.getInputPort()); - pipeFactory.create(fstDistributor.getNewOutputPort(), traceReconstructor.getInputPort()); - pipeFactory.create(traceReconstructor.getOutputPort(), sndDistributor.getInputPort()); - pipeFactory.create(sndDistributor.getNewOutputPort(), traceAggregator.getInputPort()); - pipeFactory.create(sndDistributor.getNewOutputPort(), traceCollector.getInputPort()); - pipeFactory.create(sndDistributor.getNewOutputPort(), failedTraceFilter.getInputPort()); - pipeFactory.create(failedTraceFilter.getOutputPort(), failedTraceCollector.getInputPort()); - pipeFactory.create(sndDistributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort()); - pipeFactory.create(failureContainingTraceFilter.getOutputPort(), failureContainingTraceCollector.getInputPort()); - pipeFactory.create(traceAggregator.getOutputPort(), thrdDistributor.getInputPort()); - pipeFactory.create(thrdDistributor.getNewOutputPort(), aggregatedTraceCollector.getInputPort()); - pipeFactory.create(thrdDistributor.getNewOutputPort(), failedAggregatedTraceFilter.getInputPort()); - pipeFactory.create(thrdDistributor.getNewOutputPort(), failureContainingAggregatedTraceFilter.getInputPort()); - pipeFactory.create(failedAggregatedTraceFilter.getOutputPort(), failedAggregatedTraceCollector.getInputPort()); - pipeFactory.create(failureContainingAggregatedTraceFilter.getOutputPort(), failureContainingAggregatedTraceCollector.getInputPort()); + pipeFactory.create(typeFilter.getOutputPortForType(IFlowRecord.class), distributor.getInputPort()); + pipeFactory.create(distributor.getNewOutputPort(), recordSimplificator.getInputPort()); + pipeFactory.create(distributor.getNewOutputPort(), traceReconstruction.getInputPort()); + pipeFactory.create(traceReconstruction.getOutputPort(), traceAggregation.getInputPort()); pipeFactory.create(typeFilter.getOutputPortForType(KiekerMetadataRecord.class), metadataCollector.getInputPort()); // Make sure that the producer is executed by the analysis - super.addThreadableStage(producer); + super.addThreadableStage(reader); } public List<Record> getRecordsList() { - return this.recordsList; + return this.records; } public List<Execution> getTracesList() { - return this.tracesList; + return this.traces; } public List<Execution> getFailedTracesList() { - return this.failedTracesList; + return this.failedTraces; } public List<Execution> getFailureContainingTracesList() { - return this.failureContainingTracesList; + return this.failureContainingTraces; } public List<AggregatedExecution> getFailedAggregatedTracesList() { - return this.failedAggregatedTracesList; + return this.failedAggregatedTraces; } public List<AggregatedExecution> getFailureContainingAggregatedTracesList() { - return this.failureContainingAggregatedTracesList; + return this.failureContainingAggregatedTraces; } public List<AggregatedExecution> getAggregatedTraces() { diff --git a/src/main/java/kieker/gui/common/model/importer/stages/FailedAggregatedTraceFilter.java b/src/main/java/kieker/gui/common/model/importer/stages/FailedAggregatedTraceFilter.java deleted file mode 100644 index f02d0976ab59f850a637c69e6d61406ce493cb57..0000000000000000000000000000000000000000 --- a/src/main/java/kieker/gui/common/model/importer/stages/FailedAggregatedTraceFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/*************************************************************************** - * Copyright 2014 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.gui.common.model.importer.stages; - -import kieker.gui.common.domain.AggregatedExecution; -import teetime.framework.AbstractConsumerStage; -import teetime.framework.OutputPort; - -public final class FailedAggregatedTraceFilter extends AbstractConsumerStage<AggregatedExecution> { - - private final OutputPort<AggregatedExecution> outputPort = super.createOutputPort(); - - @Override - protected void execute(final AggregatedExecution element) { - if (element.isFailed()) { - this.outputPort.send(element); - } - } - - public OutputPort<AggregatedExecution> getOutputPort() { - return this.outputPort; - } - -} diff --git a/src/main/java/kieker/gui/common/model/importer/stages/FailedTraceFilter.java b/src/main/java/kieker/gui/common/model/importer/stages/FailedTraceFilter.java index 11e3d189727622906d2d6708d58ac50752eef002..2a0aa0302f8c5ab037ac03ab17f825d77806f569 100644 --- a/src/main/java/kieker/gui/common/model/importer/stages/FailedTraceFilter.java +++ b/src/main/java/kieker/gui/common/model/importer/stages/FailedTraceFilter.java @@ -16,22 +16,22 @@ package kieker.gui.common.model.importer.stages; -import kieker.gui.common.domain.Execution; +import kieker.gui.common.domain.AbstractExecution; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -public final class FailedTraceFilter extends AbstractConsumerStage<Execution> { +final class FailedTraceFilter<T extends AbstractExecution<?>> extends AbstractConsumerStage<T> { - private final OutputPort<Execution> outputPort = super.createOutputPort(); + private final OutputPort<T> outputPort = super.createOutputPort(); @Override - protected void execute(final Execution element) { + protected void execute(final T element) { if (element.isFailed()) { this.outputPort.send(element); } } - public OutputPort<Execution> getOutputPort() { + public OutputPort<T> getOutputPort() { return this.outputPort; } diff --git a/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingAggregatedTraceFilter.java b/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingAggregatedTraceFilter.java deleted file mode 100644 index 9bdf795935fdfb1ced5229e57e30cc3abe002ec6..0000000000000000000000000000000000000000 --- a/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingAggregatedTraceFilter.java +++ /dev/null @@ -1,38 +0,0 @@ -/*************************************************************************** - * Copyright 2014 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ - -package kieker.gui.common.model.importer.stages; - -import kieker.gui.common.domain.AggregatedExecution; -import teetime.framework.AbstractConsumerStage; -import teetime.framework.OutputPort; - -public final class FailureContainingAggregatedTraceFilter extends AbstractConsumerStage<AggregatedExecution> { - - private final OutputPort<AggregatedExecution> outputPort = super.createOutputPort(); - - @Override - protected void execute(final AggregatedExecution element) { - if (element.containsFailure()) { - this.outputPort.send(element); - } - } - - public OutputPort<AggregatedExecution> getOutputPort() { - return this.outputPort; - } - -} diff --git a/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingTraceFilter.java b/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingTraceFilter.java index 5b71e3f3bf3cf9fcfef02b81d264050d9908e122..efb1c07166c9509e45df418a8aae8e9a73c7252d 100644 --- a/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingTraceFilter.java +++ b/src/main/java/kieker/gui/common/model/importer/stages/FailureContainingTraceFilter.java @@ -16,22 +16,22 @@ package kieker.gui.common.model.importer.stages; -import kieker.gui.common.domain.Execution; +import kieker.gui.common.domain.AbstractExecution; import teetime.framework.AbstractConsumerStage; import teetime.framework.OutputPort; -public final class FailureContainingTraceFilter extends AbstractConsumerStage<Execution> { +final class FailureContainingTraceFilter<T extends AbstractExecution<?>> extends AbstractConsumerStage<T> { - private final OutputPort<Execution> outputPort = super.createOutputPort(); + private final OutputPort<T> outputPort = super.createOutputPort(); @Override - protected void execute(final Execution element) { + protected void execute(final T element) { if (element.containsFailure()) { this.outputPort.send(element); } } - public OutputPort<Execution> getOutputPort() { + public OutputPort<T> getOutputPort() { return this.outputPort; } diff --git a/src/main/java/kieker/gui/common/model/importer/stages/ReadingComposite.java b/src/main/java/kieker/gui/common/model/importer/stages/ReadingComposite.java new file mode 100644 index 0000000000000000000000000000000000000000..b5a7df363d8f39ae602060dccaa6041760d6d010 --- /dev/null +++ b/src/main/java/kieker/gui/common/model/importer/stages/ReadingComposite.java @@ -0,0 +1,68 @@ +package kieker.gui.common.model.importer.stages; + +import java.io.File; +import java.util.List; + +import kieker.common.record.IMonitoringRecord; +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.Stage; +import teetime.framework.TerminationStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; +import teetime.stage.InitialElementProducer; +import teetime.stage.className.ClassNameRegistryRepository; +import teetime.stage.io.filesystem.Dir2RecordsFilter; + +public final class ReadingComposite extends Stage { + + private final InitialElementProducer<File> producer; + private final Dir2RecordsFilter reader; + + public ReadingComposite(final File importDirectory) { + this.producer = new InitialElementProducer<>(importDirectory); + this.reader = new Dir2RecordsFilter(new ClassNameRegistryRepository()); + + final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + pipeFactory.create(this.producer.getOutputPort(), this.reader.getInputPort()); + } + + @Override + protected void executeWithPorts() { + this.producer.executeWithPorts(); + } + + public OutputPort<IMonitoringRecord> getOutputPort() { + return this.reader.getOutputPort(); + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + this.reader.validateOutputPorts(invalidPortConnections); + } + + @Override + protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { + this.producer.onSignal(signal, inputPort); + } + + @Override + protected TerminationStrategy getTerminationStrategy() { + return this.producer.getTerminationStrategy(); + } + + @Override + protected void terminate() { + this.producer.terminate(); + } + + @Override + protected boolean shouldBeTerminated() { + return this.producer.shouldBeTerminated(); + } + +} diff --git a/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificator.java b/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificator.java index 644a82c5cb149b2b52b1f587ad7db50c79d4fae8..e4df7aefa29823e2c1462e6c60f54474e4e5bdeb 100644 --- a/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificator.java +++ b/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificator.java @@ -26,7 +26,7 @@ import teetime.framework.OutputPort; * * @author Nils Christian Ehmke */ -public final class RecordSimplificator extends AbstractConsumerStage<IMonitoringRecord> { +final class RecordSimplificator extends AbstractConsumerStage<IMonitoringRecord> { private final OutputPort<Record> outputPort = super.createOutputPort(); diff --git a/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificatorComposite.java b/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificatorComposite.java new file mode 100644 index 0000000000000000000000000000000000000000..4d7cda2b98539e0e42dc636da79c0d65ea2bb174 --- /dev/null +++ b/src/main/java/kieker/gui/common/model/importer/stages/RecordSimplificatorComposite.java @@ -0,0 +1,81 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.gui.common.model.importer.stages; + +import java.util.List; + +import kieker.common.record.IMonitoringRecord; +import kieker.gui.common.domain.Record; +import teetime.framework.InputPort; +import teetime.framework.Stage; +import teetime.framework.TerminationStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; +import teetime.stage.CollectorSink; + +public final class RecordSimplificatorComposite extends Stage { + + private final RecordSimplificator simplificator; + private final CollectorSink<Record> collector; + + public RecordSimplificatorComposite(final List<Record> records) { + this.simplificator = new RecordSimplificator(); + this.collector = new CollectorSink<>(records); + + final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + pipeFactory.create(this.simplificator.getOutputPort(), this.collector.getInputPort()); + } + + @Override + protected void executeWithPorts() { + this.simplificator.executeWithPorts(); + } + + public InputPort<IMonitoringRecord> getInputPort() { + return this.simplificator.getInputPort(); + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + // No code necessary + } + + @Override + protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { + this.simplificator.onSignal(signal, inputPort); + } + + @Override + protected TerminationStrategy getTerminationStrategy() { + return this.simplificator.getTerminationStrategy(); + } + + @Override + protected void terminate() { + this.simplificator.terminate(); + } + + @Override + protected boolean shouldBeTerminated() { + return this.simplificator.shouldBeTerminated(); + } + +} diff --git a/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregationComposite.java b/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregationComposite.java new file mode 100644 index 0000000000000000000000000000000000000000..cd32860f2bdfccf37ee2690e615dbc621d219687 --- /dev/null +++ b/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregationComposite.java @@ -0,0 +1,102 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.gui.common.model.importer.stages; + +import java.util.List; + +import kieker.gui.common.domain.AggregatedExecution; +import kieker.gui.common.domain.Execution; +import teetime.framework.InputPort; +import teetime.framework.Stage; +import teetime.framework.TerminationStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; +import teetime.stage.CollectorSink; +import teetime.stage.basic.distributor.CopyByReferenceStrategy; +import teetime.stage.basic.distributor.Distributor; + +public final class TraceAggregationComposite extends Stage { + + private final TraceAggregator aggregator; + private final Distributor<AggregatedExecution> distributor; + private final FailedTraceFilter<AggregatedExecution> failedTraceFilter; + private final FailureContainingTraceFilter<AggregatedExecution> failureContainingTraceFilter; + + private final CollectorSink<AggregatedExecution> tracesCollector; + private final CollectorSink<AggregatedExecution> failedTracesCollector; + private final CollectorSink<AggregatedExecution> failureContainingTracesCollector; + + public TraceAggregationComposite(final List<AggregatedExecution> traces, final List<AggregatedExecution> failedTraces, final List<AggregatedExecution> failureContainingTraces) { + this.aggregator = new TraceAggregator(); + this.distributor = new Distributor<>(new CopyByReferenceStrategy()); + this.failedTraceFilter = new FailedTraceFilter<>(); + this.failureContainingTraceFilter = new FailureContainingTraceFilter<>(); + + this.tracesCollector = new CollectorSink<>(traces); + this.failedTracesCollector = new CollectorSink<>(failedTraces); + this.failureContainingTracesCollector = new CollectorSink<>(failureContainingTraces); + + final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + pipeFactory.create(this.aggregator.getOutputPort(), this.distributor.getInputPort()); + + pipeFactory.create(this.distributor.getNewOutputPort(), this.tracesCollector.getInputPort()); + pipeFactory.create(this.distributor.getNewOutputPort(), this.failedTraceFilter.getInputPort()); + pipeFactory.create(this.distributor.getNewOutputPort(), this.failureContainingTraceFilter.getInputPort()); + + pipeFactory.create(this.failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort()); + pipeFactory.create(this.failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort()); + } + + @Override + protected void executeWithPorts() { + this.aggregator.executeWithPorts(); + } + + public InputPort<Execution> getInputPort() { + return this.aggregator.getInputPort(); + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + // No code necessary + } + + @Override + protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { + this.aggregator.onSignal(signal, inputPort); + } + + @Override + protected TerminationStrategy getTerminationStrategy() { + return this.aggregator.getTerminationStrategy(); + } + + @Override + protected void terminate() { + this.aggregator.terminate(); + } + + @Override + protected boolean shouldBeTerminated() { + return this.aggregator.shouldBeTerminated(); + } + +} diff --git a/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregator.java b/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregator.java index 652e56a385542660abaa72afd34e2cc91f0e8401..7de51bd207f07050341abb236bf4f9b6935d51a6 100644 --- a/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregator.java +++ b/src/main/java/kieker/gui/common/model/importer/stages/TraceAggregator.java @@ -29,7 +29,7 @@ import teetime.framework.OutputPort; * * @author Nils Christian Ehmke */ -public final class TraceAggregator extends AbstractConsumerStage<Execution> { +final class TraceAggregator extends AbstractConsumerStage<Execution> { private final OutputPort<AggregatedExecution> outputPort = super.createOutputPort(); private final Map<Execution, AggregatedExecution> aggregationMap = new HashMap<>(); diff --git a/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructionComposite.java b/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructionComposite.java new file mode 100644 index 0000000000000000000000000000000000000000..fb726d9b278c7c1be0b6f512399840521d564f01 --- /dev/null +++ b/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructionComposite.java @@ -0,0 +1,94 @@ +package kieker.gui.common.model.importer.stages; + +import java.util.List; + +import kieker.common.record.flow.IFlowRecord; +import kieker.gui.common.domain.Execution; +import teetime.framework.InputPort; +import teetime.framework.OutputPort; +import teetime.framework.Stage; +import teetime.framework.TerminationStrategy; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; +import teetime.framework.signal.ISignal; +import teetime.framework.validation.InvalidPortConnection; +import teetime.stage.CollectorSink; +import teetime.stage.basic.distributor.CopyByReferenceStrategy; +import teetime.stage.basic.distributor.Distributor; + +public final class TraceReconstructionComposite extends Stage { + + private final TraceReconstructor reconstructor; + private final Distributor<Execution> distributor; + private final FailedTraceFilter<Execution> failedTraceFilter; + private final FailureContainingTraceFilter<Execution> failureContainingTraceFilter; + + private final CollectorSink<Execution> tracesCollector; + private final CollectorSink<Execution> failedTracesCollector; + private final CollectorSink<Execution> failureContainingTracescollector; + private final OutputPort<Execution> outputPort; + + public TraceReconstructionComposite(final List<Execution> traces, final List<Execution> failedTraces, final List<Execution> failureContainingTraces) { + this.reconstructor = new TraceReconstructor(); + this.distributor = new Distributor<>(new CopyByReferenceStrategy()); + this.failedTraceFilter = new FailedTraceFilter<>(); + this.failureContainingTraceFilter = new FailureContainingTraceFilter<>(); + + this.tracesCollector = new CollectorSink<>(traces); + this.failedTracesCollector = new CollectorSink<>(failedTraces); + this.failureContainingTracescollector = new CollectorSink<>(failureContainingTraces); + + this.outputPort = this.distributor.getNewOutputPort(); + + final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + pipeFactory.create(this.reconstructor.getOutputPort(), this.distributor.getInputPort()); + + pipeFactory.create(this.distributor.getNewOutputPort(), this.tracesCollector.getInputPort()); + pipeFactory.create(this.distributor.getNewOutputPort(), this.failedTraceFilter.getInputPort()); + pipeFactory.create(this.distributor.getNewOutputPort(), this.failureContainingTraceFilter.getInputPort()); + + pipeFactory.create(this.failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort()); + pipeFactory.create(this.failureContainingTraceFilter.getOutputPort(), this.failureContainingTracescollector.getInputPort()); + } + + @Override + protected void executeWithPorts() { + this.reconstructor.executeWithPorts(); + } + + public InputPort<IFlowRecord> getInputPort() { + return this.reconstructor.getInputPort(); + } + + public OutputPort<Execution> getOutputPort() { + return this.outputPort; + } + + @Override + public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) { + this.distributor.validateOutputPorts(invalidPortConnections); + } + + @Override + protected void onSignal(final ISignal signal, final InputPort<?> inputPort) { + this.reconstructor.onSignal(signal, inputPort); + } + + @Override + protected TerminationStrategy getTerminationStrategy() { + return this.reconstructor.getTerminationStrategy(); + } + + @Override + protected void terminate() { + this.reconstructor.terminate(); + } + + @Override + protected boolean shouldBeTerminated() { + return this.reconstructor.shouldBeTerminated(); + } + +} diff --git a/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructor.java b/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructor.java index 7bb687b931201757d08127de972cbc4d16f945a6..0479a4f1ff87ffc22384be377efc33ad04f0a59f 100644 --- a/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructor.java +++ b/src/main/java/kieker/gui/common/model/importer/stages/TraceReconstructor.java @@ -37,7 +37,7 @@ import teetime.framework.OutputPort; * * @author Nils Christian Ehmke */ -public final class TraceReconstructor extends AbstractConsumerStage<IFlowRecord> { +final class TraceReconstructor extends AbstractConsumerStage<IFlowRecord> { private final OutputPort<Execution> outputPort = super.createOutputPort(); private final Map<Long, TraceBuffer> traceBuffers = new HashMap<>();