From 45bf29f95ded34bb1026915392faed1aa51a6d2a Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 3 Nov 2014 22:02:47 +0100 Subject: [PATCH] TCPReader now uses the record factory concept; updated dependency to kieker 1.11-SNAPSHOT --- pom.xml | 4 +- .../{TCPReader.java => TcpReader.java} | 158 +++++------------- .../examples/kiekerdays/TcpTraceLogging.java | 4 +- .../kiekerdays/TcpTraceReconstruction.java | 10 +- .../kiekerdays/TcpTraceReduction.java | 10 +- .../TcpTraceLoggingExtAnalysis.java | 6 +- .../TcpTraceReconstructionAnalysis.java | 8 +- ...ctionAnalysisWithThreadsConfiguration.java | 10 +- .../TcpTraceReductionAnalysisWithThreads.java | 12 +- 9 files changed, 70 insertions(+), 152 deletions(-) rename src/main/java/teetime/stage/io/network/{TCPReader.java => TcpReader.java} (50%) diff --git a/pom.xml b/pom.xml index ede95fa4..dded5d43 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ <repositories> <repository> - <!-- kieker:1.10-SNAPSHOT --> + <!-- for SNAPSHOT versions--> <id>sonatype.oss.snapshots</id> <url>https://oss.sonatype.org/content/repositories/snapshots/</url> </repository> @@ -53,7 +53,7 @@ <dependency> <groupId>net.kieker-monitoring</groupId> <artifactId>kieker</artifactId> - <version>1.10-SNAPSHOT</version> + <version>1.11-SNAPSHOT</version> </dependency> <dependency> <groupId>org.slf4j</groupId> diff --git a/src/main/java/teetime/stage/io/network/TCPReader.java b/src/main/java/teetime/stage/io/network/TcpReader.java similarity index 50% rename from src/main/java/teetime/stage/io/network/TCPReader.java rename to src/main/java/teetime/stage/io/network/TcpReader.java index d4235192..45b87c70 100644 --- a/src/main/java/teetime/stage/io/network/TCPReader.java +++ b/src/main/java/teetime/stage/io/network/TcpReader.java @@ -23,19 +23,13 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import teetime.framework.ProducerStage; -import teetime.framework.signal.OnStartingException; +import teetime.stage.io.AbstractTcpReader; import kieker.common.exception.MonitoringRecordException; import kieker.common.logging.Log; import kieker.common.logging.LogFactory; -import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; -import kieker.common.record.flow.trace.TraceMetadata; -import kieker.common.record.flow.trace.operation.AfterOperationEvent; -import kieker.common.record.flow.trace.operation.BeforeOperationEvent; -import kieker.common.record.flow.trace.operation.CallOperationEvent; -import kieker.common.record.misc.KiekerMetadataRecord; +import kieker.common.record.factory.IRecordFactory; import kieker.common.record.misc.RegistryRecord; import kieker.common.util.registry.ILookup; import kieker.common.util.registry.Lookup; @@ -43,147 +37,62 @@ import kieker.common.util.registry.Lookup; /** * This is a reader which reads the records from a TCP port. * - * @author Jan Waller, Nils Christian Ehmke + * @author Jan Waller, Nils Christian Ehmke, Christian Wulf * - * @since 1.10 */ -public class TCPReader extends ProducerStage<IMonitoringRecord> { - - private static final int MESSAGE_BUFFER_SIZE = 65535; +public class TcpReader extends AbstractTcpReader<IMonitoringRecord> { + private final CachedRecordFactoryRepository recordFactories; // BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary. private final ILookup<String> stringRegistry = new Lookup<String>(); - private int port1 = 10133; private int port2 = 10134; - private TCPStringReader tcpStringReader; - private RecordFactory recordFactory; - - public final int getPort1() { - return this.port1; - } - - public final void setPort1(final int port1) { - this.port1 = port1; - } - - public final int getPort2() { - return this.port2; + /** + * Default constructor with <code>port=10133</code> and <code>bufferCapacity=65535</code> + */ + public TcpReader() { + this(10133, 65535); } - public final void setPort2(final int port2) { - this.port2 = port2; + public TcpReader(final int port, final int bufferCapacity) { + super(port, bufferCapacity); + this.recordFactories = new CachedRecordFactoryRepository(new RecordFactoryRepository()); } @Override public void onStarting() throws OnStartingException { super.onStarting(); - this.recordFactory = new RecordFactory(); - this.register(); - this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); this.tcpStringReader.start(); } - private void register() { - this.recordFactory.register(TraceMetadata.class.getCanonicalName(), new IRecordFactoryMethod() { - @Override - public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) { - return new TraceMetadata(buffer, stringRegistry); - } - }); - - this.recordFactory.register(KiekerMetadataRecord.class.getCanonicalName(), new IRecordFactoryMethod() { - @Override - public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) { - return new KiekerMetadataRecord(buffer, stringRegistry); - } - }); - - this.recordFactory.register(BeforeOperationEvent.class.getCanonicalName(), new IRecordFactoryMethod() { - @Override - public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) { - return new BeforeOperationEvent(buffer, stringRegistry); - } - }); - - this.recordFactory.register(AfterOperationEvent.class.getCanonicalName(), new IRecordFactoryMethod() { - @Override - public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) { - return new AfterOperationEvent(buffer, stringRegistry); - } - }); - - this.recordFactory.register(CallOperationEvent.class.getCanonicalName(), new IRecordFactoryMethod() { - @Override - public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) { - return new CallOperationEvent(buffer, stringRegistry); - } - }); - } - @Override - protected void execute() { - ServerSocketChannel serversocket = null; - try { - serversocket = ServerSocketChannel.open(); - serversocket.socket().bind(new InetSocketAddress(this.port1)); - if (super.logger.isDebugEnabled()) { - super.logger.debug("Listening on port " + this.port1); - } - // BEGIN also loop this one? - final SocketChannel socketChannel = serversocket.accept(); - final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); - while (socketChannel.read(buffer) != -1) { - buffer.flip(); - // System.out.println("Reading, remaining:" + buffer.remaining()); - try { - while (buffer.hasRemaining()) { - buffer.mark(); - this.createAndSendRecord(buffer); - } - buffer.clear(); - } catch (final BufferUnderflowException ex) { - buffer.reset(); - // System.out.println("Underflow, remaining:" + buffer.remaining()); - buffer.compact(); - } - } - // System.out.println("Channel closing..."); - socketChannel.close(); - // END also loop this one? - } catch (final IOException ex) { - super.logger.error("Error while reading", ex); - } finally { - if (null != serversocket) { - try { - serversocket.close(); - } catch (final IOException e) { - if (super.logger.isDebugEnabled()) { - super.logger.debug("Failed to close TCP connection!", e); - } - } - } - - this.terminate(); - this.tcpStringReader.terminate(); - } - } - - private final void createAndSendRecord(final ByteBuffer buffer) { - final int clazzid = buffer.getInt(); + protected final void read(final ByteBuffer buffer) { + final int clazzId = buffer.getInt(); final long loggingTimestamp = buffer.getLong(); + + final String recordClassName = this.stringRegistry.get(clazzId); try { - // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); - final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); + // final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzId, buffer, this.stringRegistry); + // record.setLoggingTimestamp(loggingTimestamp); + + final IRecordFactory<? extends IMonitoringRecord> recordFactory = this.recordFactories.get(recordClassName); + record = recordFactory.create(buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); + this.send(this.outputPort, record); } catch (final MonitoringRecordException ex) { super.logger.error("Failed to create record.", ex); } } + @Override + public void onTerminating() { + super.onTerminating(); + this.tcpStringReader.terminate(); + } + /** * * @author Jan Waller @@ -256,4 +165,13 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } } } + + public int getPort2() { + return this.port2; + } + + public void setPort2(final int port2) { + this.port2 = port2; + } + } diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java index 37e5fe5d..3820209e 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceLogging.java @@ -2,7 +2,7 @@ package teetime.examples.kiekerdays; import teetime.framework.HeadStage; import teetime.framework.RunnableStage; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; public class TcpTraceLogging { @@ -26,7 +26,7 @@ public class TcpTraceLogging { private HeadStage buildTcpPipeline() { // TCPReaderSink tcpReader = new TCPReaderSink(); - TCPReader tcpReader = new TCPReader(); + TcpReader tcpReader = new TcpReader(); return tcpReader; } diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java index 4d1269d9..db4e87f8 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReconstruction.java @@ -13,7 +13,7 @@ import teetime.stage.InstanceOfFilter; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -38,7 +38,7 @@ public class TcpTraceReconstruction { private int numWorkerThreads; public void init() { - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); @@ -50,14 +50,14 @@ public class TcpTraceReconstruction { } } - private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { - TCPReader tcpReader = new TCPReader(); + private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>(); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; diff --git a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java index 9db6f2d5..d50830d1 100644 --- a/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java +++ b/src/performancetest/java/teetime/examples/kiekerdays/TcpTraceReduction.java @@ -16,7 +16,7 @@ import teetime.stage.InstanceOfFilter; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.stage.trace.traceReduction.TraceAggregationBuffer; import teetime.stage.trace.traceReduction.TraceComperator; @@ -46,7 +46,7 @@ public class TcpTraceReduction { private int numWorkerThreads; public void init() { - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000); @@ -61,14 +61,14 @@ public class TcpTraceReduction { } } - private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { - TCPReader tcpReader = new TCPReader(); + private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>(); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java index 3d090b71..8808f3ee 100644 --- a/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReading/TcpTraceLoggingExtAnalysis.java @@ -11,7 +11,7 @@ import teetime.stage.Counter; import teetime.stage.ElementThroughputMeasuringStage; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; import kieker.common.record.IMonitoringRecord; @@ -39,7 +39,7 @@ public class TcpTraceLoggingExtAnalysis { } private HeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) { - TCPReader tcpReader = new TCPReader(); + TcpReader tcpReader = new TcpReader(); this.recordCounter = new Counter<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); @@ -52,7 +52,7 @@ public class TcpTraceLoggingExtAnalysis { SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); // create and configure pipeline - HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>(); + HeadPipeline<TcpReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Sink<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(endStage); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index b3099a82..b2954cf4 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -13,7 +13,7 @@ import teetime.stage.ElementThroughputMeasuringStage; import teetime.stage.InstanceOfFilter; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -65,9 +65,9 @@ public class TcpTraceReconstructionAnalysis { return pipeline; } - private HeadPipeline<TCPReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { + private HeadPipeline<TcpReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) { // create stages - TCPReader tcpReader = new TCPReader(); + TcpReader tcpReader = new TcpReader(); this.recordCounter = new Counter<IMonitoringRecord>(); final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( IFlowRecord.class); @@ -92,7 +92,7 @@ public class TcpTraceReconstructionAnalysis { SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - HeadPipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TCPReader, Sink<TraceEventRecords>>(); + HeadPipeline<TcpReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TcpReader, Sink<TraceEventRecords>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(endStage); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java index 253a861f..660e8239 100644 --- a/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java +++ b/src/performancetest/java/teetime/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreadsConfiguration.java @@ -20,7 +20,7 @@ import teetime.stage.InstanceOfFilter; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -72,7 +72,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } public void buildConfiguration() { - final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + final HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.getFiniteProducerStages().add(tcpPipeline); final HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); @@ -89,14 +89,14 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal } } - private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { - TCPReader tcpReader = new TCPReader(); + private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>("TCP reader pipeline"); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; diff --git a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java index 598b92e4..e74fba89 100644 --- a/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java +++ b/src/performancetest/java/teetime/examples/traceReductionWithThreads/TcpTraceReductionAnalysisWithThreads.java @@ -22,7 +22,7 @@ import teetime.stage.InstanceOfFilter; import teetime.stage.Relay; import teetime.stage.basic.Sink; import teetime.stage.basic.distributor.Distributor; -import teetime.stage.io.network.TCPReader; +import teetime.stage.io.network.TcpReader; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.stage.trace.traceReduction.TraceAggregationBuffer; import teetime.stage.trace.traceReduction.TraceComperator; @@ -52,7 +52,7 @@ public class TcpTraceReductionAnalysisWithThreads { private int numWorkerThreads; public void init() { - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000); @@ -70,14 +70,14 @@ public class TcpTraceReductionAnalysisWithThreads { } } - private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { - TCPReader tcpReader = new TCPReader(); + private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() { + TcpReader tcpReader = new TcpReader(); Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>(); SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort()); // create and configure pipeline - HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>(); + HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>(); pipeline.setFirstStage(tcpReader); pipeline.setLastStage(distributor); return pipeline; @@ -153,7 +153,7 @@ public class TcpTraceReductionAnalysisWithThreads { } private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline( - final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, + final HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpReaderPipeline, final HeadPipeline<Clock, Distributor<Long>> clockStage, final HeadPipeline<Clock, Distributor<Long>> clock2Stage) { // create stages -- GitLab