Skip to content
Snippets Groups Projects
Commit 5da1605b authored by Lars Erik Blümke's avatar Lars Erik Blümke
Browse files

replaced MultipleInstanceOfFilter by several single instance of filters...

replaced MultipleInstanceOfFilter by several single instance of filters because MultipleInstanceOfFilter passes records multiple times when several types fit
parent 736b740f
No related branches found
No related tags found
No related merge requests found
......@@ -12,14 +12,17 @@ import kieker.common.record.flow.trace.TraceMetadata;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.stage.MultipleInstanceOfFilter;
import teetime.stage.InstanceOfFilter;
import teetime.stage.basic.merger.Merger;
public class TimestampStage extends CompositeStage {
private final InputPort<IMonitoringRecord> monitoringRecordsCombinedInputPort;
private final MultipleInstanceOfFilter<IMonitoringRecord> recordTypeFilter;
private final InstanceOfFilter<IMonitoringRecord, OperationExecutionRecord> instanceOfOperationExecutionRecordFilter;
private final InstanceOfFilter<IMonitoringRecord, TraceMetadata> instanceOfTraceMetadataFilter;
private final InstanceOfFilter<IMonitoringRecord, IEventRecord> instanceOfIEventFilter;
private final InstanceOfFilter<IMonitoringRecord, IMonitoringRecord> instanceOfIMonitoringRecord;
private final EventRecordTimestampStage eventRecordTimestampStage;
private final OperationExecutionRecordTimestampStage operationExecutionRecordTimestampStage;
......@@ -42,7 +45,11 @@ public class TimestampStage extends CompositeStage {
*/
public TimestampStage(final long ignoreBeforeTimestamp, final long ignoreAfterTimestamp) {
this.recordTypeFilter = new MultipleInstanceOfFilter<>();
// Instantiate internal stages
this.instanceOfOperationExecutionRecordFilter = new InstanceOfFilter<>(OperationExecutionRecord.class);
this.instanceOfTraceMetadataFilter = new InstanceOfFilter<>(TraceMetadata.class);
this.instanceOfIEventFilter = new InstanceOfFilter<>(IEventRecord.class);
this.instanceOfIMonitoringRecord = new InstanceOfFilter<>(IMonitoringRecord.class);
this.eventRecordTimestampStage = new EventRecordTimestampStage(ignoreBeforeTimestamp, ignoreAfterTimestamp);
this.operationExecutionRecordTimestampStage = new OperationExecutionRecordTimestampStage(ignoreBeforeTimestamp, ignoreAfterTimestamp);
......@@ -52,15 +59,19 @@ public class TimestampStage extends CompositeStage {
this.recordsWithinTimePeriodMerger = new Merger<>();
this.recordsOutsideTimePeriodMerger = new Merger<>();
this.monitoringRecordsCombinedInputPort = recordTypeFilter.getInputPort();
// Define input and output ports of composite stage
this.monitoringRecordsCombinedInputPort = instanceOfOperationExecutionRecordFilter.getInputPort();
this.recordsWithinTimePeriodOutputPort = recordsWithinTimePeriodMerger.getOutputPort();
this.recordsOutsideTimePeriodOutputPort = recordsOutsideTimePeriodMerger.getOutputPort();
// Connect MultipleInstanceOfFilter with specific TimestampStages
connectPorts(this.recordTypeFilter.getOutputPortForType(IEventRecord.class), this.eventRecordTimestampStage.getInputPort());
connectPorts(this.recordTypeFilter.getOutputPortForType(OperationExecutionRecord.class), this.operationExecutionRecordTimestampStage.getInputPort());
connectPorts(this.recordTypeFilter.getOutputPortForType(TraceMetadata.class), this.traceMetadataTimestampStage.getInputPort());
connectPorts(this.recordTypeFilter.getOutputPortForType(IMonitoringRecord.class), this.monitoringRecordTimestampStage.getInputPort());
// Connect InstanceOfFilters with specific TimestampStages and each other
connectPorts(this.instanceOfOperationExecutionRecordFilter.getMatchedOutputPort(), this.operationExecutionRecordTimestampStage.getInputPort());
connectPorts(this.instanceOfOperationExecutionRecordFilter.getMismatchedOutputPort(), this.instanceOfTraceMetadataFilter.getInputPort());
connectPorts(this.instanceOfTraceMetadataFilter.getMatchedOutputPort(), this.traceMetadataTimestampStage.getInputPort());
connectPorts(this.instanceOfTraceMetadataFilter.getMismatchedOutputPort(), this.instanceOfIEventFilter.getInputPort());
connectPorts(this.instanceOfIEventFilter.getMatchedOutputPort(), this.eventRecordTimestampStage.getInputPort());
connectPorts(this.instanceOfIEventFilter.getMismatchedOutputPort(), this.instanceOfIMonitoringRecord.getInputPort());
connectPorts(this.instanceOfIMonitoringRecord.getMatchedOutputPort(), this.monitoringRecordTimestampStage.getInputPort());
// Connect specific TimestampStages with Mergers
connectPorts(this.eventRecordTimestampStage.getRecordWithinTimePeriodOutputPort(), this.recordsWithinTimePeriodMerger.getNewInputPort());
......@@ -76,7 +87,7 @@ public class TimestampStage extends CompositeStage {
/**
* Returns the input port for the records.
*
*
* @return The input port.
*/
public InputPort<IMonitoringRecord> getMonitoringRecordsCombinedInputPort() {
......@@ -85,7 +96,7 @@ public class TimestampStage extends CompositeStage {
/**
* Returns the output port for the records whose timestamps are within the defined time period.
*
*
* @return The recordsWithinTimePeriod output port.
*/
public OutputPort<IMonitoringRecord> getRecordsWithinTimePeriodOutputPort() {
......@@ -94,7 +105,7 @@ public class TimestampStage extends CompositeStage {
/**
* Returns the output port for the records whose timestamps are outside the defined time period.
*
*
* @return The recordsOutsideTimePeriod output port.
*/
public OutputPort<IMonitoringRecord> getRecordsOutsideTimePeriodOutputPort() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment