diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index e48986f805e50677498223517ea2765dc028e3c8..6029fe6c968eba5d7554b442df5316303106d297 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -16,7 +16,6 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; -import explorviz.hpc_monitoring.reader.TimeProvider; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.TerminateRecord; @@ -48,16 +47,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver); + eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } - private void periodicTimeSignal(final long timestamp) { - checkForTimeouts(timestamp); - flushOutputBuffer(); - } - private void checkForTimeouts(final long timestamp) { final long traceTimeout = timestamp - maxTraceTimeout; final List<Long> traceIdsToRemove = new ArrayList<Long>(); @@ -76,11 +70,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray } private void sendOutValidTrace(final Trace trace) { - putInRingBuffer(trace); + // putInRingBuffer(trace); } private void sendOutInvalidTrace(final Trace trace) { - putInRingBuffer(trace); + // putInRingBuffer(trace); System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId()); } @@ -127,7 +121,9 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray sendOutValidTrace(traceBuffer.toTrace()); } } else if (record instanceof TimedPeriodRecord) { - periodicTimeSignal(TimeProvider.getCurrentTimestamp()); + // checkForTimeouts(TimeProvider.getCurrentTimestamp()); TODO + outputBuffer[outputBufferIndex++] = record; + flushOutputBuffer(); } else if (record instanceof TerminateRecord) { terminate(); } diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 9274e514a16cfcd94fa10f8c2c264aaaf179d479..f410435095301f0f2551771a73fc8f6a79ff39cf 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -59,7 +59,7 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray if (record instanceof Trace) { insertIntoBuffer((Trace) record); } else if (record instanceof TimedPeriodRecord) { - periodicTimeSignal(TimeProvider.getCurrentTimestamp()); + processTimeoutQueue(TimeProvider.getCurrentTimestamp()); } else if (record instanceof TerminateRecord) { terminate(); } @@ -75,10 +75,6 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray traceAggregationBuffer.insertTrace(trace); } - private void periodicTimeSignal(final long timestamp) { - processTimeoutQueue(timestamp); - } - private void processTimeoutQueue(final long timestamp) { final long bufferTimeout = timestamp - maxCollectionDuration; final List<Trace> toRemove = new ArrayList<Trace>(); diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 9289fe2dc5dfa6376b60c178241fde512eced740..e64d8bb2c88bf615f6b1235c86521f90d0777366 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -26,8 +26,7 @@ public final class TCPReader { private final List<Thread> threads = new ArrayList<Thread>(); - public TCPReader(final int listeningPort, - final EventHandler<RecordEvent> endReceiver) { + public TCPReader(final int listeningPort, final EventHandler<RecordEvent> endReceiver) { this.listeningPort = listeningPort; final ExecutorService exec = Executors.newCachedThreadPool(); @@ -36,7 +35,7 @@ public final class TCPReader { @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(2 * 1000, endReceiver); + eventHandlers[0] = new TraceReconstructionFilter(2 * 1000 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } @@ -45,8 +44,7 @@ public final class TCPReader { try { open(); while (active) { - final Thread thread = new TCPReaderThread( - serversocket.accept(), ringBuffer); + final Thread thread = new TCPReaderThread(serversocket.accept(), ringBuffer); thread.start(); threads.add(thread); } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java index 164c0006173ee9577b1fb567db9c21076dfc8663..d60498e76432f741194820dbb47888e695081dba 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java @@ -15,6 +15,7 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.StringRegistryRecord; +import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; @@ -60,9 +61,16 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv @Override public void periodicTimeSignal(final long timestamp) { - synchronized (this) { // TODO remove - flushOutputBuffer(); - } + // TODO flush out buffer! + final IRecord[] buffer = new IRecord[1]; + buffer[0] = new TimedPeriodRecord(); + + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValues(buffer); + valueEvent.setValuesLength(1); + valueEvent.setMetadata(hostApplicationMetadata); + ringBuffer.publish(hiseq); } private final void messagesfromByteArray(final ByteBuffer buffer) { @@ -258,12 +266,10 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv private final void putInRingBuffer(final IRecord message) { counter.inputObjects(message); - // synchronized (this) { // TODO remove - // outputBuffer[outputBufferIndex++] = message; - // if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { - // flushOutputBuffer(); - // } - // } + outputBuffer[outputBufferIndex++] = message; + if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { + flushOutputBuffer(); + } } private void flushOutputBuffer() {