diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 39c6425f7839be0e62992932434627a71651b537..e8c51f2843c0ef814b13fccff028a02cd2e91f49 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -40,7 +40,6 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I if (traceBuffer.isFinished()) { final Trace trace = traceBuffer.toTrace(true); - deliver(trace); traceId2trace.remove(traceId); } @@ -54,7 +53,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I } else if (record instanceof TimedPeriodRecord) { checkForTimeouts(TimeProvider.getCurrentTimestamp()); periodicFlush(record); - deliver(record); + // deliver(record); } else if (record instanceof TerminateRecord) { terminate(); deliver(record); diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index 5a820a3c3feea18179493a95afbef4a4186b8b77..5903b08c34051ff6fd999cbb3db227edb79acd1b 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -44,7 +44,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter implements I } else if (record instanceof TimedPeriodRecord) { processTimeoutQueue(TimeProvider.getCurrentTimestamp()); periodicFlush(record); - deliver(record); + // deliver(record); } else if (record instanceof TerminateRecord) { terminate(); deliver(record); diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index ade8f88f0c54693abe1d06412cdee6230a80a6f9..11263c3918858a316bf91adc547b336d106befaa 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -5,6 +5,7 @@ import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,8 +19,10 @@ import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEventFactory; import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; -public final class TCPReader { +public final class TCPReader implements IPeriodicTimeSignalReceiver { private static final Logger LOG = LoggerFactory.getLogger(TCPReader.class); @@ -44,6 +47,34 @@ public final class TCPReader { eventHandlers[0] = traceReconstruction; disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); + + new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start(); + } + + @Override + public void periodicTimeSignal(final long timestamp) { + final List<TCPReaderOneClient> toRemove = new ArrayList<TCPReaderOneClient>(); + + for (final TCPReaderOneClient thread : threads) { + if (!thread.isAlive()) { + toRemove.add(thread); + } + thread.flushOutputBuffer(); + } + + for (final TCPReaderOneClient toRemoveThread : toRemove) { + threads.remove(toRemoveThread); + } + + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + final IRecord[] buffer = valueEvent.getValues(); + buffer[0] = new TimedPeriodRecord(); + + valueEvent.setValues(buffer); + valueEvent.setValueSize(1); + + ringBuffer.publish(hiseq); } public final void read() { diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index 544d25ef613a58e09a7dd62b8f74f971349854e1..ff2a380115e4322e4865550ffa275171f8c95e09 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -5,7 +5,6 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,14 +31,13 @@ import explorviz.live_trace_processing.record.event.statics.AfterStaticOperation import explorviz.live_trace_processing.record.event.statics.BeforeStaticOperationEventRecord; import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; -import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.trace.Trace; -public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver { +class TCPReaderOneClient extends Thread { private static final Logger LOG = LoggerFactory.getLogger(TCPReaderOneClient.class); - + private HostApplicationMetaDataRecord hostApplicationMetadata; private final StringRegistry stringRegistry = new StringRegistry(null); @@ -55,17 +53,14 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; this.ringBuffer = ringBuffer; - - new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start(); - // TODO if 2 clients connected => bad } @Override public void run() { final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024); try { - if(socketChannel.isConnected()) { - LOG.info("Client " + socketChannel.getRemoteAddress() + " connected."); + if (socketChannel.isConnected()) { + LOG.info("Client " + socketChannel.getRemoteAddress() + " connected."); } while ((socketChannel.read(buffer)) != -1) { buffer.flip(); @@ -73,27 +68,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } } catch (final IOException ex) { LOG.info("Error in read() " + ex.getMessage()); - // TODO cancel timer??? } } - @Override - public void periodicTimeSignal(final long timestamp) { - synchronized (this) { // TODO better solution - flushOutputBuffer(); - } - - final long hiseq = ringBuffer.next(); - final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); - final IRecord[] buffer = valueEvent.getValues(); - buffer[0] = new TimedPeriodRecord(); - - valueEvent.setValues(buffer); - valueEvent.setValueSize(1); - - ringBuffer.publish(hiseq); - } - private final void messagesfromByteArray(final ByteBuffer buffer) { while (buffer.remaining() > 0) { final byte clazzId = buffer.get(); @@ -565,18 +542,20 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } } - private void flushOutputBuffer() { - if (outputBufferIndex > 0) { - final long hiseq = ringBuffer.next(); - final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); - final IRecord[] oldValues = valueEvent.getValues(); - valueEvent.setValues(outputBuffer); - valueEvent.setValueSize(outputBufferIndex); - ringBuffer.publish(hiseq); + public void flushOutputBuffer() { + synchronized (this) { + if (outputBufferIndex > 0) { + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + final IRecord[] oldValues = valueEvent.getValues(); + valueEvent.setValues(outputBuffer); + valueEvent.setValueSize(outputBufferIndex); + ringBuffer.publish(hiseq); - outputBuffer = oldValues; + outputBuffer = oldValues; - outputBufferIndex = 0; + outputBufferIndex = 0; + } } }