Skip to content
Snippets Groups Projects
Commit c20dfd8e authored by Nils Christian Ehmke's avatar Nils Christian Ehmke
Browse files

Refactoring

parent 3ea575f7
No related branches found
No related tags found
No related merge requests found
Showing
with 388 additions and 111 deletions
......@@ -26,24 +26,18 @@ import kieker.common.record.misc.KiekerMetadataRecord;
import kieker.gui.common.domain.AggregatedExecution;
import kieker.gui.common.domain.Execution;
import kieker.gui.common.domain.Record;
import kieker.gui.common.model.importer.stages.FailedAggregatedTraceFilter;
import kieker.gui.common.model.importer.stages.FailedTraceFilter;
import kieker.gui.common.model.importer.stages.FailureContainingAggregatedTraceFilter;
import kieker.gui.common.model.importer.stages.FailureContainingTraceFilter;
import kieker.gui.common.model.importer.stages.RecordSimplificator;
import kieker.gui.common.model.importer.stages.TraceAggregator;
import kieker.gui.common.model.importer.stages.TraceReconstructor;
import kieker.gui.common.model.importer.stages.ReadingComposite;
import kieker.gui.common.model.importer.stages.RecordSimplificatorComposite;
import kieker.gui.common.model.importer.stages.TraceAggregationComposite;
import kieker.gui.common.model.importer.stages.TraceReconstructionComposite;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink;
import teetime.stage.InitialElementProducer;
import teetime.stage.MultipleInstanceOfFilter;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.className.ClassNameRegistryRepository;
import teetime.stage.io.filesystem.Dir2RecordsFilter;
/**
* A configuration for the import and analysis of monitoring logs.
......@@ -52,88 +46,64 @@ import teetime.stage.io.filesystem.Dir2RecordsFilter;
*/
public final class ImportAnalysisConfiguration extends AnalysisConfiguration {
private final List<Record> recordsList = new Vector<>(1000);
private final List<Execution> failedTracesList = new Vector<>(1000);
private final List<Execution> failureContainingTracesList = new Vector<>(1000);
private final List<Execution> tracesList = new Vector<>(1000);
private final List<Record> records = new Vector<>(1000);
private final List<Execution> traces = new Vector<>(1000);
private final List<Execution> failedTraces = new Vector<>(1000);
private final List<Execution> failureContainingTraces = new Vector<>(1000);
private final List<AggregatedExecution> aggregatedTraces = new Vector<>(1000);
private final List<AggregatedExecution> failedAggregatedTracesList = new Vector<>(1000);
private final List<AggregatedExecution> failureContainingAggregatedTracesList = new Vector<>(1000);
private final List<AggregatedExecution> failedAggregatedTraces = new Vector<>(1000);
private final List<AggregatedExecution> failureContainingAggregatedTraces = new Vector<>(1000);
private final List<KiekerMetadataRecord> metadataRecords = new Vector<>(1000);
public ImportAnalysisConfiguration(final File importDirectory) {
// Create the stages
final InitialElementProducer<File> producer = new InitialElementProducer<>(importDirectory);
final Dir2RecordsFilter reader = new Dir2RecordsFilter(new ClassNameRegistryRepository());
final ReadingComposite reader = new ReadingComposite(importDirectory);
final MultipleInstanceOfFilter<IMonitoringRecord> typeFilter = new MultipleInstanceOfFilter<>();
final Distributor<IFlowRecord> fstDistributor = new Distributor<>(new CopyByReferenceStrategy());
final RecordSimplificator recordSimplificator = new RecordSimplificator();
final CollectorSink<Record> recordCollector = new CollectorSink<>(this.recordsList);
final TraceReconstructor traceReconstructor = new TraceReconstructor();
final Distributor<Execution> sndDistributor = new Distributor<>(new CopyByReferenceStrategy());
final CollectorSink<Execution> traceCollector = new CollectorSink<>(this.tracesList);
final FailedTraceFilter failedTraceFilter = new FailedTraceFilter();
final CollectorSink<Execution> failedTraceCollector = new CollectorSink<>(this.failedTracesList);
final FailureContainingTraceFilter failureContainingTraceFilter = new FailureContainingTraceFilter();
final CollectorSink<Execution> failureContainingTraceCollector = new CollectorSink<>(this.failureContainingTracesList);
final TraceAggregator traceAggregator = new TraceAggregator();
final CollectorSink<AggregatedExecution> aggregatedTraceCollector = new CollectorSink<>(this.aggregatedTraces);
final Distributor<IFlowRecord> distributor = new Distributor<>(new CopyByReferenceStrategy());
final RecordSimplificatorComposite recordSimplificator = new RecordSimplificatorComposite(this.records);
final TraceReconstructionComposite traceReconstruction = new TraceReconstructionComposite(this.traces, this.failedTraces, this.failureContainingTraces);
final TraceAggregationComposite traceAggregation = new TraceAggregationComposite(this.aggregatedTraces, this.failedAggregatedTraces, this.failureContainingAggregatedTraces);
final CollectorSink<KiekerMetadataRecord> metadataCollector = new CollectorSink<>(this.metadataRecords);
final FailedAggregatedTraceFilter failedAggregatedTraceFilter = new FailedAggregatedTraceFilter();
final Distributor<AggregatedExecution> thrdDistributor = new Distributor<>(new CopyByReferenceStrategy());
final CollectorSink<AggregatedExecution> failedAggregatedTraceCollector = new CollectorSink<>(this.failedAggregatedTracesList);
final FailureContainingAggregatedTraceFilter failureContainingAggregatedTraceFilter = new FailureContainingAggregatedTraceFilter();
final CollectorSink<AggregatedExecution> failureContainingAggregatedTraceCollector = new CollectorSink<>(this.failureContainingAggregatedTracesList);
// Connect the stages
final IPipeFactory pipeFactory = AnalysisConfiguration.PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(producer.getOutputPort(), reader.getInputPort());
pipeFactory.create(reader.getOutputPort(), typeFilter.getInputPort());
pipeFactory.create(typeFilter.getOutputPortForType(IFlowRecord.class), fstDistributor.getInputPort());
pipeFactory.create(fstDistributor.getNewOutputPort(), recordSimplificator.getInputPort());
pipeFactory.create(recordSimplificator.getOutputPort(), recordCollector.getInputPort());
pipeFactory.create(fstDistributor.getNewOutputPort(), traceReconstructor.getInputPort());
pipeFactory.create(traceReconstructor.getOutputPort(), sndDistributor.getInputPort());
pipeFactory.create(sndDistributor.getNewOutputPort(), traceAggregator.getInputPort());
pipeFactory.create(sndDistributor.getNewOutputPort(), traceCollector.getInputPort());
pipeFactory.create(sndDistributor.getNewOutputPort(), failedTraceFilter.getInputPort());
pipeFactory.create(failedTraceFilter.getOutputPort(), failedTraceCollector.getInputPort());
pipeFactory.create(sndDistributor.getNewOutputPort(), failureContainingTraceFilter.getInputPort());
pipeFactory.create(failureContainingTraceFilter.getOutputPort(), failureContainingTraceCollector.getInputPort());
pipeFactory.create(traceAggregator.getOutputPort(), thrdDistributor.getInputPort());
pipeFactory.create(thrdDistributor.getNewOutputPort(), aggregatedTraceCollector.getInputPort());
pipeFactory.create(thrdDistributor.getNewOutputPort(), failedAggregatedTraceFilter.getInputPort());
pipeFactory.create(thrdDistributor.getNewOutputPort(), failureContainingAggregatedTraceFilter.getInputPort());
pipeFactory.create(failedAggregatedTraceFilter.getOutputPort(), failedAggregatedTraceCollector.getInputPort());
pipeFactory.create(failureContainingAggregatedTraceFilter.getOutputPort(), failureContainingAggregatedTraceCollector.getInputPort());
pipeFactory.create(typeFilter.getOutputPortForType(IFlowRecord.class), distributor.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), recordSimplificator.getInputPort());
pipeFactory.create(distributor.getNewOutputPort(), traceReconstruction.getInputPort());
pipeFactory.create(traceReconstruction.getOutputPort(), traceAggregation.getInputPort());
pipeFactory.create(typeFilter.getOutputPortForType(KiekerMetadataRecord.class), metadataCollector.getInputPort());
// Make sure that the producer is executed by the analysis
super.addThreadableStage(producer);
super.addThreadableStage(reader);
}
public List<Record> getRecordsList() {
return this.recordsList;
return this.records;
}
public List<Execution> getTracesList() {
return this.tracesList;
return this.traces;
}
public List<Execution> getFailedTracesList() {
return this.failedTracesList;
return this.failedTraces;
}
public List<Execution> getFailureContainingTracesList() {
return this.failureContainingTracesList;
return this.failureContainingTraces;
}
public List<AggregatedExecution> getFailedAggregatedTracesList() {
return this.failedAggregatedTracesList;
return this.failedAggregatedTraces;
}
public List<AggregatedExecution> getFailureContainingAggregatedTracesList() {
return this.failureContainingAggregatedTracesList;
return this.failureContainingAggregatedTraces;
}
public List<AggregatedExecution> getAggregatedTraces() {
......
......@@ -16,22 +16,22 @@
package kieker.gui.common.model.importer.stages;
import kieker.gui.common.domain.Execution;
import kieker.gui.common.domain.AbstractExecution;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
public final class FailedTraceFilter extends AbstractConsumerStage<Execution> {
final class FailedTraceFilter<T extends AbstractExecution<?>> extends AbstractConsumerStage<T> {
private final OutputPort<Execution> outputPort = super.createOutputPort();
private final OutputPort<T> outputPort = super.createOutputPort();
@Override
protected void execute(final Execution element) {
protected void execute(final T element) {
if (element.isFailed()) {
this.outputPort.send(element);
}
}
public OutputPort<Execution> getOutputPort() {
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.gui.common.model.importer.stages;
import kieker.gui.common.domain.AggregatedExecution;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
public final class FailureContainingAggregatedTraceFilter extends AbstractConsumerStage<AggregatedExecution> {
private final OutputPort<AggregatedExecution> outputPort = super.createOutputPort();
@Override
protected void execute(final AggregatedExecution element) {
if (element.containsFailure()) {
this.outputPort.send(element);
}
}
public OutputPort<AggregatedExecution> getOutputPort() {
return this.outputPort;
}
}
......@@ -16,22 +16,22 @@
package kieker.gui.common.model.importer.stages;
import kieker.gui.common.domain.Execution;
import kieker.gui.common.domain.AbstractExecution;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
public final class FailureContainingTraceFilter extends AbstractConsumerStage<Execution> {
final class FailureContainingTraceFilter<T extends AbstractExecution<?>> extends AbstractConsumerStage<T> {
private final OutputPort<Execution> outputPort = super.createOutputPort();
private final OutputPort<T> outputPort = super.createOutputPort();
@Override
protected void execute(final Execution element) {
protected void execute(final T element) {
if (element.containsFailure()) {
this.outputPort.send(element);
}
}
public OutputPort<Execution> getOutputPort() {
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
......
package kieker.gui.common.model.importer.stages;
import java.io.File;
import java.util.List;
import kieker.common.record.IMonitoringRecord;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.InitialElementProducer;
import teetime.stage.className.ClassNameRegistryRepository;
import teetime.stage.io.filesystem.Dir2RecordsFilter;
public final class ReadingComposite extends Stage {
private final InitialElementProducer<File> producer;
private final Dir2RecordsFilter reader;
public ReadingComposite(final File importDirectory) {
this.producer = new InitialElementProducer<>(importDirectory);
this.reader = new Dir2RecordsFilter(new ClassNameRegistryRepository());
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.producer.getOutputPort(), this.reader.getInputPort());
}
@Override
protected void executeWithPorts() {
this.producer.executeWithPorts();
}
public OutputPort<IMonitoringRecord> getOutputPort() {
return this.reader.getOutputPort();
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
this.reader.validateOutputPorts(invalidPortConnections);
}
@Override
protected void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.producer.onSignal(signal, inputPort);
}
@Override
protected TerminationStrategy getTerminationStrategy() {
return this.producer.getTerminationStrategy();
}
@Override
protected void terminate() {
this.producer.terminate();
}
@Override
protected boolean shouldBeTerminated() {
return this.producer.shouldBeTerminated();
}
}
......@@ -26,7 +26,7 @@ import teetime.framework.OutputPort;
*
* @author Nils Christian Ehmke
*/
public final class RecordSimplificator extends AbstractConsumerStage<IMonitoringRecord> {
final class RecordSimplificator extends AbstractConsumerStage<IMonitoringRecord> {
private final OutputPort<Record> outputPort = super.createOutputPort();
......
......@@ -16,23 +16,66 @@
package kieker.gui.common.model.importer.stages;
import kieker.gui.common.domain.AggregatedExecution;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
import java.util.List;
public final class FailedAggregatedTraceFilter extends AbstractConsumerStage<AggregatedExecution> {
import kieker.common.record.IMonitoringRecord;
import kieker.gui.common.domain.Record;
import teetime.framework.InputPort;
import teetime.framework.Stage;
import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.CollectorSink;
private final OutputPort<AggregatedExecution> outputPort = super.createOutputPort();
public final class RecordSimplificatorComposite extends Stage {
private final RecordSimplificator simplificator;
private final CollectorSink<Record> collector;
public RecordSimplificatorComposite(final List<Record> records) {
this.simplificator = new RecordSimplificator();
this.collector = new CollectorSink<>(records);
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.simplificator.getOutputPort(), this.collector.getInputPort());
}
@Override
protected void executeWithPorts() {
this.simplificator.executeWithPorts();
}
public InputPort<IMonitoringRecord> getInputPort() {
return this.simplificator.getInputPort();
}
@Override
protected void execute(final AggregatedExecution element) {
if (element.isFailed()) {
this.outputPort.send(element);
}
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
// No code necessary
}
public OutputPort<AggregatedExecution> getOutputPort() {
return this.outputPort;
@Override
protected void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.simplificator.onSignal(signal, inputPort);
}
@Override
protected TerminationStrategy getTerminationStrategy() {
return this.simplificator.getTerminationStrategy();
}
@Override
protected void terminate() {
this.simplificator.terminate();
}
@Override
protected boolean shouldBeTerminated() {
return this.simplificator.shouldBeTerminated();
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kieker.gui.common.model.importer.stages;
import java.util.List;
import kieker.gui.common.domain.AggregatedExecution;
import kieker.gui.common.domain.Execution;
import teetime.framework.InputPort;
import teetime.framework.Stage;
import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.CollectorSink;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor;
public final class TraceAggregationComposite extends Stage {
private final TraceAggregator aggregator;
private final Distributor<AggregatedExecution> distributor;
private final FailedTraceFilter<AggregatedExecution> failedTraceFilter;
private final FailureContainingTraceFilter<AggregatedExecution> failureContainingTraceFilter;
private final CollectorSink<AggregatedExecution> tracesCollector;
private final CollectorSink<AggregatedExecution> failedTracesCollector;
private final CollectorSink<AggregatedExecution> failureContainingTracesCollector;
public TraceAggregationComposite(final List<AggregatedExecution> traces, final List<AggregatedExecution> failedTraces, final List<AggregatedExecution> failureContainingTraces) {
this.aggregator = new TraceAggregator();
this.distributor = new Distributor<>(new CopyByReferenceStrategy());
this.failedTraceFilter = new FailedTraceFilter<>();
this.failureContainingTraceFilter = new FailureContainingTraceFilter<>();
this.tracesCollector = new CollectorSink<>(traces);
this.failedTracesCollector = new CollectorSink<>(failedTraces);
this.failureContainingTracesCollector = new CollectorSink<>(failureContainingTraces);
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.aggregator.getOutputPort(), this.distributor.getInputPort());
pipeFactory.create(this.distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
pipeFactory.create(this.distributor.getNewOutputPort(), this.failedTraceFilter.getInputPort());
pipeFactory.create(this.distributor.getNewOutputPort(), this.failureContainingTraceFilter.getInputPort());
pipeFactory.create(this.failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
pipeFactory.create(this.failureContainingTraceFilter.getOutputPort(), this.failureContainingTracesCollector.getInputPort());
}
@Override
protected void executeWithPorts() {
this.aggregator.executeWithPorts();
}
public InputPort<Execution> getInputPort() {
return this.aggregator.getInputPort();
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
// No code necessary
}
@Override
protected void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.aggregator.onSignal(signal, inputPort);
}
@Override
protected TerminationStrategy getTerminationStrategy() {
return this.aggregator.getTerminationStrategy();
}
@Override
protected void terminate() {
this.aggregator.terminate();
}
@Override
protected boolean shouldBeTerminated() {
return this.aggregator.shouldBeTerminated();
}
}
......@@ -29,7 +29,7 @@ import teetime.framework.OutputPort;
*
* @author Nils Christian Ehmke
*/
public final class TraceAggregator extends AbstractConsumerStage<Execution> {
final class TraceAggregator extends AbstractConsumerStage<Execution> {
private final OutputPort<AggregatedExecution> outputPort = super.createOutputPort();
private final Map<Execution, AggregatedExecution> aggregationMap = new HashMap<>();
......
package kieker.gui.common.model.importer.stages;
import java.util.List;
import kieker.common.record.flow.IFlowRecord;
import kieker.gui.common.domain.Execution;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import teetime.framework.TerminationStrategy;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
import teetime.stage.CollectorSink;
import teetime.stage.basic.distributor.CopyByReferenceStrategy;
import teetime.stage.basic.distributor.Distributor;
public final class TraceReconstructionComposite extends Stage {
private final TraceReconstructor reconstructor;
private final Distributor<Execution> distributor;
private final FailedTraceFilter<Execution> failedTraceFilter;
private final FailureContainingTraceFilter<Execution> failureContainingTraceFilter;
private final CollectorSink<Execution> tracesCollector;
private final CollectorSink<Execution> failedTracesCollector;
private final CollectorSink<Execution> failureContainingTracescollector;
private final OutputPort<Execution> outputPort;
public TraceReconstructionComposite(final List<Execution> traces, final List<Execution> failedTraces, final List<Execution> failureContainingTraces) {
this.reconstructor = new TraceReconstructor();
this.distributor = new Distributor<>(new CopyByReferenceStrategy());
this.failedTraceFilter = new FailedTraceFilter<>();
this.failureContainingTraceFilter = new FailureContainingTraceFilter<>();
this.tracesCollector = new CollectorSink<>(traces);
this.failedTracesCollector = new CollectorSink<>(failedTraces);
this.failureContainingTracescollector = new CollectorSink<>(failureContainingTraces);
this.outputPort = this.distributor.getNewOutputPort();
final IPipeFactory pipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
pipeFactory.create(this.reconstructor.getOutputPort(), this.distributor.getInputPort());
pipeFactory.create(this.distributor.getNewOutputPort(), this.tracesCollector.getInputPort());
pipeFactory.create(this.distributor.getNewOutputPort(), this.failedTraceFilter.getInputPort());
pipeFactory.create(this.distributor.getNewOutputPort(), this.failureContainingTraceFilter.getInputPort());
pipeFactory.create(this.failedTraceFilter.getOutputPort(), this.failedTracesCollector.getInputPort());
pipeFactory.create(this.failureContainingTraceFilter.getOutputPort(), this.failureContainingTracescollector.getInputPort());
}
@Override
protected void executeWithPorts() {
this.reconstructor.executeWithPorts();
}
public InputPort<IFlowRecord> getInputPort() {
return this.reconstructor.getInputPort();
}
public OutputPort<Execution> getOutputPort() {
return this.outputPort;
}
@Override
public void validateOutputPorts(final List<InvalidPortConnection> invalidPortConnections) {
this.distributor.validateOutputPorts(invalidPortConnections);
}
@Override
protected void onSignal(final ISignal signal, final InputPort<?> inputPort) {
this.reconstructor.onSignal(signal, inputPort);
}
@Override
protected TerminationStrategy getTerminationStrategy() {
return this.reconstructor.getTerminationStrategy();
}
@Override
protected void terminate() {
this.reconstructor.terminate();
}
@Override
protected boolean shouldBeTerminated() {
return this.reconstructor.shouldBeTerminated();
}
}
......@@ -37,7 +37,7 @@ import teetime.framework.OutputPort;
*
* @author Nils Christian Ehmke
*/
public final class TraceReconstructor extends AbstractConsumerStage<IFlowRecord> {
final class TraceReconstructor extends AbstractConsumerStage<IFlowRecord> {
private final OutputPort<Execution> outputPort = super.createOutputPort();
private final Map<Long, TraceBuffer> traceBuffers = new HashMap<>();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment