From f2b3eaa7e3f9e4954fa8b4704b3c36d1a97c3d10 Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Sat, 12 Oct 2013 18:33:17 +0200 Subject: [PATCH] new processing archi --- .../TraceReconstructionFilter.java | 16 +++++-------- .../TracePatternSummarizationFilter.java | 6 +---- .../hpc_monitoring/reader/TCPReader.java | 8 +++---- .../reader/TCPReaderThread.java | 24 ++++++++++++------- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java index e48986f..6029fe6 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java @@ -16,7 +16,6 @@ import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; import explorviz.hpc_monitoring.disruptor.RecordEvent; import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; -import explorviz.hpc_monitoring.reader.TimeProvider; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.TerminateRecord; @@ -48,16 +47,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000, endReceiver); + eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } - private void periodicTimeSignal(final long timestamp) { - checkForTimeouts(timestamp); - flushOutputBuffer(); - } - private void checkForTimeouts(final long timestamp) { final long traceTimeout = timestamp - maxTraceTimeout; final List<Long> traceIdsToRemove = new ArrayList<Long>(); @@ -76,11 +70,11 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray } private void sendOutValidTrace(final Trace trace) { - putInRingBuffer(trace); + // putInRingBuffer(trace); } private void sendOutInvalidTrace(final Trace trace) { - putInRingBuffer(trace); + // putInRingBuffer(trace); System.out.println("Invalid trace: " + trace.getTraceEvents()[0].getTraceId()); } @@ -127,7 +121,9 @@ public final class TraceReconstructionFilter implements EventHandler<RecordArray sendOutValidTrace(traceBuffer.toTrace()); } } else if (record instanceof TimedPeriodRecord) { - periodicTimeSignal(TimeProvider.getCurrentTimestamp()); + // checkForTimeouts(TimeProvider.getCurrentTimestamp()); TODO + outputBuffer[outputBufferIndex++] = record; + flushOutputBuffer(); } else if (record instanceof TerminateRecord) { terminate(); } diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java index 9274e51..f410435 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java @@ -59,7 +59,7 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray if (record instanceof Trace) { insertIntoBuffer((Trace) record); } else if (record instanceof TimedPeriodRecord) { - periodicTimeSignal(TimeProvider.getCurrentTimestamp()); + processTimeoutQueue(TimeProvider.getCurrentTimestamp()); } else if (record instanceof TerminateRecord) { terminate(); } @@ -75,10 +75,6 @@ public class TracePatternSummarizationFilter implements EventHandler<RecordArray traceAggregationBuffer.insertTrace(trace); } - private void periodicTimeSignal(final long timestamp) { - processTimeoutQueue(timestamp); - } - private void processTimeoutQueue(final long timestamp) { final long bufferTimeout = timestamp - maxCollectionDuration; final List<Trace> toRemove = new ArrayList<Trace>(); diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/hpc_monitoring/reader/TCPReader.java index 9289fe2..e64d8bb 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReader.java @@ -26,8 +26,7 @@ public final class TCPReader { private final List<Thread> threads = new ArrayList<Thread>(); - public TCPReader(final int listeningPort, - final EventHandler<RecordEvent> endReceiver) { + public TCPReader(final int listeningPort, final EventHandler<RecordEvent> endReceiver) { this.listeningPort = listeningPort; final ExecutorService exec = Executors.newCachedThreadPool(); @@ -36,7 +35,7 @@ public final class TCPReader { @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(2 * 1000, endReceiver); + eventHandlers[0] = new TraceReconstructionFilter(2 * 1000 * 1000, endReceiver); disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); } @@ -45,8 +44,7 @@ public final class TCPReader { try { open(); while (active) { - final Thread thread = new TCPReaderThread( - serversocket.accept(), ringBuffer); + final Thread thread = new TCPReaderThread(serversocket.accept(), ringBuffer); thread.start(); threads.add(thread); } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java index 164c000..d60498e 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java +++ b/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java @@ -15,6 +15,7 @@ import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; import explorviz.hpc_monitoring.record.HostApplicationMetaData; import explorviz.hpc_monitoring.record.IRecord; import explorviz.hpc_monitoring.record.StringRegistryRecord; +import explorviz.hpc_monitoring.record.TimedPeriodRecord; import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; @@ -60,9 +61,16 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv @Override public void periodicTimeSignal(final long timestamp) { - synchronized (this) { // TODO remove - flushOutputBuffer(); - } + // TODO flush out buffer! + final IRecord[] buffer = new IRecord[1]; + buffer[0] = new TimedPeriodRecord(); + + final long hiseq = ringBuffer.next(); + final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); + valueEvent.setValues(buffer); + valueEvent.setValuesLength(1); + valueEvent.setMetadata(hostApplicationMetadata); + ringBuffer.publish(hiseq); } private final void messagesfromByteArray(final ByteBuffer buffer) { @@ -258,12 +266,10 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv private final void putInRingBuffer(final IRecord message) { counter.inputObjects(message); - // synchronized (this) { // TODO remove - // outputBuffer[outputBufferIndex++] = message; - // if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { - // flushOutputBuffer(); - // } - // } + outputBuffer[outputBufferIndex++] = message; + if (outputBufferIndex == OUTPUT_MESSAGE_BUFFER_SIZE) { + flushOutputBuffer(); + } } private void flushOutputBuffer() { -- GitLab