diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java index 0ff9d3c5b67d10fb22584611d552e09498c18638..af8041444ff9a623c75d16d6a29a10959e763fec 100644 --- a/src/explorviz/live_trace_processing/connector/TCPConnector.java +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -27,7 +27,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord private final StringRegistry stringRegistry = new StringRegistry(this); private final ByteBuffer buffer = ByteBuffer - .allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); + .allocateDirect(Constants.SENDING_BUFFER_SIZE); private volatile boolean shouldDisconnect = false; diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 58e247da5ec0c15733d9815b11838139c7e02bc7..a2d790f34d9a9bb877d7275e18dc6569f3e1a50b 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -3,6 +3,7 @@ package explorviz.live_trace_processing.filter.reconstruction; import java.util.ArrayList; import java.util.List; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; @@ -13,10 +14,8 @@ class TraceReconstructionBuffer { // private static final Comparator<AbstractOperationEvent> COMPARATOR = new // AbstractOperationEventComperator(); - private static final int INITIAL_EVENT_CAPACITY = 100; - private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( - INITIAL_EVENT_CAPACITY); + Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); private boolean closeable; private boolean damaged; diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 10d7338a03bfb427f14375ff97837044e33a4e66..ce71458456a4b4bb58a2ac2c2c46eb636868ce38 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -7,6 +7,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter; @@ -18,9 +19,6 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.Trace; public final class TraceReconstructionFilter extends AbstractFilter { - private static final int RINGBUFFER_LENGTH = 32; - private static final int OUTPUT_BATCH_SIZE = 256; - private final long maxTraceTimeout; private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>(); @@ -28,8 +26,8 @@ public final class TraceReconstructionFilter extends AbstractFilter { public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) { super( new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990), - sinkReceiver), RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, - "Reconstructed traces/sec"); + sinkReceiver), Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE, + Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec"); this.maxTraceTimeout = maxTraceTimeout; } diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index 8aec06ae8513f17756624de79c5752e306fbbadd..ea95b53f7822d91b4a7f0040d69b7c59f746abe9 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.reader.TimeProvider; @@ -15,9 +16,6 @@ import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.TraceComperator; public class TracePatternSummarizationFilter extends AbstractFilter { - private static final int RINGBUFFER_LENGTH = 256; - private static final int OUTPUT_BATCH_SIZE = 16; - private final long maxCollectionDuration; private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>( @@ -25,7 +23,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter { public TracePatternSummarizationFilter(final long maxCollectionDuration, final IPipeReceiver sinkReceiver) { - super(sinkReceiver, RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, "Reduced traces / sec"); + super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE, + Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces / sec"); this.maxCollectionDuration = maxCollectionDuration; } diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java index cf8bbe0805567c0c2d04bbacf4db409a7cacdb93..5219793169cc58ddbea98fb9555cbaf9dc0f990b 100644 --- a/src/explorviz/live_trace_processing/main/WorkerStarter.java +++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java @@ -45,7 +45,7 @@ public class WorkerStarter { configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME), configuration .getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP), - configuration, tcpConnector); + tcpConnector); } else { try { tcpConnector.connect(); diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index a6cb83d0de390cbb93764e12370716a9af8d3d2b..9b1324930f8c940e9dbc9416b707ad822cc49ebf 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -11,6 +11,7 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.RecordArrayEvent; @@ -18,9 +19,6 @@ import explorviz.live_trace_processing.filter.RecordArrayEventFactory; import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; public final class TCPReader { - static final int OUTPUT_MESSAGE_BUFFER_SIZE = 8192; - private static final int RINGBUFFER_LENGTH = 16; - private final int listeningPort; private boolean active = true; @@ -34,8 +32,8 @@ public final class TCPReader { this.listeningPort = listeningPort; final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( - new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, - AbstractFilter.cachedThreadPool); + new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE), + Constants.TCP_READER_DISRUPTOR_SIZE, AbstractFilter.cachedThreadPool); @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index 16f054846db27400fd9196dc3f941080f9354c21..e02b68975f1a62b58c949d550f4a0dab6cf023a1 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -12,7 +12,6 @@ import com.lmax.disruptor.RingBuffer; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.filter.RecordArrayEvent; -import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; @@ -31,13 +30,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec private final StringRegistry stringRegistry = new StringRegistry(null); private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Received records/sec in Reader" + Thread.currentThread().getId()); - private final SocketChannel socketChannel; private final RingBuffer<RecordArrayEvent> ringBuffer; - private IRecord[] outputBuffer = new IRecord[TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE]; + private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE]; private int outputBufferIndex = 0; public TCPReaderOneClient(final SocketChannel socketChannel, @@ -51,7 +47,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec @Override public void run() { final ByteBuffer buffer = ByteBuffer - .allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); + .allocateDirect(Constants.SENDING_BUFFER_SIZE); try { while ((socketChannel.read(buffer)) != -1) { buffer.flip(); @@ -327,11 +323,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } private final void putInRingBuffer(final IRecord message) { - counter.inputRecord(message); - synchronized (this) { // TODO better solution outputBuffer[outputBufferIndex++] = message; - if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) { + if (outputBufferIndex == Constants.TCP_READER_OUTPUT_BUFFER_SIZE) { flushOutputBuffer(); } }