diff --git a/Start Master.launch b/Start Master.launch new file mode 100644 index 0000000000000000000000000000000000000000..803bd3b4486e8ea4853f474e0ca60b1012026170 --- /dev/null +++ b/Start Master.launch @@ -0,0 +1,15 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<launchConfiguration type="org.eclipse.jdt.launching.localJavaApplication"> +<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS"> +<listEntry value="/worker/src/explorviz/live_trace_processing/main/WorkerStarter.java"/> +</listAttribute> +<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES"> +<listEntry value="1"/> +</listAttribute> +<listAttribute key="org.eclipse.debug.ui.favoriteGroups"> +<listEntry value="org.eclipse.debug.ui.launchGroup.run"/> +</listAttribute> +<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/> +<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> +<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/> +</launchConfiguration> diff --git a/Start Worker.launch b/Start Worker.launch index 5c010e932dfd6cf67e5e469de16fcba3ed8d28e0..83a9afc5991de17ac595be173ac7fc3a6743cfe2 100644 --- a/Start Worker.launch +++ b/Start Worker.launch @@ -11,5 +11,5 @@ </listAttribute> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> -<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G"/> +<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=true -Dexplorviz.live_trace_processing.reader_listening_port=10133"/> </launchConfiguration> diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index 4ec30dc25880b7b4b97f0e083fd69e6ac304f201..1e91a53bbd6dc152d924515bccec186dc564d01e 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -1,7 +1,9 @@ -explorviz.live_trace_processing.worker_enabled=false +explorviz.live_trace_processing.worker_enabled=true + +explorviz.live_trace_processing.reader_listening_port=10133 explorviz.live_trace_processing.writer_target_ip=127.0.0.1 -explorviz.live_trace_processing.writer_target_port=10133 +explorviz.live_trace_processing.writer_target_port=10134 explorviz.live_trace_processing.writer_load_balancing_enabled=false explorviz.live_trace_processing.writer_load_balancing_ip=10.50.0.2 diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java index 6830ec97b7c3668799c998a805f6bebd76ae445c..a568a8379df7c080e92646646efe67ed2f95f032 100644 --- a/src/explorviz/live_trace_processing/connector/TCPConnector.java +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -8,17 +8,27 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import explorviz.live_trace_processing.Constants; +import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.filter.AbstractSink; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.ISerilizableRecord; +import explorviz.live_trace_processing.record.misc.StringRegistryRecord; +import explorviz.live_trace_processing.record.misc.TerminateRecord; +import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; +import explorviz.live_trace_processing.writer.IStringRecordSender; import explorviz.live_trace_processing.writer.IWriter; -public class TCPConnector extends AbstractSink implements IWriter { +public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender { private URL providerURL; private SocketChannel socketChannel; + private final StringRegistry stringRegistry = new StringRegistry(this); // TODO + // clear + // after + // disconnect? + private final ByteBuffer buffer = ByteBuffer .allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); @@ -53,7 +63,14 @@ public class TCPConnector extends AbstractSink implements IWriter { socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(), getProviderURL().getPort())); - // StringRegistry.sendOutAllStringRegistryRecords(); TODO + stringRegistry.sendOutAllStringRegistryRecords(); + } + + @Override + public void sendOutStringRecord(final StringRegistryRecord record) { + final ByteBuffer stringBuffer = ByteBuffer.allocateDirect(record.getRecordSizeInBytes()); + record.putIntoByteBuffer(stringBuffer, stringRegistry); + send(stringBuffer); } @Override @@ -61,11 +78,15 @@ public class TCPConnector extends AbstractSink implements IWriter { if (record instanceof ISerilizableRecord) { final ISerilizableRecord serilizableRecord = (ISerilizableRecord) record; if (buffer.remaining() < serilizableRecord.getRecordSizeInBytes()) { - buffer.flip(); send(buffer); - buffer.clear(); } - serilizableRecord.putIntoByteBuffer(buffer, null); // TODO + serilizableRecord.putIntoByteBuffer(buffer, stringRegistry); + } else if (record instanceof TimedPeriodRecord) { + if (buffer.hasRemaining()) { + send(buffer); + } + } else if (record instanceof TerminateRecord) { + terminate(); } } @@ -78,9 +99,11 @@ public class TCPConnector extends AbstractSink implements IWriter { } try { + buffer.flip(); while (buffer.hasRemaining()) { socketChannel.write(buffer); } + buffer.clear(); doDisconnectIfNessecary(); } catch (final IOException e) { System.out.println("WARNING: Connection was closed - possible data loss"); @@ -111,4 +134,8 @@ public class TCPConnector extends AbstractSink implements IWriter { shouldDisconnect = true; } } + + private void terminate() { + // TODO + } } diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java index 13cd5d6dc901861e25087ad6bf68bb2a8848f914..58e247da5ec0c15733d9815b11838139c7e02bc7 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionBuffer.java @@ -76,8 +76,8 @@ class TraceReconstructionBuffer { || damaged || !closeable); } - public final Trace toTrace() { - return new Trace(events, false); + public final Trace toTrace(final boolean valid) { + return new Trace(events, valid); } // private static final class AbstractOperationEventComperator implements diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index 6c39b3052ca46c6395693668d511be2995ee3eee..10d7338a03bfb427f14375ff97837044e33a4e66 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -45,7 +45,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { if (traceBuffer.isFinished()) { traceId2trace.remove(traceId); - sendOutValidTrace(traceBuffer.toTrace()); + sendOutValidTrace(traceBuffer.toTrace(true)); } } else if (record instanceof Trace) { final Trace trace = (Trace) record; @@ -57,8 +57,10 @@ public final class TraceReconstructionFilter extends AbstractFilter { } else if (record instanceof TimedPeriodRecord) { checkForTimeouts(TimeProvider.getCurrentTimestamp()); periodicFlush(record); + deliver(record); } else if (record instanceof TerminateRecord) { terminate(); + deliver(record); } else { deliver(record); } @@ -80,7 +82,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) { final TraceReconstructionBuffer traceBuffer = entry.getValue(); if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { - sendOutInvalidTrace(traceBuffer.toTrace()); + sendOutInvalidTrace(traceBuffer.toTrace(false)); traceIdsToRemove.add(entry.getKey()); } } @@ -101,7 +103,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { private void terminate() { for (final TraceReconstructionBuffer entry : traceId2trace.values()) { - sendOutInvalidTrace(entry.toTrace()); + sendOutInvalidTrace(entry.toTrace(false)); } traceId2trace.clear(); } diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index ec81a0020caad89d7af61b1e01066f1050cf7fdb..8aec06ae8513f17756624de79c5752e306fbbadd 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -42,8 +42,10 @@ public class TracePatternSummarizationFilter extends AbstractFilter { } else if (record instanceof TimedPeriodRecord) { processTimeoutQueue(TimeProvider.getCurrentTimestamp()); periodicFlush(record); + deliver(record); } else if (record instanceof TerminateRecord) { terminate(); + deliver(record); } else { deliver(record); } diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java index 7ee3f152ab298f3920509a7159e9f9792e26106a..cf8bbe0805567c0c2d04bbacf4db409a7cacdb93 100644 --- a/src/explorviz/live_trace_processing/main/WorkerStarter.java +++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java @@ -24,9 +24,11 @@ public class WorkerStarter { configureLoadBalancerIfEnabled(configuration, tcpConnector); - new TCPReader(10133, tcpConnector).read(); + new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT), + tcpConnector).read(); } else { // testing purpose - new TCPReader(10133, null).read(); + new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT), + null).read(); } } diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java index 9715b4fbfa26a443c28c07cea27194258efbc530..16f054846db27400fd9196dc3f941080f9354c21 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java +++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java @@ -5,16 +5,16 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; import com.lmax.disruptor.RingBuffer; import explorviz.live_trace_processing.Constants; +import explorviz.live_trace_processing.StringRegistry; import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter; import explorviz.live_trace_processing.record.IRecord; +import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; @@ -23,11 +23,12 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; +import explorviz.live_trace_processing.record.trace.Trace; public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver { private HostApplicationMetaDataRecord hostApplicationMetadata; - private final Map<Integer, String> stringRegistry = new TreeMap<Integer, String>(); + private final StringRegistry stringRegistry = new StringRegistry(null); private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); private static final CountingThroughputFilter counter = new CountingThroughputFilter( @@ -73,6 +74,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec buffer[0] = new TimedPeriodRecord(); valueEvent.setValues(buffer); + valueEvent.setValueSize(1); ringBuffer.publish(hiseq); } @@ -138,7 +140,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec buffer.get(stringByteArray); - addToRegistry(mapId, new String(stringByteArray)); + stringRegistry.putStringRecord(mapId, new String(stringByteArray)); + + checkWaitingMessages(); } else { buffer.position(buffer.position() - 9); buffer.compact(); @@ -156,9 +160,41 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } break; } + case Trace.CLAZZ_ID: { + if (buffer.remaining() >= 9) { + final byte validByte = buffer.get(); + boolean valid = true; + if (validByte == (byte) 0) { + valid = false; + } + final int eventsLength = buffer.getInt(); + final int byteLength = buffer.getInt(); + if (buffer.remaining() >= byteLength) { + final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>( + eventsLength); + for (int i = 0; i < eventsLength; i++) { + final AbstractOperationEventRecord eventRecord = AbstractOperationEventRecord + .createFromByteBuffer(buffer, stringRegistry); + events.add(eventRecord); + } + + putInRingBuffer(new Trace(events, valid)); + } else { + buffer.position(buffer.position() - 10); + buffer.compact(); + return; + } + } else { + buffer.position(buffer.position() - 1); + buffer.compact(); + return; + } + + break; + } default: { System.out.println("unknown class id " + clazzId + " at offset " - + (buffer.position() - 4)); + + (buffer.position() - 1)); buffer.clear(); return; } @@ -172,8 +208,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int hostnameId = buffer.getInt(); final int applicationId = buffer.getInt(); - final String hostname = getStringFromRegistry(hostnameId); - final String application = getStringFromRegistry(applicationId); + final String hostname = stringRegistry.getStringFromId(hostnameId); + final String application = stringRegistry.getStringFromId(applicationId); if ((hostname != null) && (application != null)) { hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application); @@ -192,7 +228,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int orderIndex = buffer.getInt(); final int operationId = buffer.getInt(); - final String operation = getStringFromRegistry(operationId); + final String operation = stringRegistry.getStringFromId(operationId); if (operation != null) { putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, @@ -213,8 +249,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int operationId = buffer.getInt(); final int causeId = buffer.getInt(); - final String operation = getStringFromRegistry(operationId); - final String cause = getStringFromRegistry(causeId); + final String operation = stringRegistry.getStringFromId(operationId); + final String cause = stringRegistry.getStringFromId(causeId); if ((operation != null) && (cause != null)) { putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, @@ -235,7 +271,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final int orderIndex = buffer.getInt(); final int operationId = buffer.getInt(); - final String operation = getStringFromRegistry(operationId); + final String operation = stringRegistry.getStringFromId(operationId); if (operation != null) { putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp))); @@ -252,10 +288,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final double cpuUtil = buffer.getDouble(); final long usedRAM = buffer.getLong(); final long absoluteRAM = buffer.getLong(); - System.out.println(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM, - hostApplicationMetadata).toString()); - // putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM, - // absoluteRAM)); + + putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM, + hostApplicationMetadata)); } private final void putInWaitingMessages(final byte[] message) { @@ -292,7 +327,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } private final void putInRingBuffer(final IRecord message) { - counter.inputObjects(message); + counter.inputRecord(message); synchronized (this) { // TODO better solution outputBuffer[outputBufferIndex++] = message; @@ -307,8 +342,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec final long hiseq = ringBuffer.next(); final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); final IRecord[] oldValues = valueEvent.getValues(); - valueEvent.setValues(outputBuffer); + valueEvent.setValueSize(outputBufferIndex); ringBuffer.publish(hiseq); outputBuffer = oldValues; @@ -317,13 +352,4 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec } } - private final void addToRegistry(final int key, final String value) { - stringRegistry.put(key, value); - - checkWaitingMessages(); - } - - private final String getStringFromRegistry(final int id) { - return stringRegistry.get(id); - } }