Skip to content
Snippets Groups Projects
Commit 45bf29f9 authored by Christian Wulf's avatar Christian Wulf
Browse files

TCPReader now uses the record factory concept;

updated dependency to kieker 1.11-SNAPSHOT
parent 05f218b6
No related branches found
No related tags found
No related merge requests found
Showing
with 70 additions and 152 deletions
......@@ -23,7 +23,7 @@
<repositories>
<repository>
<!-- kieker:1.10-SNAPSHOT -->
<!-- for SNAPSHOT versions-->
<id>sonatype.oss.snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
......@@ -53,7 +53,7 @@
<dependency>
<groupId>net.kieker-monitoring</groupId>
<artifactId>kieker</artifactId>
<version>1.10-SNAPSHOT</version>
<version>1.11-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
......
......@@ -23,19 +23,13 @@ import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import teetime.framework.ProducerStage;
import teetime.framework.signal.OnStartingException;
import teetime.stage.io.AbstractTcpReader;
import kieker.common.exception.MonitoringRecordException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.AfterOperationEvent;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
import kieker.common.record.flow.trace.operation.CallOperationEvent;
import kieker.common.record.misc.KiekerMetadataRecord;
import kieker.common.record.factory.IRecordFactory;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
......@@ -43,147 +37,62 @@ import kieker.common.util.registry.Lookup;
/**
* This is a reader which reads the records from a TCP port.
*
* @author Jan Waller, Nils Christian Ehmke
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf
*
* @since 1.10
*/
public class TCPReader extends ProducerStage<IMonitoringRecord> {
private static final int MESSAGE_BUFFER_SIZE = 65535;
public class TcpReader extends AbstractTcpReader<IMonitoringRecord> {
private final CachedRecordFactoryRepository recordFactories;
// BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary.
private final ILookup<String> stringRegistry = new Lookup<String>();
private int port1 = 10133;
private int port2 = 10134;
private TCPStringReader tcpStringReader;
private RecordFactory recordFactory;
public final int getPort1() {
return this.port1;
}
public final void setPort1(final int port1) {
this.port1 = port1;
}
public final int getPort2() {
return this.port2;
/**
* Default constructor with <code>port=10133</code> and <code>bufferCapacity=65535</code>
*/
public TcpReader() {
this(10133, 65535);
}
public final void setPort2(final int port2) {
this.port2 = port2;
public TcpReader(final int port, final int bufferCapacity) {
super(port, bufferCapacity);
this.recordFactories = new CachedRecordFactoryRepository(new RecordFactoryRepository());
}
@Override
public void onStarting() throws OnStartingException {
super.onStarting();
this.recordFactory = new RecordFactory();
this.register();
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start();
}
private void register() {
this.recordFactory.register(TraceMetadata.class.getCanonicalName(), new IRecordFactoryMethod() {
@Override
public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) {
return new TraceMetadata(buffer, stringRegistry);
}
});
this.recordFactory.register(KiekerMetadataRecord.class.getCanonicalName(), new IRecordFactoryMethod() {
@Override
public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) {
return new KiekerMetadataRecord(buffer, stringRegistry);
}
});
this.recordFactory.register(BeforeOperationEvent.class.getCanonicalName(), new IRecordFactoryMethod() {
@Override
public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) {
return new BeforeOperationEvent(buffer, stringRegistry);
}
});
this.recordFactory.register(AfterOperationEvent.class.getCanonicalName(), new IRecordFactoryMethod() {
@Override
public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) {
return new AfterOperationEvent(buffer, stringRegistry);
}
});
this.recordFactory.register(CallOperationEvent.class.getCanonicalName(), new IRecordFactoryMethod() {
@Override
public IMonitoringRecord create(final ByteBuffer buffer, final ILookup<String> stringRegistry) {
return new CallOperationEvent(buffer, stringRegistry);
}
});
}
@Override
protected void execute() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port1));
if (super.logger.isDebugEnabled()) {
super.logger.debug("Listening on port " + this.port1);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while (socketChannel.read(buffer) != -1) {
buffer.flip();
// System.out.println("Reading, remaining:" + buffer.remaining());
try {
while (buffer.hasRemaining()) {
buffer.mark();
this.createAndSendRecord(buffer);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
// System.out.println("Underflow, remaining:" + buffer.remaining());
buffer.compact();
}
}
// System.out.println("Channel closing...");
socketChannel.close();
// END also loop this one?
} catch (final IOException ex) {
super.logger.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (super.logger.isDebugEnabled()) {
super.logger.debug("Failed to close TCP connection!", e);
}
}
}
this.terminate();
this.tcpStringReader.terminate();
}
}
private final void createAndSendRecord(final ByteBuffer buffer) {
final int clazzid = buffer.getInt();
protected final void read(final ByteBuffer buffer) {
final int clazzId = buffer.getInt();
final long loggingTimestamp = buffer.getLong();
final String recordClassName = this.stringRegistry.get(clazzId);
try {
// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
// final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzId, buffer, this.stringRegistry);
// record.setLoggingTimestamp(loggingTimestamp);
final IRecordFactory<? extends IMonitoringRecord> recordFactory = this.recordFactories.get(recordClassName);
record = recordFactory.create(buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
this.send(this.outputPort, record);
} catch (final MonitoringRecordException ex) {
super.logger.error("Failed to create record.", ex);
}
}
@Override
public void onTerminating() {
super.onTerminating();
this.tcpStringReader.terminate();
}
/**
*
* @author Jan Waller
......@@ -256,4 +165,13 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> {
}
}
}
public int getPort2() {
return this.port2;
}
public void setPort2(final int port2) {
this.port2 = port2;
}
}
......@@ -2,7 +2,7 @@ package teetime.examples.kiekerdays;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
public class TcpTraceLogging {
......@@ -26,7 +26,7 @@ public class TcpTraceLogging {
private HeadStage buildTcpPipeline() {
// TCPReaderSink tcpReader = new TCPReaderSink();
TCPReader tcpReader = new TCPReader();
TcpReader tcpReader = new TcpReader();
return tcpReader;
}
......
......@@ -13,7 +13,7 @@ import teetime.stage.InstanceOfFilter;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
......@@ -38,7 +38,7 @@ public class TcpTraceReconstruction {
private int numWorkerThreads;
public void init() {
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
......@@ -50,14 +50,14 @@ public class TcpTraceReconstruction {
}
}
private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>();
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
......
......@@ -16,7 +16,7 @@ import teetime.stage.InstanceOfFilter;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter;
import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import teetime.stage.trace.traceReduction.TraceComperator;
......@@ -46,7 +46,7 @@ public class TcpTraceReduction {
private int numWorkerThreads;
public void init() {
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(5000);
......@@ -61,14 +61,14 @@ public class TcpTraceReduction {
}
}
private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>();
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
......
......@@ -11,7 +11,7 @@ import teetime.stage.Counter;
import teetime.stage.ElementThroughputMeasuringStage;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
import kieker.common.record.IMonitoringRecord;
......@@ -39,7 +39,7 @@ public class TcpTraceLoggingExtAnalysis {
}
private HeadPipeline<?, ?> buildTcpPipeline(final Distributor<Long> previousClockStage) {
TCPReader tcpReader = new TCPReader();
TcpReader tcpReader = new TcpReader();
this.recordCounter = new Counter<IMonitoringRecord>();
this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
......@@ -52,7 +52,7 @@ public class TcpTraceLoggingExtAnalysis {
SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
// create and configure pipeline
HeadPipeline<TCPReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Sink<IMonitoringRecord>>();
HeadPipeline<TcpReader, Sink<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Sink<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(endStage);
return pipeline;
......
......@@ -13,7 +13,7 @@ import teetime.stage.ElementThroughputMeasuringStage;
import teetime.stage.InstanceOfFilter;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
......@@ -65,9 +65,9 @@ public class TcpTraceReconstructionAnalysis {
return pipeline;
}
private HeadPipeline<TCPReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
private HeadPipeline<TcpReader, Sink<TraceEventRecords>> buildPipeline(final Distributor<Long> clockStage, final Distributor<Long> clock2Stage) {
// create stages
TCPReader tcpReader = new TCPReader();
TcpReader tcpReader = new TcpReader();
this.recordCounter = new Counter<IMonitoringRecord>();
final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>(
IFlowRecord.class);
......@@ -92,7 +92,7 @@ public class TcpTraceReconstructionAnalysis {
SpScPipe.connect(clock2Stage.getNewOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
HeadPipeline<TCPReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TCPReader, Sink<TraceEventRecords>>();
HeadPipeline<TcpReader, Sink<TraceEventRecords>> pipeline = new HeadPipeline<TcpReader, Sink<TraceEventRecords>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(endStage);
return pipeline;
......
......@@ -20,7 +20,7 @@ import teetime.stage.InstanceOfFilter;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
......@@ -72,7 +72,7 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
}
public void buildConfiguration() {
final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
final HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.getFiniteProducerStages().add(tcpPipeline);
final HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
......@@ -89,14 +89,14 @@ public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Anal
}
}
private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>("TCP reader pipeline");
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>("TCP reader pipeline");
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
......
......@@ -22,7 +22,7 @@ import teetime.stage.InstanceOfFilter;
import teetime.stage.Relay;
import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TCPReader;
import teetime.stage.io.network.TcpReader;
import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter;
import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import teetime.stage.trace.traceReduction.TraceComperator;
......@@ -52,7 +52,7 @@ public class TcpTraceReductionAnalysisWithThreads {
private int numWorkerThreads;
public void init() {
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
HeadPipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
......@@ -70,14 +70,14 @@ public class TcpTraceReductionAnalysisWithThreads {
}
}
private HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
private HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TcpReader tcpReader = new TcpReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TCPReader, Distributor<IMonitoringRecord>>();
HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> pipeline = new HeadPipeline<TcpReader, Distributor<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
......@@ -153,7 +153,7 @@ public class TcpTraceReductionAnalysisWithThreads {
}
private HeadPipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(
final HeadPipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
final HeadPipeline<TcpReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
final HeadPipeline<Clock, Distributor<Long>> clockStage,
final HeadPipeline<Clock, Distributor<Long>> clock2Stage) {
// create stages
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment