Skip to content
Snippets Groups Projects
Commit 31fbe2e8 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

tweaks

parent fc81eaec
No related branches found
No related tags found
No related merge requests found
...@@ -27,7 +27,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord ...@@ -27,7 +27,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
private final StringRegistry stringRegistry = new StringRegistry(this); private final StringRegistry stringRegistry = new StringRegistry(this);
private final ByteBuffer buffer = ByteBuffer private final ByteBuffer buffer = ByteBuffer
.allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); .allocateDirect(Constants.SENDING_BUFFER_SIZE);
private volatile boolean shouldDisconnect = false; private volatile boolean shouldDisconnect = false;
......
...@@ -3,6 +3,7 @@ package explorviz.live_trace_processing.filter.reconstruction; ...@@ -3,6 +3,7 @@ package explorviz.live_trace_processing.filter.reconstruction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
...@@ -13,10 +14,8 @@ class TraceReconstructionBuffer { ...@@ -13,10 +14,8 @@ class TraceReconstructionBuffer {
// private static final Comparator<AbstractOperationEvent> COMPARATOR = new // private static final Comparator<AbstractOperationEvent> COMPARATOR = new
// AbstractOperationEventComperator(); // AbstractOperationEventComperator();
private static final int INITIAL_EVENT_CAPACITY = 100;
private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>(
INITIAL_EVENT_CAPACITY); Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE);
private boolean closeable; private boolean closeable;
private boolean damaged; private boolean damaged;
......
...@@ -7,6 +7,7 @@ import java.util.Map.Entry; ...@@ -7,6 +7,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter; import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter;
...@@ -18,9 +19,6 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; ...@@ -18,9 +19,6 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter { public final class TraceReconstructionFilter extends AbstractFilter {
private static final int RINGBUFFER_LENGTH = 32;
private static final int OUTPUT_BATCH_SIZE = 256;
private final long maxTraceTimeout; private final long maxTraceTimeout;
private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>(); private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>();
...@@ -28,8 +26,8 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -28,8 +26,8 @@ public final class TraceReconstructionFilter extends AbstractFilter {
public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) { public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) {
super( super(
new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990), new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990),
sinkReceiver), RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, sinkReceiver), Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
"Reconstructed traces/sec"); Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec");
this.maxTraceTimeout = maxTraceTimeout; this.maxTraceTimeout = maxTraceTimeout;
} }
......
...@@ -5,6 +5,7 @@ import java.util.List; ...@@ -5,6 +5,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.reader.TimeProvider;
...@@ -15,9 +16,6 @@ import explorviz.live_trace_processing.record.trace.Trace; ...@@ -15,9 +16,6 @@ import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.record.trace.TraceComperator; import explorviz.live_trace_processing.record.trace.TraceComperator;
public class TracePatternSummarizationFilter extends AbstractFilter { public class TracePatternSummarizationFilter extends AbstractFilter {
private static final int RINGBUFFER_LENGTH = 256;
private static final int OUTPUT_BATCH_SIZE = 16;
private final long maxCollectionDuration; private final long maxCollectionDuration;
private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>( private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>(
...@@ -25,7 +23,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter { ...@@ -25,7 +23,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter {
public TracePatternSummarizationFilter(final long maxCollectionDuration, public TracePatternSummarizationFilter(final long maxCollectionDuration,
final IPipeReceiver sinkReceiver) { final IPipeReceiver sinkReceiver) {
super(sinkReceiver, RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, "Reduced traces / sec"); super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE,
Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces / sec");
this.maxCollectionDuration = maxCollectionDuration; this.maxCollectionDuration = maxCollectionDuration;
} }
......
...@@ -45,7 +45,7 @@ public class WorkerStarter { ...@@ -45,7 +45,7 @@ public class WorkerStarter {
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME), configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME),
configuration configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP), .getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
configuration, tcpConnector); tcpConnector);
} else { } else {
try { try {
tcpConnector.connect(); tcpConnector.connect();
......
...@@ -11,6 +11,7 @@ import com.lmax.disruptor.EventHandler; ...@@ -11,6 +11,7 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.Disruptor;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEvent;
...@@ -18,9 +19,6 @@ import explorviz.live_trace_processing.filter.RecordArrayEventFactory; ...@@ -18,9 +19,6 @@ import explorviz.live_trace_processing.filter.RecordArrayEventFactory;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
public final class TCPReader { public final class TCPReader {
static final int OUTPUT_MESSAGE_BUFFER_SIZE = 8192;
private static final int RINGBUFFER_LENGTH = 16;
private final int listeningPort; private final int listeningPort;
private boolean active = true; private boolean active = true;
...@@ -34,8 +32,8 @@ public final class TCPReader { ...@@ -34,8 +32,8 @@ public final class TCPReader {
this.listeningPort = listeningPort; this.listeningPort = listeningPort;
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE),
AbstractFilter.cachedThreadPool); Constants.TCP_READER_DISRUPTOR_SIZE, AbstractFilter.cachedThreadPool);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
......
...@@ -12,7 +12,6 @@ import com.lmax.disruptor.RingBuffer; ...@@ -12,7 +12,6 @@ import com.lmax.disruptor.RingBuffer;
import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord;
import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord;
...@@ -31,13 +30,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -31,13 +30,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
private final StringRegistry stringRegistry = new StringRegistry(null); private final StringRegistry stringRegistry = new StringRegistry(null);
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024);
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Received records/sec in Reader" + Thread.currentThread().getId());
private final SocketChannel socketChannel; private final SocketChannel socketChannel;
private final RingBuffer<RecordArrayEvent> ringBuffer; private final RingBuffer<RecordArrayEvent> ringBuffer;
private IRecord[] outputBuffer = new IRecord[TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE]; private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE];
private int outputBufferIndex = 0; private int outputBufferIndex = 0;
public TCPReaderOneClient(final SocketChannel socketChannel, public TCPReaderOneClient(final SocketChannel socketChannel,
...@@ -51,7 +47,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -51,7 +47,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
@Override @Override
public void run() { public void run() {
final ByteBuffer buffer = ByteBuffer final ByteBuffer buffer = ByteBuffer
.allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); .allocateDirect(Constants.SENDING_BUFFER_SIZE);
try { try {
while ((socketChannel.read(buffer)) != -1) { while ((socketChannel.read(buffer)) != -1) {
buffer.flip(); buffer.flip();
...@@ -327,11 +323,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -327,11 +323,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
} }
private final void putInRingBuffer(final IRecord message) { private final void putInRingBuffer(final IRecord message) {
counter.inputRecord(message);
synchronized (this) { // TODO better solution synchronized (this) { // TODO better solution
outputBuffer[outputBufferIndex++] = message; outputBuffer[outputBufferIndex++] = message;
if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) { if (outputBufferIndex == Constants.TCP_READER_OUTPUT_BUFFER_SIZE) {
flushOutputBuffer(); flushOutputBuffer();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment