From bf7cf5b34390c2c625b9167315a3c1a660cd46c8 Mon Sep 17 00:00:00 2001 From: Florian Fittkau <ffi@informatik.uni-kiel.de> Date: Wed, 30 Oct 2013 23:13:48 +0100 Subject: [PATCH] major refactoring --- .classpath | 1 + .../connector/TCPConnector.java | 99 +++++++++++++++++++ .../TraceReconstructionBuffer.java} | 34 +++---- .../TraceReconstructionFilter.java | 48 ++++----- .../TracePatternSummarizationBuffer.java} | 14 +-- .../TracePatternSummarizationFilter.java | 34 +++---- .../reader/TCPReader.java | 25 +++-- .../reader/TCPReaderOneClient.java} | 78 ++++++++------- .../explorviz/worker/main/WorkerStarter.java | 2 +- 9 files changed, 218 insertions(+), 117 deletions(-) create mode 100644 src/explorviz/live_trace_processing/connector/TCPConnector.java rename src/explorviz/{hpc_monitoring/filter/reconstruction/TraceBuffer.java => live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java} (64%) rename src/explorviz/{hpc_monitoring => live_trace_processing}/filter/reconstruction/TraceReconstructionFilter.java (58%) rename src/explorviz/{hpc_monitoring/filter/reduction/TraceAggregationBuffer.java => live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java} (57%) rename src/explorviz/{hpc_monitoring/filter/reduction => live_trace_processing/filter/reduction/summarization}/TracePatternSummarizationFilter.java (59%) rename src/explorviz/{hpc_monitoring => live_trace_processing}/reader/TCPReader.java (72%) rename src/explorviz/{hpc_monitoring/reader/TCPReaderThread.java => live_trace_processing/reader/TCPReaderOneClient.java} (71%) rename {src => test}/explorviz/worker/main/WorkerStarter.java (77%) diff --git a/.classpath b/.classpath index 0a732d8..bd22750 100644 --- a/.classpath +++ b/.classpath @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> <classpathentry excluding="kieker/analysis/plugin/reader/mq/" kind="src" path="src"/> + <classpathentry kind="src" path="test"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> <classpathentry kind="lib" path="lib/disruptor-3.2.0.jar"> <attributes> diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java new file mode 100644 index 0000000..dbe4bc9 --- /dev/null +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -0,0 +1,99 @@ +package explorviz.live_trace_processing.connector; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import explorviz.live_trace_processing.configuration.Configuration; +import explorviz.live_trace_processing.filter.AbstractSink; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord; +import explorviz.live_trace_processing.writer.IWriter; + +public class TCPConnector extends AbstractSink implements IWriter { + private URL providerURL; + + private SocketChannel socketChannel; + + private final Configuration configuration; + + public TCPConnector(final String hostname, final int port, final Configuration configuration) { + this.configuration = configuration; + try { + setProviderURL(new URL("http://" + hostname + ":" + port)); + } catch (final MalformedURLException e) { + e.printStackTrace(); + } + } + + protected TCPConnector(final Configuration configuration) { + this.configuration = configuration; + providerURL = null; + } + + @Override + public URL getProviderURL() { + return providerURL; + } + + @Override + public void setProviderURL(final URL providerURL) { + this.providerURL = providerURL; + } + + public void init() { + try { + connect(); + } catch (final IOException e) { + e.printStackTrace(); + } + } + + @Override + public void connect() throws IOException { + socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(), + getProviderURL().getPort())); + + if (socketChannel.isConnected()) { + // socketChannel.write(StringRegistry.getAllStringRegistryRecords()); + } + } + + @Override + protected void processRecord(final IRecord record, + final HostApplicationMetaDataRecord hostApplicationMetaData) { + // send(); TODO + } + + private void send(final ByteBuffer buffer) { + if (socketChannel.isConnected()) { + try { + while (buffer.hasRemaining()) { + socketChannel.write(buffer); + } + } catch (final IOException e) { + System.out.println("WARNING: Connection was closed"); + // TODO reconnect + disconnect(); + } + } + } + + @Override + public final void disconnect() { + if (socketChannel.isConnected()) { + try { + socketChannel.close(); + } catch (final IOException e) { + e.printStackTrace(); + } + } + } + + public void cleanup() { + disconnect(); + } +} diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java similarity index 64% rename from src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java rename to src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 0011336..a97ca03 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -1,23 +1,23 @@ -package explorviz.hpc_monitoring.filter.reconstruction; +package explorviz.live_trace_processing.filter.reconstruction; import java.util.ArrayList; import java.util.List; -import explorviz.hpc_monitoring.record.HostApplicationMetaData; -import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; -import explorviz.hpc_monitoring.record.trace.Trace; +import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; +import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; +import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; +import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; +import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord; +import explorviz.live_trace_processing.record.trace.Trace; -public class TraceBuffer { +class TraceReconstructionBuffer { // private static final Comparator<AbstractOperationEvent> COMPARATOR = new // AbstractOperationEventComperator(); private static final int INITIAL_EVENT_CAPACITY = 100; - private HostApplicationMetaData traceMetadata; - private final List<AbstractOperationEvent> events = new ArrayList<AbstractOperationEvent>( + private HostApplicationMetaDataRecord traceMetadata; + private final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( INITIAL_EVENT_CAPACITY); private boolean closeable; @@ -28,18 +28,18 @@ public class TraceBuffer { private long maxLoggingTimestamp = -1; private int maxOrderIndex = -1; - public final void insertEvent(final AbstractOperationEvent event) { + public final void insertEvent(final AbstractOperationEventRecord event) { setMaxLoggingTimestamp(event); final int orderIndex = setMaxOrderIndex(event); - if (event instanceof BeforeOperationEvent) { + if (event instanceof BeforeOperationEventRecord) { if (orderIndex == 0) { closeable = true; } openEvents++; - } else if (event instanceof AfterOperationEvent) { + } else if (event instanceof AfterOperationEventRecord) { openEvents--; - } else if (event instanceof AfterFailedOperationEvent) { + } else if (event instanceof AfterFailedOperationEventRecord) { openEvents--; } @@ -54,14 +54,14 @@ public class TraceBuffer { return maxLoggingTimestamp; } - private final void setMaxLoggingTimestamp(final AbstractOperationEvent event) { + private final void setMaxLoggingTimestamp(final AbstractOperationEventRecord event) { final long loggingTimestamp = event.getLoggingTimestamp(); if (loggingTimestamp > maxLoggingTimestamp) { maxLoggingTimestamp = loggingTimestamp; } } - private final int setMaxOrderIndex(final AbstractOperationEvent event) { + private final int setMaxOrderIndex(final AbstractOperationEventRecord event) { final int orderIndex = event.getOrderIndex(); if (orderIndex > maxOrderIndex) { maxOrderIndex = orderIndex; @@ -69,7 +69,7 @@ public class TraceBuffer { return orderIndex; } - public void setTrace(final HostApplicationMetaData trace) { + public void setTrace(final HostApplicationMetaDataRecord trace) { if (traceMetadata != null) { damaged = true; return; diff --git a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java similarity index 58% rename from src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java rename to src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 54a8c72..1f5319b 100644 --- a/src/explorviz/hpc_monitoring/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -1,4 +1,4 @@ -package explorviz.hpc_monitoring.filter.reconstruction; +package explorviz.live_trace_processing.filter.reconstruction; import java.util.ArrayList; import java.util.List; @@ -7,16 +7,16 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import explorviz.hpc_monitoring.filter.AbstractFilter; -import explorviz.hpc_monitoring.filter.IReceiver; -import explorviz.hpc_monitoring.filter.reduction.TracePatternSummarizationFilter; -import explorviz.hpc_monitoring.reader.TimeProvider; -import explorviz.hpc_monitoring.record.HostApplicationMetaData; -import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.TerminateRecord; -import explorviz.hpc_monitoring.record.TimedPeriodRecord; -import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; -import explorviz.hpc_monitoring.record.trace.Trace; +import explorviz.live_trace_processing.filter.AbstractFilter; +import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter; +import explorviz.live_trace_processing.reader.TimeProvider; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; +import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord; +import explorviz.live_trace_processing.record.misc.TerminateRecord; +import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; +import explorviz.live_trace_processing.record.trace.Trace; public final class TraceReconstructionFilter extends AbstractFilter { private static final int RINGBUFFER_LENGTH = 32; @@ -24,9 +24,9 @@ public final class TraceReconstructionFilter extends AbstractFilter { private final long maxTraceTimeout; - private final Map<Long, TraceBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceBuffer>(); + private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>(); - public TraceReconstructionFilter(final long maxTraceTimeout, final IReceiver sinkReceiver) { + public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) { super( new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990), sinkReceiver), RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, @@ -36,12 +36,12 @@ public final class TraceReconstructionFilter extends AbstractFilter { @Override public void processRecord(final IRecord record, - final HostApplicationMetaData hostApplicationMetaData) { - if (record instanceof AbstractOperationEvent) { - final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record); + final HostApplicationMetaDataRecord hostApplicationMetaData) { + if (record instanceof AbstractOperationEventRecord) { + final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); final long traceId = abstractOperationEvent.getTraceId(); - final TraceBuffer traceBuffer = getBufferForTraceId( + final TraceReconstructionBuffer traceBuffer = getBufferForTraceId( abstractOperationEvent.getTraceId(), hostApplicationMetaData); traceBuffer.insertEvent(abstractOperationEvent); @@ -66,11 +66,11 @@ public final class TraceReconstructionFilter extends AbstractFilter { } } - private TraceBuffer getBufferForTraceId(final long traceId, - final HostApplicationMetaData metadata) { - TraceBuffer traceBuffer = traceId2trace.get(traceId); + private TraceReconstructionBuffer getBufferForTraceId(final long traceId, + final HostApplicationMetaDataRecord metadata) { + TraceReconstructionBuffer traceBuffer = traceId2trace.get(traceId); if (traceBuffer == null) { - traceBuffer = new TraceBuffer(); + traceBuffer = new TraceReconstructionBuffer(); traceBuffer.setTrace(metadata); traceId2trace.put(traceId, traceBuffer); } @@ -81,8 +81,8 @@ public final class TraceReconstructionFilter extends AbstractFilter { final long traceTimeout = timestamp - maxTraceTimeout; final List<Long> traceIdsToRemove = new ArrayList<Long>(); - for (final Entry<Long, TraceBuffer> entry : traceId2trace.entrySet()) { - final TraceBuffer traceBuffer = entry.getValue(); + for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) { + final TraceReconstructionBuffer traceBuffer = entry.getValue(); if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { sendOutInvalidTrace(traceBuffer.toTrace()); traceIdsToRemove.add(entry.getKey()); @@ -104,7 +104,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { } private void terminate() { - for (final TraceBuffer entry : traceId2trace.values()) { + for (final TraceReconstructionBuffer entry : traceId2trace.values()) { sendOutInvalidTrace(entry.toTrace()); } traceId2trace.clear(); diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java similarity index 57% rename from src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java rename to src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java index 6b90dde..477ead1 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TraceAggregationBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationBuffer.java @@ -1,15 +1,15 @@ -package explorviz.hpc_monitoring.filter.reduction; +package explorviz.live_trace_processing.filter.reduction.summarization; import java.util.List; -import explorviz.hpc_monitoring.record.event.AbstractOperationEvent; -import explorviz.hpc_monitoring.record.trace.Trace; +import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; +import explorviz.live_trace_processing.record.trace.Trace; -public class TraceAggregationBuffer { +class TracePatternSummarizationBuffer { private Trace accumulator; private final long bufferCreatedTimestamp; - public TraceAggregationBuffer(final long bufferCreatedTimestamp) { + public TracePatternSummarizationBuffer(final long bufferCreatedTimestamp) { this.bufferCreatedTimestamp = bufferCreatedTimestamp; } @@ -29,8 +29,8 @@ public class TraceAggregationBuffer { if (accumulator == null) { accumulator = trace; } else { - final List<AbstractOperationEvent> aggregatedRecords = accumulator.getTraceEvents(); - final List<AbstractOperationEvent> records = trace.getTraceEvents(); + final List<AbstractOperationEventRecord> aggregatedRecords = accumulator.getTraceEvents(); + final List<AbstractOperationEventRecord> records = trace.getTraceEvents(); for (int i = 0; i < aggregatedRecords.size(); i++) { aggregatedRecords.get(i).getRuntime().merge(records.get(i).getRuntime()); diff --git a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java similarity index 59% rename from src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java rename to src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index f7989aa..9fd22e1 100644 --- a/src/explorviz/hpc_monitoring/filter/reduction/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -1,19 +1,19 @@ -package explorviz.hpc_monitoring.filter.reduction; +package explorviz.live_trace_processing.filter.reduction.summarization; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; -import explorviz.hpc_monitoring.filter.AbstractFilter; -import explorviz.hpc_monitoring.filter.IReceiver; -import explorviz.hpc_monitoring.reader.TimeProvider; -import explorviz.hpc_monitoring.record.HostApplicationMetaData; -import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.TerminateRecord; -import explorviz.hpc_monitoring.record.TimedPeriodRecord; -import explorviz.hpc_monitoring.record.trace.Trace; -import explorviz.hpc_monitoring.record.trace.TraceComperator; +import explorviz.live_trace_processing.filter.AbstractFilter; +import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.reader.TimeProvider; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord; +import explorviz.live_trace_processing.record.misc.TerminateRecord; +import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; +import explorviz.live_trace_processing.record.trace.Trace; +import explorviz.live_trace_processing.record.trace.TraceComperator; public class TracePatternSummarizationFilter extends AbstractFilter { private static final int RINGBUFFER_LENGTH = 256; @@ -21,11 +21,11 @@ public class TracePatternSummarizationFilter extends AbstractFilter { private final long maxCollectionDuration; - private final Map<Trace, TraceAggregationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TraceAggregationBuffer>( + private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>( new TraceComperator()); public TracePatternSummarizationFilter(final long maxCollectionDuration, - final IReceiver sinkReceiver) { + final IPipeReceiver sinkReceiver) { super(sinkReceiver, RINGBUFFER_LENGTH, OUTPUT_BATCH_SIZE, "Reduced traces / sec"); this.maxCollectionDuration = maxCollectionDuration; @@ -33,7 +33,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter { @Override public void processRecord(final IRecord record, - final HostApplicationMetaData hostApplicationMetaData) { + final HostApplicationMetaDataRecord hostApplicationMetaData) { if (record instanceof Trace) { final Trace trace = (Trace) record; if (trace.isValid()) { @@ -52,9 +52,9 @@ public class TracePatternSummarizationFilter extends AbstractFilter { } private void insertIntoBuffer(final Trace trace) { - TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace); + TracePatternSummarizationBuffer traceAggregationBuffer = trace2buffer.get(trace); if (traceAggregationBuffer == null) { - traceAggregationBuffer = new TraceAggregationBuffer(TimeProvider.getCurrentTimestamp()); + traceAggregationBuffer = new TracePatternSummarizationBuffer(TimeProvider.getCurrentTimestamp()); trace2buffer.put(trace, traceAggregationBuffer); } traceAggregationBuffer.insertTrace(trace); @@ -63,7 +63,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter { private void processTimeoutQueue(final long timestamp) { final long bufferTimeout = timestamp - maxCollectionDuration; final List<Trace> toRemove = new ArrayList<Trace>(); - for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { + for (final TracePatternSummarizationBuffer traceBuffer : trace2buffer.values()) { if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { final Trace aggregatedTrace = traceBuffer.getAggregatedTrace(); deliver(aggregatedTrace); @@ -76,7 +76,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter { } private void terminate() { - for (final TraceAggregationBuffer traceBuffer : trace2buffer.values()) { + for (final TracePatternSummarizationBuffer traceBuffer : trace2buffer.values()) { deliver(traceBuffer.getAggregatedTrace()); } trace2buffer.clear(); diff --git a/src/explorviz/hpc_monitoring/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java similarity index 72% rename from src/explorviz/hpc_monitoring/reader/TCPReader.java rename to src/explorviz/live_trace_processing/reader/TCPReader.java index 0459761..7199f97 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -1,22 +1,21 @@ -package explorviz.hpc_monitoring.reader; +package explorviz.live_trace_processing.reader; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; -import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; -import explorviz.hpc_monitoring.disruptor.RecordArrayEventFactory; -import explorviz.hpc_monitoring.filter.IReceiver; -import explorviz.hpc_monitoring.filter.reconstruction.TraceReconstructionFilter; +import explorviz.live_trace_processing.filter.AbstractFilter; +import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.filter.RecordArrayEvent; +import explorviz.live_trace_processing.filter.RecordArrayEventFactory; +import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; public final class TCPReader { static final int MESSAGE_BUFFER_SIZE = 131072; @@ -31,14 +30,14 @@ public final class TCPReader { private final RingBuffer<RecordArrayEvent> ringBuffer; - private final List<TCPReaderThread> threads = new ArrayList<TCPReaderThread>(); + private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>(); - public TCPReader(final int listeningPort, final IReceiver endReceiver) { + public TCPReader(final int listeningPort, final IPipeReceiver endReceiver) { this.listeningPort = listeningPort; - final ExecutorService exec = Executors.newCachedThreadPool(); final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( - new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, exec); + new RecordArrayEventFactory(OUTPUT_MESSAGE_BUFFER_SIZE), RINGBUFFER_LENGTH, + AbstractFilter.cachedThreadPool); @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; @@ -51,7 +50,7 @@ public final class TCPReader { try { open(); while (active) { - final TCPReaderThread thread = new TCPReaderThread(serversocket.accept(), + final TCPReaderOneClient thread = new TCPReaderOneClient(serversocket.accept(), ringBuffer); thread.start(); threads.add(thread); @@ -77,7 +76,7 @@ public final class TCPReader { public final void terminate(final boolean error) { System.out.println("Shutdown of TCPReader requested."); active = false; - for (final TCPReaderThread thread : threads) { + for (final TCPReaderOneClient thread : threads) { thread.terminate(); } } diff --git a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java similarity index 71% rename from src/explorviz/hpc_monitoring/reader/TCPReaderThread.java rename to src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index 4396424..6dcf9db 100644 --- a/src/explorviz/hpc_monitoring/reader/TCPReaderThread.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -1,4 +1,4 @@ -package explorviz.hpc_monitoring.reader; +package explorviz.live_trace_processing.reader; import java.io.IOException; import java.nio.ByteBuffer; @@ -11,18 +11,20 @@ import java.util.concurrent.TimeUnit; import com.lmax.disruptor.RingBuffer; -import explorviz.hpc_monitoring.disruptor.RecordArrayEvent; -import explorviz.hpc_monitoring.filter.counting.CountingThroughputFilter; -import explorviz.hpc_monitoring.record.HostApplicationMetaData; -import explorviz.hpc_monitoring.record.IRecord; -import explorviz.hpc_monitoring.record.StringRegistryRecord; -import explorviz.hpc_monitoring.record.TimedPeriodRecord; -import explorviz.hpc_monitoring.record.event.normal.AfterFailedOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.AfterOperationEvent; -import explorviz.hpc_monitoring.record.event.normal.BeforeOperationEvent; - -public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiver { - private HostApplicationMetaData hostApplicationMetadata; +import explorviz.live_trace_processing.filter.RecordArrayEvent; +import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter; +import explorviz.live_trace_processing.reader.IPeriodicTimeSignalReceiver; +import explorviz.live_trace_processing.reader.TimeSignalReader; +import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; +import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; +import explorviz.live_trace_processing.record.event.normal.BeforeOperationEventRecord; +import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord; +import explorviz.live_trace_processing.record.misc.StringRegistryRecord; +import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; + +public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver { + private HostApplicationMetaDataRecord hostApplicationMetadata; private final static Map<Integer, String> stringRegistry = new TreeMap<Integer, String>(); private final static List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); @@ -36,7 +38,7 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv private IRecord[] outputBuffer = new IRecord[TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE]; private int outputBufferIndex = 0; - public TCPReaderThread(final SocketChannel socketChannel, + public TCPReaderOneClient(final SocketChannel socketChannel, final RingBuffer<RecordArrayEvent> ringBuffer) { this.socketChannel = socketChannel; this.ringBuffer = ringBuffer; @@ -77,8 +79,8 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv while (buffer.remaining() > 0) { final byte clazzId = buffer.get(); switch (clazzId) { - case HostApplicationMetaData.CLAZZ_ID: { - if (buffer.remaining() >= (HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { + case HostApplicationMetaDataRecord.CLAZZ_ID: { + if (buffer.remaining() >= (HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInTraceMetadata(buffer); } else { buffer.position(buffer.position() - 1); @@ -87,8 +89,8 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv } break; } - case BeforeOperationEvent.CLAZZ_ID: { - if (buffer.remaining() >= (BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { + case BeforeOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= (BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInBeforeOperationEvent(buffer); } else { buffer.position(buffer.position() - 1); @@ -97,8 +99,8 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv } break; } - case AfterFailedOperationEvent.CLAZZ_ID: { - if (buffer.remaining() >= (AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { + case AfterFailedOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= (AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInAfterFailedOperationEvent(buffer); } else { buffer.position(buffer.position() - 1); @@ -107,8 +109,8 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv } break; } - case AfterOperationEvent.CLAZZ_ID: { - if (buffer.remaining() >= (AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { + case AfterOperationEventRecord.CLAZZ_ID: { + if (buffer.remaining() >= (AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID - 1)) { readInAfterOperationEvent(buffer); } else { buffer.position(buffer.position() - 1); @@ -162,10 +164,10 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv final String application = getStringFromRegistry(applicationId); if ((hostname != null) && (application != null)) { - hostApplicationMetadata = new HostApplicationMetaData(hostname, application); + hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application); } else { - final byte[] message = new byte[HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - HostApplicationMetaData.BYTE_LENGTH_WITH_CLAZZ_ID); + final byte[] message = new byte[HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - HostApplicationMetaDataRecord.BYTE_LENGTH_WITH_CLAZZ_ID); buffer.get(message); putInWaitingMessages(message); } @@ -180,11 +182,11 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv final String operation = getStringFromRegistry(operationId); if (operation != null) { - putInRingBuffer(new BeforeOperationEvent(timestamp, traceId, orderIndex, operationId, + putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, operationId, operation)); } else { - final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); buffer.get(message); putInWaitingMessages(message); } @@ -201,11 +203,11 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv final String cause = getStringFromRegistry(causeId); if ((operation != null) && (cause != null)) { - putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId, orderIndex, + putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, operationId, operation, cause)); } else { - final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); buffer.get(message); putInWaitingMessages(message); } @@ -219,11 +221,11 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv final String operation = getStringFromRegistry(operationId); if (operation != null) { - putInRingBuffer(new AfterOperationEvent(timestamp, traceId, orderIndex, operationId, + putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, operationId, operation)); } else { - final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID]; - buffer.position(buffer.position() - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID); + final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; + buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); buffer.get(message); putInWaitingMessages(message); } @@ -245,16 +247,16 @@ public class TCPReaderThread extends Thread implements IPeriodicTimeSignalReceiv final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage); final byte waitingMessageClazzId = buffer.get(); switch (waitingMessageClazzId) { - case HostApplicationMetaData.CLAZZ_ID: + case HostApplicationMetaDataRecord.CLAZZ_ID: readInTraceMetadata(buffer); break; - case BeforeOperationEvent.CLAZZ_ID: + case BeforeOperationEventRecord.CLAZZ_ID: readInBeforeOperationEvent(buffer); break; - case AfterFailedOperationEvent.CLAZZ_ID: + case AfterFailedOperationEventRecord.CLAZZ_ID: readInAfterFailedOperationEvent(buffer); break; - case AfterOperationEvent.CLAZZ_ID: + case AfterOperationEventRecord.CLAZZ_ID: readInAfterOperationEvent(buffer); break; default: diff --git a/src/explorviz/worker/main/WorkerStarter.java b/test/explorviz/worker/main/WorkerStarter.java similarity index 77% rename from src/explorviz/worker/main/WorkerStarter.java rename to test/explorviz/worker/main/WorkerStarter.java index 9834778..092fdf7 100644 --- a/src/explorviz/worker/main/WorkerStarter.java +++ b/test/explorviz/worker/main/WorkerStarter.java @@ -1,6 +1,6 @@ package explorviz.worker.main; -import explorviz.hpc_monitoring.reader.TCPReader; +import explorviz.live_trace_processing.reader.TCPReader; public class WorkerStarter { -- GitLab