From 31fbe2e865fa1a9061ab5423543da93d9ae228f1 Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Tue, 26 Nov 2013 13:39:19 +0100 Subject: [PATCH] tweaks --- .../connector/TCPConnector.java | 2 +- .../reconstruction/TraceReconstructionBuffer.java | 5 ++--- .../reconstruction/TraceReconstructionFilter.java | 8 +++----- .../TracePatternSummarizationFilter.java | 7 +++---- .../live_trace_processing/main/WorkerStarter.java | 2 +- .../live_trace_processing/reader/TCPReader.java | 8 +++----- .../reader/TCPReaderOneClient.java | 12 +++--------- 7 files changed, 16 insertions(+), 28 deletions(-) diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java index 0ff9d3c..af80414 100644 --- a/src/explorviz/live_trace_processing/connector/TCPConnector.java +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -27,7 +27,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord private final StringRegistry stringRegistry = new StringRegistry(this); private final ByteBuffer buffer = ByteBuffer - .allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); + .allocateDirect(Constants.SENDING_BUFFER_SIZE); private volatile boolean shouldDisconnect = false; diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 58e247d..a2d790f 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -3,6 +3,7 @@ package explorviz.live_trace_processing.filter.reconstruction; import java.util.ArrayList; import java.util.List; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; @@ -13,10 +14,8 @@ class TraceReconstructionBuffer { // private static final Comparator<AbstractOperationEvent> COMPARATOR = new // AbstractOperationEventComperator(); - private static final int INITIAL_EVENT_CAPACITY = 100; - private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( - INITIAL_EVENT_CAPACITY); + Constants.TRACE_RECONSTRUCTION_BUFFER_INITIAL_SIZE); private boolean closeable; private boolean damaged; diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 10d7338..ce71458 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -7,6 +7,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter; @@ -18,9 +19,6 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.Trace; public final class TraceReconstructionFilter extends AbstractFilter { - private static final int RINGBUFFER_LENGTH = 32; - private static final int OUTPUT_BATCH_SIZE = 256; - private final long maxTraceTimeout; private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>(); @@ -28,8 +26,8 @@ public final class TraceReconstructionFilter extends AbstractFilter { public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) { super( new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990), - sinkReceiver), RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, - "Reconstructed traces/sec"); + sinkReceiver), Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE, + Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec"); this.maxTraceTimeout = maxTraceTimeout; } diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index 8aec06a..ea95b53 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.reader.TimeProvider; @@ -15,9 +16,6 @@ import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.TraceComperator; public class TracePatternSummarizationFilter extends AbstractFilter { - private static final int RINGBUFFER_LENGTH = 256; - private static final int OUTPUT_BATCH_SIZE = 16; - private final long maxCollectionDuration; private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>( @@ -25,7 +23,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter { public TracePatternSummarizationFilter(final long maxCollectionDuration, final IPipeReceiver sinkReceiver) { - super(sinkReceiver, RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, "Reduced traces / sec"); + super(sinkReceiver, Constants.TRACE_SUMMARIZATION_DISRUPTOR_SIZE, + Constants.TRACE_SUMMARIZATION_OUTPUT_BUFFER_SIZE, "Reduced traces / sec"); this.maxCollectionDuration = maxCollectionDuration; } diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java index cf8bbe0..5219793 100644 --- a/src/explorviz/live_trace_processing/main/WorkerStarter.java +++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java @@ -45,7 +45,7 @@ public class WorkerStarter { configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME), configuration .getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP), - configuration, tcpConnector); + tcpConnector); } else { try { tcpConnector.connect(); diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index a6cb83d..9b13249 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -11,6 +11,7 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; +import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.RecordArrayEvent; @@ -18,9 +19,6 @@ import explorviz.live_trace_processing.filter.RecordArrayEventFactory; import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; public final class TCPReader { - static final int OUTPUT_MESSAGE_BUFFER_SIZE = 8192; - private static final int RINGBUFFER_LENGTH = 16; - private final int listeningPort; private boolean active = true; @@ -34,8 +32,8 @@ public final class TCPReader { this.listeningPort = listeningPort; final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( - new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, - AbstractFilter.cachedThreadPool); + new RecordArrayEventFactory(Constants.TCP_READER_OUTPUT_BUFFER_SIZE), + Constants.TCP_READER_DISRUPTOR_SIZE, AbstractFilter.cachedThreadPool); @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index 16f0548..e02b689 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -12,7 +12,6 @@ import com.lmax.disruptor.RingBuffer; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.filter.RecordArrayEvent; -import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; @@ -31,13 +30,10 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec private final StringRegistry stringRegistry = new StringRegistry(null); private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); - private static final CountingThroughputFilter counter = new CountingThroughputFilter( - "Received records/sec in Reader" + Thread.currentThread().getId()); - private final SocketChannel socketChannel; private final RingBuffer<RecordArrayEvent> ringBuffer; - private IRecord[] outputBuffer = new IRecord[TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE]; + private IRecord[] outputBuffer = new IRecord[Constants.TCP_READER_OUTPUT_BUFFER_SIZE]; private int outputBufferIndex = 0; public TCPReaderOneClient(final SocketChannel socketChannel, @@ -51,7 +47,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec @Override public void run() { final ByteBuffer buffer = ByteBuffer - .allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); + .allocateDirect(Constants.SENDING_BUFFER_SIZE); try { while ((socketChannel.read(buffer)) != -1) { buffer.flip(); @@ -327,11 +323,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } private final void putInRingBuffer(final IRecord message) { - counter.inputRecord(message); - synchronized (this) { // TODO better solution outputBuffer[outputBufferIndex++] = message; - if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) { + if (outputBufferIndex == Constants.TCP_READER_OUTPUT_BUFFER_SIZE) { flushOutputBuffer(); } } -- GitLab