Skip to content
Snippets Groups Projects
Commit a0d209fe authored by Florian Fittkau's avatar Florian Fittkau
Browse files

summarization now working

parent 8b3be3c1
No related branches found
No related tags found
No related merge requests found
......@@ -2,19 +2,25 @@ package explorviz.hpc_monitoring.filter.reconstruction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.RecordEvent;
import explorviz.hpc_monitoring.record.*;
import explorviz.hpc_monitoring.reader.TimeReader;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.TraceMetadata;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
import gnu.trove.iterator.TLongObjectIterator;
import gnu.trove.map.hash.TLongObjectHashMap;
public final class TraceReconstructionFilter implements
EventHandler<RecordEvent> {
EventHandler<RecordEvent>, IPeriodicTimeSignalReceiver {
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Reconstructed traces per second");
......@@ -25,7 +31,8 @@ public final class TraceReconstructionFilter implements
private final RingBuffer<RecordEvent> ringBuffer;
@SuppressWarnings("unchecked")
public TraceReconstructionFilter(final long maxTraceTimeout) {
public TraceReconstructionFilter(final long maxTraceTimeout,
final EventHandler<RecordEvent> endReceiver) {
this.maxTraceTimeout = maxTraceTimeout;
final ExecutorService exec = Executors.newCachedThreadPool();
......@@ -33,11 +40,15 @@ public final class TraceReconstructionFilter implements
RecordEvent.EVENT_FACTORY, 16384, exec);
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TracePatternSummarizationFilter(5 * 1000 * 1000);
eventHandlers[0] = new TracePatternSummarizationFilter(5 * 1000 * 1000,
endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeReader(1 * 1000 * 1000, this).start();
}
@Override
public void periodicTimeSignal(final long timestamp) {
checkForTimeouts(timestamp);
}
......@@ -56,12 +67,12 @@ public final class TraceReconstructionFilter implements
private void sendOutValidTrace(final Trace trace) {
counter.inputObjects(trace);
// putInRingBuffer(trace);
putInRingBuffer(trace);
}
private void sendOutInvalidTrace(final Trace trace) {
counter.inputObjects(trace);
// putInRingBuffer(trace);
putInRingBuffer(trace); // TODO
}
private void putInRingBuffer(final IRecord record) {
......@@ -81,8 +92,7 @@ public final class TraceReconstructionFilter implements
final long traceId = traceMetadata.getTraceId();
final TraceBuffer traceBuffer = getBufferForTraceId(traceId);
traceBuffer.setTrace(traceMetadata);
}
else if (record instanceof AbstractOperationEvent) {
} else if (record instanceof AbstractOperationEvent) {
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
final long traceId = abstractOperationEvent.getTraceId();
......
package explorviz.hpc_monitoring.filter.reduction;
import java.util.*;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.reader.IPeriodicTimeSignalReceiver;
import explorviz.hpc_monitoring.reader.RecordEvent;
import explorviz.hpc_monitoring.reader.TimeReader;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.Trace;
import explorviz.hpc_monitoring.record.events.AbstractOperationEvent;
public class TracePatternSummarizationFilter implements
EventHandler<RecordEvent> {
EventHandler<RecordEvent>, IPeriodicTimeSignalReceiver {
private final long maxCollectionDuration;
private final Map<Trace, TraceAggregationBuffer> trace2buffer = new TreeMap<Trace, TraceAggregationBuffer>(
new TraceComperator());
public TracePatternSummarizationFilter(final long maxCollectionDuration) {
private static final CountingThroughputFilter counter = new CountingThroughputFilter(
"Reduced trace results per second");
private final RingBuffer<RecordEvent> ringBuffer;
public TracePatternSummarizationFilter(final long maxCollectionDuration,
final EventHandler<RecordEvent> endReceiver) {
this.maxCollectionDuration = maxCollectionDuration;
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 128, exec);
@SuppressWarnings("unchecked")
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = endReceiver;
if (endReceiver != null) {
disruptor.handleEventsWith(eventHandlers);
}
ringBuffer = disruptor.start();
new TimeReader(1 * 1000 * 1000, this).start();
}
public void periodicTimeSignal(final Long timestamp) {
@Override
public void periodicTimeSignal(final long timestamp) {
processTimeoutQueue(timestamp);
}
......@@ -37,7 +71,15 @@ public class TracePatternSummarizationFilter implements
}
private void sendOutTrace(final Trace aggregatedTrace) {
// TODO
counter.inputObjects(aggregatedTrace);
putInRingBuffer(aggregatedTrace);
}
private void putInRingBuffer(final IRecord record) {
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(record);
ringBuffer.publish(hiseq);
}
@Override
......
package explorviz.hpc_monitoring.reader;
public interface IPeriodicTimeSignalReceiver {
void periodicTimeSignal(long timestamp);
}
......@@ -2,15 +2,19 @@ package explorviz.hpc_monitoring.reader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import explorviz.hpc_monitoring.byteaccess.UnsafeBits;
import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter;
import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter;
import explorviz.hpc_monitoring.record.IRecord;
import explorviz.hpc_monitoring.record.TraceMetadata;
import explorviz.hpc_monitoring.record.events.normal.*;
import explorviz.hpc_monitoring.record.events.normal.AfterFailedOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.AfterOperationEvent;
import explorviz.hpc_monitoring.record.events.normal.BeforeOperationEvent;
import gnu.trove.map.hash.TIntObjectHashMap;
public class MessageDistributer implements EventHandler<ByteArrayEvent> {
......@@ -26,13 +30,14 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
private final RingBuffer<RecordEvent> ringBuffer;
@SuppressWarnings("unchecked")
public MessageDistributer() {
public MessageDistributer(final EventHandler<RecordEvent> endReceiver) {
final ExecutorService exec = Executors.newCachedThreadPool();
final Disruptor<RecordEvent> disruptor = new Disruptor<RecordEvent>(
RecordEvent.EVENT_FACTORY, 32768, exec);
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TraceReconstructionFilter(5 * 1000 * 1000);
eventHandlers[0] = new TraceReconstructionFilter(5 * 1000 * 1000,
endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
}
......@@ -163,8 +168,7 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
offset += 4;
if ((readSize - offset) < stringLength) {
return createUnreadBytesArray(b, readSize, offset - 8,
true);
return createUnreadBytesArray(b, readSize, offset - 8, true);
}
final byte[] stringBytes = new byte[stringLength];
......@@ -224,8 +228,7 @@ public class MessageDistributer implements EventHandler<ByteArrayEvent> {
System.out.println("waiting for " + id);
this.wait();
}
}
catch (final InterruptedException e) {
} catch (final InterruptedException e) {
e.printStackTrace();
}
result = stringRegistry.get(id);
......
......@@ -6,6 +6,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
......@@ -20,7 +21,9 @@ public final class TCPReader {
private final RingBuffer<ByteArrayEvent> ringBuffer;
public TCPReader(final int listeningPort) throws IllegalArgumentException {
public TCPReader(final int listeningPort,
final EventHandler<RecordEvent> endReceiver)
throws IllegalArgumentException {
this.listeningPort = listeningPort;
final ExecutorService exec = Executors.newCachedThreadPool();
......@@ -29,7 +32,7 @@ public final class TCPReader {
@SuppressWarnings("unchecked")
final EventHandler<ByteArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new MessageDistributer();
eventHandlers[0] = new MessageDistributer(endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
}
......@@ -52,15 +55,12 @@ public final class TCPReader {
socket.close();
}
}
catch (final IOException ex) {
} catch (final IOException ex) {
System.out.println("Error in read() " + ex.toString());
}
finally {
} finally {
try {
serversocket.close();
}
catch (final IOException e) {
} catch (final IOException e) {
System.out.println("Error in read()" + e.toString());
}
}
......@@ -68,6 +68,7 @@ public final class TCPReader {
private void open() throws IOException {
serversocket = new ServerSocket(listeningPort);
System.out.println("listening on port " + listeningPort);
}
private void putInRingBuffer(final byte[] messages, final int readBytes) {
......
package explorviz.hpc_monitoring.reader;
import java.util.concurrent.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public final class TimeReader {
private final long period;
private boolean terminated = false;
private final ScheduledExecutorService executorService;
private ScheduledFuture<?> result;
public TimeReader(final long periodInNanoSec) {
private final IPeriodicTimeSignalReceiver receiver;
public TimeReader(final long periodInNanoSec,
final IPeriodicTimeSignalReceiver receiver) {
period = periodInNanoSec;
this.receiver = receiver;
executorService = new ScheduledThreadPoolExecutor(1);
}
public void read() {
result = executorService.scheduleAtFixedRate(new Runnable() {
public void start() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
sendTimestampEvent();
}
}, 0, period, TimeUnit.NANOSECONDS);
try {
result.get();
}
catch (final ExecutionException ex) {
terminate();
}
catch (final InterruptedException ignore) {}
catch (final CancellationException ignore) {}
terminate();
}
protected void sendTimestampEvent() {
// final long timestamp = System.nanoTime();
// super.deliver(OUTPUT_PORT_NAME_TIMESTAMPS, timestamp);
}
public void terminate() {
if (!terminated) {
executorService.shutdown();
try {
terminated = executorService.awaitTermination(5,
TimeUnit.SECONDS);
}
catch (final InterruptedException ex) {}
if (!terminated) {
result.cancel(true);
}
}
final long timestamp = System.nanoTime();
receiver.periodicTimeSignal(timestamp);
}
}
......@@ -5,7 +5,7 @@ import explorviz.hpc_monitoring.reader.TCPReader;
public class WorkerController {
public void start() {
final TCPReader tcpReader = new TCPReader(10133);
final TCPReader tcpReader = new TCPReader(10133, null);
tcpReader.read();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment