Skip to content
Snippets Groups Projects
Commit 303de702 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

ISerilizableRecords

parent fb92d7b1
No related branches found
No related tags found
No related merge requests found
explorviz.live_trace_processing.worker_enabled=false
explorviz.live_trace_processing.writer_target_ip=127.0.0.1 explorviz.live_trace_processing.writer_target_ip=127.0.0.1
explorviz.live_trace_processing.writer_target_port=10133 explorviz.live_trace_processing.writer_target_port=10133
......
...@@ -7,10 +7,11 @@ import java.net.URL; ...@@ -7,10 +7,11 @@ import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.filter.AbstractSink; import explorviz.live_trace_processing.filter.AbstractSink;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord; import explorviz.live_trace_processing.record.ISerilizableRecord;
import explorviz.live_trace_processing.writer.IWriter; import explorviz.live_trace_processing.writer.IWriter;
public class TCPConnector extends AbstractSink implements IWriter { public class TCPConnector extends AbstractSink implements IWriter {
...@@ -18,12 +19,12 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -18,12 +19,12 @@ public class TCPConnector extends AbstractSink implements IWriter {
private SocketChannel socketChannel; private SocketChannel socketChannel;
private final Configuration configuration; private final ByteBuffer buffer = ByteBuffer
.allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE);
private ByteBuffer byteBuffer; private volatile boolean shouldDisconnect = false;
public TCPConnector(final String hostname, final int port, final Configuration configuration) { public TCPConnector(final String hostname, final int port, final Configuration configuration) {
this.configuration = configuration;
try { try {
setProviderURL(new URL("http://" + hostname + ":" + port)); setProviderURL(new URL("http://" + hostname + ":" + port));
} catch (final MalformedURLException e) { } catch (final MalformedURLException e) {
...@@ -31,11 +32,6 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -31,11 +32,6 @@ public class TCPConnector extends AbstractSink implements IWriter {
} }
} }
protected TCPConnector(final Configuration configuration) {
this.configuration = configuration;
providerURL = null;
}
@Override @Override
public URL getProviderURL() { public URL getProviderURL() {
return providerURL; return providerURL;
...@@ -46,59 +42,73 @@ public class TCPConnector extends AbstractSink implements IWriter { ...@@ -46,59 +42,73 @@ public class TCPConnector extends AbstractSink implements IWriter {
this.providerURL = providerURL; this.providerURL = providerURL;
} }
public void init() {
try {
connect();
} catch (final IOException e) {
e.printStackTrace();
}
}
@Override @Override
public void connect() throws IOException { public void connect() throws IOException {
while (shouldDisconnect) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
}
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(), socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
getProviderURL().getPort())); getProviderURL().getPort()));
// StringRegistry.sendOutAllStringRegistryRecords(); TODO
if (socketChannel.isConnected()) {
// socketChannel.write(StringRegistry.getAllStringRegistryRecords());
}
} }
@Override @Override
protected void processRecord(final IRecord record, protected void processRecord(final IRecord record) {
final HostApplicationMetaDataRecord hostApplicationMetaData) { if (record instanceof ISerilizableRecord) {
if (record.getRecordSizeInBytes() > byteBuffer.remaining()) { final ISerilizableRecord serilizableRecord = (ISerilizableRecord) record;
send(byteBuffer); if (buffer.remaining() < serilizableRecord.getRecordSizeInBytes()) {
buffer.flip();
send(buffer);
buffer.clear();
}
serilizableRecord.putIntoByteBuffer(buffer, null); // TODO
} }
record.putIntoByteBuffer(byteBuffer);
} }
private void send(final ByteBuffer buffer) { private void send(final ByteBuffer buffer) {
if (socketChannel.isConnected()) { while ((socketChannel == null) || (!socketChannel.isConnected())) {
try { try {
while (buffer.hasRemaining()) { Thread.sleep(1);
socketChannel.write(buffer); } catch (final InterruptedException e) {
}
} catch (final IOException e) {
System.out.println("WARNING: Connection was closed");
// TODO reconnect
disconnect();
} }
} }
}
@Override try {
public final void disconnect() { while (buffer.hasRemaining()) {
if (socketChannel.isConnected()) { socketChannel.write(buffer);
}
doDisconnectIfNessecary();
} catch (final IOException e) {
System.out.println("WARNING: Connection was closed - possible data loss");
// TODO reconnect in non-load-balancing mode
try { try {
socketChannel.close(); socketChannel.close();
} catch (final IOException e) { } catch (final IOException e1) {
e.printStackTrace();
} }
} }
} }
public void cleanup() { private void doDisconnectIfNessecary() {
disconnect(); if (shouldDisconnect) {
if ((socketChannel != null) && socketChannel.isConnected()) {
try {
socketChannel.close();
} catch (final IOException e) {
e.printStackTrace();
}
}
shouldDisconnect = false;
}
}
@Override
public final void disconnect() {
if (socketChannel != null) {
shouldDisconnect = true;
}
} }
} }
...@@ -35,14 +35,14 @@ public final class TraceReconstructionFilter extends AbstractFilter { ...@@ -35,14 +35,14 @@ public final class TraceReconstructionFilter extends AbstractFilter {
} }
@Override @Override
public void processRecord(final IRecord record, public void processRecord(final IRecord record) {
final HostApplicationMetaDataRecord hostApplicationMetaData) {
if (record instanceof AbstractOperationEventRecord) { if (record instanceof AbstractOperationEventRecord) {
final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record); final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record);
final long traceId = abstractOperationEvent.getTraceId(); final long traceId = abstractOperationEvent.getTraceId();
final TraceReconstructionBuffer traceBuffer = getBufferForTraceId( final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(
abstractOperationEvent.getTraceId(), hostApplicationMetaData); abstractOperationEvent.getTraceId(),
abstractOperationEvent.getHostApplicationMetadata());
traceBuffer.insertEvent(abstractOperationEvent); traceBuffer.insertEvent(abstractOperationEvent);
if (traceBuffer.isFinished()) { if (traceBuffer.isFinished()) {
......
...@@ -9,7 +9,6 @@ import explorviz.live_trace_processing.filter.AbstractFilter; ...@@ -9,7 +9,6 @@ import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.Trace;
...@@ -32,8 +31,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter { ...@@ -32,8 +31,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter {
} }
@Override @Override
public void processRecord(final IRecord record, public void processRecord(final IRecord record) {
final HostApplicationMetaDataRecord hostApplicationMetaData) {
if (record instanceof Trace) { if (record instanceof Trace) {
final Trace trace = (Trace) record; final Trace trace = (Trace) record;
if (trace.isValid()) { if (trace.isValid()) {
...@@ -54,7 +52,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter { ...@@ -54,7 +52,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter {
private void insertIntoBuffer(final Trace trace) { private void insertIntoBuffer(final Trace trace) {
TracePatternSummarizationBuffer traceAggregationBuffer = trace2buffer.get(trace); TracePatternSummarizationBuffer traceAggregationBuffer = trace2buffer.get(trace);
if (traceAggregationBuffer == null) { if (traceAggregationBuffer == null) {
traceAggregationBuffer = new TracePatternSummarizationBuffer(TimeProvider.getCurrentTimestamp()); traceAggregationBuffer = new TracePatternSummarizationBuffer(
TimeProvider.getCurrentTimestamp());
trace2buffer.put(trace, traceAggregationBuffer); trace2buffer.put(trace, traceAggregationBuffer);
} }
traceAggregationBuffer.insertTrace(trace); traceAggregationBuffer.insertTrace(trace);
......
package explorviz.live_trace_processing.main; package explorviz.live_trace_processing.main;
import java.io.IOException;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector;
import explorviz.live_trace_processing.reader.TCPReader; import explorviz.live_trace_processing.reader.TCPReader;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class WorkerStarter { public class WorkerStarter {
public static void main(final String[] args) { public static void main(final String[] args) {
final TCPReader tcpReader = new TCPReader(10133, null); final Configuration configuration = ConfigurationFactory.createSingletonConfiguration();
tcpReader.read();
final boolean worker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
if (worker) {
final TCPConnector tcpConnector = new TCPConnector(
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT),
configuration);
configureLoadBalancerIfEnabled(configuration, tcpConnector);
new TCPReader(10133, tcpConnector).read();
} else { // testing purpose
new TCPReader(10133, null).read();
}
}
private static void configureLoadBalancerIfEnabled(final Configuration configuration,
final TCPConnector tcpConnector) {
final boolean loadBalancerEnabled = configuration
.getBooleanProperty(ConfigurationFactory.LOAD_BALANCER_ENABLED);
if (loadBalancerEnabled) {
new LoadBalancer(
configuration.getStringProperty(ConfigurationFactory.LOAD_BALANCER_IP),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT),
configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME),
configuration
.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
configuration, tcpConnector);
} else {
try {
tcpConnector.connect();
} catch (final IOException e) {
e.printStackTrace();
}
}
} }
} }
...@@ -74,8 +74,8 @@ public final class TCPReader { ...@@ -74,8 +74,8 @@ public final class TCPReader {
public final void terminate(final boolean error) { public final void terminate(final boolean error) {
System.out.println("Shutdown of TCPReader requested."); System.out.println("Shutdown of TCPReader requested.");
active = false; active = false;
for (final TCPReaderOneClient thread : threads) { // for (final TCPReaderOneClient thread : threads) {
thread.terminate(); // thread.terminate();
} // }
} }
} }
...@@ -62,7 +62,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -62,7 +62,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
@Override @Override
public void periodicTimeSignal(final long timestamp) { public void periodicTimeSignal(final long timestamp) {
// TODO flush out buffer! synchronized (this) { // TODO better solution
flushOutputBuffer();
}
final long hiseq = ringBuffer.next(); final long hiseq = ringBuffer.next();
final RecordArrayEvent valueEvent = ringBuffer.get(hiseq); final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
...@@ -70,8 +72,6 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -70,8 +72,6 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
buffer[0] = new TimedPeriodRecord(); buffer[0] = new TimedPeriodRecord();
valueEvent.setValues(buffer); valueEvent.setValues(buffer);
valueEvent.setValuesLength(1);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq); ringBuffer.publish(hiseq);
} }
...@@ -195,7 +195,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -195,7 +195,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
if (operation != null) { if (operation != null) {
putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex,
operation)); operation, hostApplicationMetadata));
} else { } else {
final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position() buffer.position(buffer.position()
...@@ -217,7 +217,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -217,7 +217,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
if ((operation != null) && (cause != null)) { if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex, putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex,
operation, cause)); operation, cause, hostApplicationMetadata));
} else { } else {
final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position() buffer.position(buffer.position()
...@@ -235,7 +235,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -235,7 +235,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final String operation = getStringFromRegistry(operationId); final String operation = getStringFromRegistry(operationId);
if (operation != null) { if (operation != null) {
putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, operation)); putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
operation, hostApplicationMetadata));
} else { } else {
final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID]; final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID); buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
...@@ -249,7 +250,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -249,7 +250,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final double cpuUtil = buffer.getDouble(); final double cpuUtil = buffer.getDouble();
final long usedRAM = buffer.getLong(); final long usedRAM = buffer.getLong();
final long absoluteRAM = buffer.getLong(); final long absoluteRAM = buffer.getLong();
System.out.println(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM).toString()); System.out.println(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM,
hostApplicationMetadata).toString());
// putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM, // putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM,
// absoluteRAM)); // absoluteRAM));
} }
...@@ -290,9 +292,11 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -290,9 +292,11 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
private final void putInRingBuffer(final IRecord message) { private final void putInRingBuffer(final IRecord message) {
counter.inputObjects(message); counter.inputObjects(message);
outputBuffer[outputBufferIndex++] = message; synchronized (this) { // TODO better solution
if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) { outputBuffer[outputBufferIndex++] = message;
flushOutputBuffer(); if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) {
flushOutputBuffer();
}
} }
} }
...@@ -303,8 +307,6 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -303,8 +307,6 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
final IRecord[] oldValues = valueEvent.getValues(); final IRecord[] oldValues = valueEvent.getValues();
valueEvent.setValues(outputBuffer); valueEvent.setValues(outputBuffer);
valueEvent.setValuesLength(outputBufferIndex);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq); ringBuffer.publish(hiseq);
outputBuffer = oldValues; outputBuffer = oldValues;
...@@ -322,8 +324,4 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec ...@@ -322,8 +324,4 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
private final String getStringFromRegistry(final int id) { private final String getStringFromRegistry(final int id) {
return stringRegistry.get(id); return stringRegistry.get(id);
} }
public void terminate() {
// TODO
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment