Skip to content
Snippets Groups Projects
Commit aa674281 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

fixed TimedRecord delivered too many times

parent 3cac335e
No related branches found
No related tags found
No related merge requests found
......@@ -40,7 +40,6 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I
if (traceBuffer.isFinished()) {
final Trace trace = traceBuffer.toTrace(true);
deliver(trace);
traceId2trace.remove(traceId);
}
......@@ -54,7 +53,7 @@ public final class TraceReconstructionFilter extends AbstractFilter implements I
} else if (record instanceof TimedPeriodRecord) {
checkForTimeouts(TimeProvider.getCurrentTimestamp());
periodicFlush(record);
deliver(record);
// deliver(record);
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
......
......@@ -44,7 +44,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter implements I
} else if (record instanceof TimedPeriodRecord) {
processTimeoutQueue(TimeProvider.getCurrentTimestamp());
periodicFlush(record);
deliver(record);
// deliver(record);
} else if (record instanceof TerminateRecord) {
terminate();
deliver(record);
......
......@@ -5,6 +5,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -18,8 +19,10 @@ import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.RecordArrayEventFactory;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
public final class TCPReader {
public final class TCPReader implements IPeriodicTimeSignalReceiver {
private static final Logger LOG = LoggerFactory.getLogger(TCPReader.class);
......@@ -44,6 +47,34 @@ public final class TCPReader {
eventHandlers[0] = traceReconstruction;
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
final List<TCPReaderOneClient> toRemove = new ArrayList<TCPReaderOneClient>();
for (final TCPReaderOneClient thread : threads) {
if (!thread.isAlive()) {
toRemove.add(thread);
}
thread.flushOutputBuffer();
}
for (final TCPReaderOneClient toRemoveThread : toRemove) {
threads.remove(toRemoveThread);
}
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] buffer = valueEvent.getValues();
buffer[0] = new TimedPeriodRecord();
valueEvent.setValues(buffer);
valueEvent.setValueSize(1);
ringBuffer.publish(hiseq);
}
public final void read() {
......
......@@ -5,7 +5,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,14 +31,13 @@ import explorviz.live_trace_processing.record.event.statics.AfterStaticOperation
import explorviz.live_trace_processing.record.event.statics.BeforeStaticOperationEventRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver {
class TCPReaderOneClient extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(TCPReaderOneClient.class);
private HostApplicationMetaDataRecord hostApplicationMetadata;
private final StringRegistry stringRegistry = new StringRegistry(null);
......@@ -55,17 +53,14 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final RingBuffer<RecordArrayEvent> ringBuffer) {
this.socketChannel = socketChannel;
this.ringBuffer = ringBuffer;
new TimeSignalReader(TimeUnit.SECONDS.toMillis(1), this).start();
// TODO if 2 clients connected => bad
}
@Override
public void run() {
final ByteBuffer buffer = ByteBuffer.allocateDirect(2 * 1024 * 1024);
try {
if(socketChannel.isConnected()) {
LOG.info("Client " + socketChannel.getRemoteAddress() + " connected.");
if (socketChannel.isConnected()) {
LOG.info("Client " + socketChannel.getRemoteAddress() + " connected.");
}
while ((socketChannel.read(buffer)) != -1) {
buffer.flip();
......@@ -73,27 +68,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
}
} catch (final IOException ex) {
LOG.info("Error in read() " + ex.getMessage());
// TODO cancel timer???
}
}
@Override
public void periodicTimeSignal(final long timestamp) {
synchronized (this) { // TODO better solution
flushOutputBuffer();
}
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] buffer = valueEvent.getValues();
buffer[0] = new TimedPeriodRecord();
valueEvent.setValues(buffer);
valueEvent.setValueSize(1);
ringBuffer.publish(hiseq);
}
private final void messagesfromByteArray(final ByteBuffer buffer) {
while (buffer.remaining() > 0) {
final byte clazzId = buffer.get();
......@@ -565,18 +542,20 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
}
}
private void flushOutputBuffer() {
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] oldValues = valueEvent.getValues();
valueEvent.setValues(outputBuffer);
valueEvent.setValueSize(outputBufferIndex);
ringBuffer.publish(hiseq);
public void flushOutputBuffer() {
synchronized (this) {
if (outputBufferIndex > 0) {
final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] oldValues = valueEvent.getValues();
valueEvent.setValues(outputBuffer);
valueEvent.setValueSize(outputBufferIndex);
ringBuffer.publish(hiseq);
outputBuffer = oldValues;
outputBuffer = oldValues;
outputBufferIndex = 0;
outputBufferIndex = 0;
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment