Skip to content
Snippets Groups Projects
Commit 9ba536b0 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

monitored app -> worker -> master chain now working :)

parent 8cd77fd3
No related branches found
No related tags found
No related merge requests found
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.eclipse.jdt.launching.localJavaApplication">
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS">
<listEntry value="/worker/src/explorviz/live_trace_processing/main/WorkerStarter.java"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES">
<listEntry value="1"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.ui.favoriteGroups">
<listEntry value="org.eclipse.debug.ui.launchGroup.run"/>
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
</launchConfiguration>
...@@ -11,5 +11,5 @@ ...@@ -11,5 +11,5 @@
</listAttribute> </listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G"/> <stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=true -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
</launchConfiguration> </launchConfiguration>
explorviz.live_trace_processing.worker_enabled=false explorviz.live_trace_processing.worker_enabled=true
explorviz.live_trace_processing.reader_listening_port=10133
explorviz.live_trace_processing.writer_target_ip=127.0.0.1 explorviz.live_trace_processing.writer_target_ip=127.0.0.1
explorviz.live_trace_processing.writer_target_port=10133 explorviz.live_trace_processing.writer_target_port=10134
explorviz.live_trace_processing.writer_load_balancing_enabled=false explorviz.live_trace_processing.writer_load_balancing_enabled=false
explorviz.live_trace_processing.writer_load_balancing_ip=10.50.0.2 explorviz.live_trace_processing.writer_load_balancing_ip=10.50.0.2
......
...@@ -8,17 +8,27 @@ import java.nio.ByteBuffer; ...@@ -8,17 +8,27 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.filter.AbstractSink; import explorviz.live_trace_processing.filter.AbstractSink;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.ISerilizableRecord; import explorviz.live_trace_processing.record.ISerilizableRecord;
import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.writer.IStringRecordSender;
import explorviz.live_trace_processing.writer.IWriter; import explorviz.live_trace_processing.writer.IWriter;
public class TCPConnector extends AbstractSink implements IWriter { public class TCPConnector extends AbstractSink implements IWriter, IStringRecordSender {
private URL providerURL; private URL providerURL;
private SocketChannel socketChannel; private SocketChannel socketChannel;
private final StringRegistry stringRegistry = new StringRegistry(this); // TODO
// clear
// after
// disconnect?
private final ByteBuffer buffer = ByteBuffer private final ByteBuffer buffer = ByteBuffer
.allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE); .allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE);
...@@ -53,7 +63,14 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -53,7 +63,14 @@ public class TCPConnector extends AbstractSink implements IWriter {
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(), socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
getProviderURL().getPort())); getProviderURL().getPort()));
// StringRegistry.sendOutAllStringRegistryRecords(); TODO stringRegistry.sendOutAllStringRegistryRecords();
}
@Override
public void sendOutStringRecord(final StringRegistryRecord record) {
final ByteBuffer stringBuffer = ByteBuffer.allocateDirect(record.getRecordSizeInBytes());
record.putIntoByteBuffer(stringBuffer, stringRegistry);
send(stringBuffer);
} }
@Override @Override
...@@ -61,11 +78,15 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -61,11 +78,15 @@ public class TCPConnector extends AbstractSink implements IWriter {
if (record instanceof ISerilizableRecord) { if (record instanceof ISerilizableRecord) {
final ISerilizableRecord serilizableRecord = (ISerilizableRecord) record; final ISerilizableRecord serilizableRecord = (ISerilizableRecord) record;
if (buffer.remaining() < serilizableRecord.getRecordSizeInBytes()) { if (buffer.remaining() < serilizableRecord.getRecordSizeInBytes()) {
buffer.flip();
send(buffer); send(buffer);
buffer.clear();
} }
serilizableRecord.putIntoByteBuffer(buffer, null); // TODO serilizableRecord.putIntoByteBuffer(buffer, stringRegistry);
} else if (record instanceof TimedPeriodRecord) {
if (buffer.hasRemaining()) {
send(buffer);
}
} else if (record instanceof TerminateRecord) {
terminate();
} }
} }
...@@ -78,9 +99,11 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -78,9 +99,11 @@ public class TCPConnector extends AbstractSink implements IWriter {
} }
try { try {
buffer.flip();
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
socketChannel.write(buffer); socketChannel.write(buffer);
} }
buffer.clear();
doDisconnectIfNessecary(); doDisconnectIfNessecary();
} catch (final IOException e) { } catch (final IOException e) {
System.out.println("WARNING: Connection was closed - possible data loss"); System.out.println("WARNING: Connection was closed - possible data loss");
...@@ -111,4 +134,8 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -111,4 +134,8 @@ public class TCPConnector extends AbstractSink implements IWriter {
shouldDisconnect = true; shouldDisconnect = true;
} }
} }
private void terminate() {
// TODO
}
} }
...@@ -76,8 +76,8 @@ class TraceReconstructionBuffer { ...@@ -76,8 +76,8 @@ class TraceReconstructionBuffer {
|| damaged || !closeable); || damaged || !closeable);
} }
public final Trace toTrace() { public final Trace toTrace(final boolean valid) {
return new Trace(events, false); return new Trace(events, valid);
} }
// private static final class AbstractOperationEventComperator implements // private static final class AbstractOperationEventComperator implements
......
...@@ -45,7 +45,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -45,7 +45,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
if (traceBuffer.isFinished()) { if (traceBuffer.isFinished()) {
traceId2trace.remove(traceId); traceId2trace.remove(traceId);
sendOutValidTrace(traceBuffer.toTrace()); sendOutValidTrace(traceBuffer.toTrace(true));
} }
} else if (record instanceof Trace) { } else if (record instanceof Trace) {
final Trace trace = (Trace) record; final Trace trace = (Trace) record;
...@@ -57,8 +57,10 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -57,8 +57,10 @@ public final class TraceReconstructionFilter extends AbstractFilter {
} else if (record instanceof TimedPeriodRecord) { } else if (record instanceof TimedPeriodRecord) {
checkForTimeouts(TimeProvider.getCurrentTimestamp()); checkForTimeouts(TimeProvider.getCurrentTimestamp());
periodicFlush(record); periodicFlush(record);
deliver(record);
} else if (record instanceof TerminateRecord) { } else if (record instanceof TerminateRecord) {
terminate(); terminate();
deliver(record);
} else { } else {
deliver(record); deliver(record);
} }
...@@ -80,7 +82,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -80,7 +82,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) { for (final Entry<Long, TraceReconstructionBuffer> entry : traceId2trace.entrySet()) {
final TraceReconstructionBuffer traceBuffer = entry.getValue(); final TraceReconstructionBuffer traceBuffer = entry.getValue();
if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) { if (traceBuffer.getMaxLoggingTimestamp() <= traceTimeout) {
sendOutInvalidTrace(traceBuffer.toTrace()); sendOutInvalidTrace(traceBuffer.toTrace(false));
traceIdsToRemove.add(entry.getKey()); traceIdsToRemove.add(entry.getKey());
} }
} }
...@@ -101,7 +103,7 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -101,7 +103,7 @@ public final class TraceReconstructionFilter extends AbstractFilter {
private void terminate() { private void terminate() {
for (final TraceReconstructionBuffer entry : traceId2trace.values()) { for (final TraceReconstructionBuffer entry : traceId2trace.values()) {
sendOutInvalidTrace(entry.toTrace()); sendOutInvalidTrace(entry.toTrace(false));
} }
traceId2trace.clear(); traceId2trace.clear();
} }
......
...@@ -42,8 +42,10 @@ public class TracePatternSummarizationFilter extends AbstractFilter { ...@@ -42,8 +42,10 @@ public class TracePatternSummarizationFilter extends AbstractFilter {
} else if (record instanceof TimedPeriodRecord) { } else if (record instanceof TimedPeriodRecord) {
processTimeoutQueue(TimeProvider.getCurrentTimestamp()); processTimeoutQueue(TimeProvider.getCurrentTimestamp());
periodicFlush(record); periodicFlush(record);
deliver(record);
} else if (record instanceof TerminateRecord) { } else if (record instanceof TerminateRecord) {
terminate(); terminate();
deliver(record);
} else { } else {
deliver(record); deliver(record);
} }
......
...@@ -24,9 +24,11 @@ public class WorkerStarter { ...@@ -24,9 +24,11 @@ public class WorkerStarter {
configureLoadBalancerIfEnabled(configuration, tcpConnector); configureLoadBalancerIfEnabled(configuration, tcpConnector);
new TCPReader(10133, tcpConnector).read(); new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT),
tcpConnector).read();
} else { // testing purpose } else { // testing purpose
new TCPReader(10133, null).read(); new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT),
null).read();
} }
} }
......
...@@ -5,16 +5,16 @@ import java.nio.ByteBuffer; ...@@ -5,16 +5,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.StringRegistry;
import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter; import explorviz.live_trace_processing.filter.counting.CountingThroughputFilter;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord;
import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.event.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterFailedOperationEventRecord;
import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord; import explorviz.live_trace_processing.record.event.normal.AfterOperationEventRecord;
...@@ -23,11 +23,12 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord; ...@@ -23,11 +23,12 @@ import explorviz.live_trace_processing.record.misc.StringRegistryRecord;
import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord; import explorviz.live_trace_processing.record.misc.SystemMonitoringRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation; import explorviz.live_trace_processing.record.trace.RuntimeStatisticInformation;
import explorviz.live_trace_processing.record.trace.Trace;
public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver { public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalReceiver {
private HostApplicationMetaDataRecord hostApplicationMetadata; private HostApplicationMetaDataRecord hostApplicationMetadata;
private final Map<Integer, String> stringRegistry = new TreeMap<Integer, String>(); private final StringRegistry stringRegistry = new StringRegistry(null);
private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024); private final List<byte[]> waitingForStringMessages = new ArrayList<byte[]>(1024);
private static final CountingThroughputFilter counter = new CountingThroughputFilter( private static final CountingThroughputFilter counter = new CountingThroughputFilter(
...@@ -73,6 +74,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -73,6 +74,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
buffer[0] = new TimedPeriodRecord(); buffer[0] = new TimedPeriodRecord();
valueEvent.setValues(buffer); valueEvent.setValues(buffer);
valueEvent.setValueSize(1);
ringBuffer.publish(hiseq); ringBuffer.publish(hiseq);
} }
...@@ -138,7 +140,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -138,7 +140,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
buffer.get(stringByteArray); buffer.get(stringByteArray);
addToRegistry(mapId, new String(stringByteArray)); stringRegistry.putStringRecord(mapId, new String(stringByteArray));
checkWaitingMessages();
} else { } else {
buffer.position(buffer.position() - 9); buffer.position(buffer.position() - 9);
buffer.compact(); buffer.compact();
...@@ -156,9 +160,41 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -156,9 +160,41 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
} }
break; break;
} }
case Trace.CLAZZ_ID: {
if (buffer.remaining() >= 9) {
final byte validByte = buffer.get();
boolean valid = true;
if (validByte == (byte) 0) {
valid = false;
}
final int eventsLength = buffer.getInt();
final int byteLength = buffer.getInt();
if (buffer.remaining() >= byteLength) {
final List<AbstractOperationEventRecord> events = new ArrayList<AbstractOperationEventRecord>(
eventsLength);
for (int i = 0; i < eventsLength; i++) {
final AbstractOperationEventRecord eventRecord = AbstractOperationEventRecord
.createFromByteBuffer(buffer, stringRegistry);
events.add(eventRecord);
}
putInRingBuffer(new Trace(events, valid));
} else {
buffer.position(buffer.position() - 10);
buffer.compact();
return;
}
} else {
buffer.position(buffer.position() - 1);
buffer.compact();
return;
}
break;
}
default: { default: {
System.out.println("unknown class id " + clazzId + " at offset " System.out.println("unknown class id " + clazzId + " at offset "
+ (buffer.position() - 4)); + (buffer.position() - 1));
buffer.clear(); buffer.clear();
return; return;
} }
...@@ -172,8 +208,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -172,8 +208,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int hostnameId = buffer.getInt(); final int hostnameId = buffer.getInt();
final int applicationId = buffer.getInt(); final int applicationId = buffer.getInt();
final String hostname = getStringFromRegistry(hostnameId); final String hostname = stringRegistry.getStringFromId(hostnameId);
final String application = getStringFromRegistry(applicationId); final String application = stringRegistry.getStringFromId(applicationId);
if ((hostname != null) && (application != null)) { if ((hostname != null) && (application != null)) {
hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application); hostApplicationMetadata = new HostApplicationMetaDataRecord(hostname, application);
...@@ -192,7 +228,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -192,7 +228,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int orderIndex = buffer.getInt(); final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt(); final int operationId = buffer.getInt();
final String operation = getStringFromRegistry(operationId); final String operation = stringRegistry.getStringFromId(operationId);
if (operation != null) { if (operation != null) {
putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex,
...@@ -213,8 +249,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -213,8 +249,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int operationId = buffer.getInt(); final int operationId = buffer.getInt();
final int causeId = buffer.getInt(); final int causeId = buffer.getInt();
final String operation = getStringFromRegistry(operationId); final String operation = stringRegistry.getStringFromId(operationId);
final String cause = getStringFromRegistry(causeId); final String cause = stringRegistry.getStringFromId(causeId);
if ((operation != null) && (cause != null)) { if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex,
...@@ -235,7 +271,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -235,7 +271,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final int orderIndex = buffer.getInt(); final int orderIndex = buffer.getInt();
final int operationId = buffer.getInt(); final int operationId = buffer.getInt();
final String operation = getStringFromRegistry(operationId); final String operation = stringRegistry.getStringFromId(operationId);
if (operation != null) { if (operation != null) {
putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp))); operation, hostApplicationMetadata, new RuntimeStatisticInformation(timestamp)));
...@@ -252,10 +288,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -252,10 +288,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final double cpuUtil = buffer.getDouble(); final double cpuUtil = buffer.getDouble();
final long usedRAM = buffer.getLong(); final long usedRAM = buffer.getLong();
final long absoluteRAM = buffer.getLong(); final long absoluteRAM = buffer.getLong();
System.out.println(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM,
hostApplicationMetadata).toString()); putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM,
// putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM, hostApplicationMetadata));
// absoluteRAM));
} }
private final void putInWaitingMessages(final byte[] message) { private final void putInWaitingMessages(final byte[] message) {
...@@ -292,7 +327,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -292,7 +327,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
} }
private final void putInRingBuffer(final IRecord message) { private final void putInRingBuffer(final IRecord message) {
counter.inputObjects(message); counter.inputRecord(message);
synchronized (this) { // TODO better solution synchronized (this) { // TODO better solution
outputBuffer[outputBufferIndex++] = message; outputBuffer[outputBufferIndex++] = message;
...@@ -307,8 +342,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -307,8 +342,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final long hiseq = ringBuffer.next(); final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
final IRecord[] oldValues = valueEvent.getValues(); final IRecord[] oldValues = valueEvent.getValues();
valueEvent.setValues(outputBuffer); valueEvent.setValues(outputBuffer);
valueEvent.setValueSize(outputBufferIndex);
ringBuffer.publish(hiseq); ringBuffer.publish(hiseq);
outputBuffer = oldValues; outputBuffer = oldValues;
...@@ -317,13 +352,4 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -317,13 +352,4 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
} }
} }
private final void addToRegistry(final int key, final String value) {
stringRegistry.put(key, value);
checkWaitingMessages();
}
private final String getStringFromRegistry(final int id) {
return stringRegistry.get(id);
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment