From 303de702783a001126389c597f7b2da3d2e169a3 Mon Sep 17 00:00:00 2001
From: Florian Fittkau <ffi@informatik.uni-kiel.de>
Date: Mon, 18 Nov 2013 17:38:39 +0100
Subject: [PATCH] ISerilizableRecords

---
 ...z.live_trace_processing.default.properties |  2 +
 .../connector/TCPConnector.java               | 94 ++++++++++---------
 .../TraceReconstructionFilter.java            |  6 +-
 .../TracePatternSummarizationFilter.java      |  7 +-
 .../main/WorkerStarter.java                   | 48 +++++++++-
 .../reader/TCPReader.java                     |  6 +-
 .../reader/TCPReaderOneClient.java            | 30 +++---
 7 files changed, 123 insertions(+), 70 deletions(-)

diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties
index 0135ba1..4ec30dc 100644
--- a/src/META-INF/explorviz.live_trace_processing.default.properties
+++ b/src/META-INF/explorviz.live_trace_processing.default.properties
@@ -1,3 +1,5 @@
+explorviz.live_trace_processing.worker_enabled=false
+
 explorviz.live_trace_processing.writer_target_ip=127.0.0.1
 explorviz.live_trace_processing.writer_target_port=10133
 
diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java
index baf9121..6830ec9 100644
--- a/src/explorviz/live_trace_processing/connector/TCPConnector.java
+++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java
@@ -7,10 +7,11 @@ import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
+import explorviz.live_trace_processing.Constants;
 import explorviz.live_trace_processing.configuration.Configuration;
 import explorviz.live_trace_processing.filter.AbstractSink;
 import explorviz.live_trace_processing.record.IRecord;
-import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
+import explorviz.live_trace_processing.record.ISerilizableRecord;
 import explorviz.live_trace_processing.writer.IWriter;
 
 public class TCPConnector extends AbstractSink implements IWriter {
@@ -18,12 +19,12 @@ public class TCPConnector extends AbstractSink implements IWriter {
 
 	private SocketChannel socketChannel;
 
-	private final Configuration configuration;
+	private final ByteBuffer buffer = ByteBuffer
+			.allocateDirect(Constants.MONITORING_MESSAGE_BUFFER_SIZE);
 
-	private ByteBuffer byteBuffer;
+	private volatile boolean shouldDisconnect = false;
 
 	public TCPConnector(final String hostname, final int port, final Configuration configuration) {
-		this.configuration = configuration;
 		try {
 			setProviderURL(new URL("http://" + hostname + ":" + port));
 		} catch (final MalformedURLException e) {
@@ -31,11 +32,6 @@ public class TCPConnector extends AbstractSink implements IWriter {
 		}
 	}
 
-	protected TCPConnector(final Configuration configuration) {
-		this.configuration = configuration;
-		providerURL = null;
-	}
-
 	@Override
 	public URL getProviderURL() {
 		return providerURL;
@@ -46,59 +42,73 @@ public class TCPConnector extends AbstractSink implements IWriter {
 		this.providerURL = providerURL;
 	}
 
-	public void init() {
-		try {
-			connect();
-		} catch (final IOException e) {
-			e.printStackTrace();
-		}
-	}
-
 	@Override
 	public void connect() throws IOException {
+		while (shouldDisconnect) {
+			try {
+				Thread.sleep(1);
+			} catch (final InterruptedException e) {
+			}
+		}
+
 		socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
 				getProviderURL().getPort()));
-
-		if (socketChannel.isConnected()) {
-			// socketChannel.write(StringRegistry.getAllStringRegistryRecords());
-		}
+		// StringRegistry.sendOutAllStringRegistryRecords(); TODO
 	}
 
 	@Override
-	protected void processRecord(final IRecord record,
-			final HostApplicationMetaDataRecord hostApplicationMetaData) {
-		if (record.getRecordSizeInBytes() > byteBuffer.remaining()) {
-			send(byteBuffer);
+	protected void processRecord(final IRecord record) {
+		if (record instanceof ISerilizableRecord) {
+			final ISerilizableRecord serilizableRecord = (ISerilizableRecord) record;
+			if (buffer.remaining() < serilizableRecord.getRecordSizeInBytes()) {
+				buffer.flip();
+				send(buffer);
+				buffer.clear();
+			}
+			serilizableRecord.putIntoByteBuffer(buffer, null); // TODO
 		}
-		record.putIntoByteBuffer(byteBuffer);
 	}
 
 	private void send(final ByteBuffer buffer) {
-		if (socketChannel.isConnected()) {
+		while ((socketChannel == null) || (!socketChannel.isConnected())) {
 			try {
-				while (buffer.hasRemaining()) {
-					socketChannel.write(buffer);
-				}
-			} catch (final IOException e) {
-				System.out.println("WARNING: Connection was closed");
-				// TODO reconnect
-				disconnect();
+				Thread.sleep(1);
+			} catch (final InterruptedException e) {
 			}
 		}
-	}
 
-	@Override
-	public final void disconnect() {
-		if (socketChannel.isConnected()) {
+		try {
+			while (buffer.hasRemaining()) {
+				socketChannel.write(buffer);
+			}
+			doDisconnectIfNessecary();
+		} catch (final IOException e) {
+			System.out.println("WARNING: Connection was closed - possible data loss");
+			// TODO reconnect in non-load-balancing mode
 			try {
 				socketChannel.close();
-			} catch (final IOException e) {
-				e.printStackTrace();
+			} catch (final IOException e1) {
 			}
 		}
 	}
 
-	public void cleanup() {
-		disconnect();
+	private void doDisconnectIfNessecary() {
+		if (shouldDisconnect) {
+			if ((socketChannel != null) && socketChannel.isConnected()) {
+				try {
+					socketChannel.close();
+				} catch (final IOException e) {
+					e.printStackTrace();
+				}
+			}
+			shouldDisconnect = false;
+		}
+	}
+
+	@Override
+	public final void disconnect() {
+		if (socketChannel != null) {
+			shouldDisconnect = true;
+		}
 	}
 }
diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java
index 1f5319b..5cfd682 100644
--- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java
+++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java
@@ -35,14 +35,14 @@ public final class TraceReconstructionFilter extends AbstractFilter {
 	}
 
 	@Override
-	public void processRecord(final IRecord record,
-			final HostApplicationMetaDataRecord hostApplicationMetaData) {
+	public void processRecord(final IRecord record) {
 		if (record instanceof AbstractOperationEventRecord) {
 			final AbstractOperationEventRecord abstractOperationEvent = ((AbstractOperationEventRecord) record);
 
 			final long traceId = abstractOperationEvent.getTraceId();
 			final TraceReconstructionBuffer traceBuffer = getBufferForTraceId(
-					abstractOperationEvent.getTraceId(), hostApplicationMetaData);
+					abstractOperationEvent.getTraceId(),
+					abstractOperationEvent.getHostApplicationMetadata());
 			traceBuffer.insertEvent(abstractOperationEvent);
 
 			if (traceBuffer.isFinished()) {
diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java
index 9fd22e1..ec81a00 100644
--- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java
+++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java
@@ -9,7 +9,6 @@ import explorviz.live_trace_processing.filter.AbstractFilter;
 import explorviz.live_trace_processing.filter.IPipeReceiver;
 import explorviz.live_trace_processing.reader.TimeProvider;
 import explorviz.live_trace_processing.record.IRecord;
-import explorviz.live_trace_processing.record.misc.HostApplicationMetaDataRecord;
 import explorviz.live_trace_processing.record.misc.TerminateRecord;
 import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
 import explorviz.live_trace_processing.record.trace.Trace;
@@ -32,8 +31,7 @@ public class TracePatternSummarizationFilter extends AbstractFilter {
 	}
 
 	@Override
-	public void processRecord(final IRecord record,
-			final HostApplicationMetaDataRecord hostApplicationMetaData) {
+	public void processRecord(final IRecord record) {
 		if (record instanceof Trace) {
 			final Trace trace = (Trace) record;
 			if (trace.isValid()) {
@@ -54,7 +52,8 @@ public class TracePatternSummarizationFilter extends AbstractFilter {
 	private void insertIntoBuffer(final Trace trace) {
 		TracePatternSummarizationBuffer traceAggregationBuffer = trace2buffer.get(trace);
 		if (traceAggregationBuffer == null) {
-			traceAggregationBuffer = new TracePatternSummarizationBuffer(TimeProvider.getCurrentTimestamp());
+			traceAggregationBuffer = new TracePatternSummarizationBuffer(
+					TimeProvider.getCurrentTimestamp());
 			trace2buffer.put(trace, traceAggregationBuffer);
 		}
 		traceAggregationBuffer.insertTrace(trace);
diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java
index bcf21fd..7ee3f15 100644
--- a/src/explorviz/live_trace_processing/main/WorkerStarter.java
+++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java
@@ -1,11 +1,55 @@
 package explorviz.live_trace_processing.main;
 
+import java.io.IOException;
+
+import explorviz.live_trace_processing.configuration.Configuration;
+import explorviz.live_trace_processing.configuration.ConfigurationFactory;
+import explorviz.live_trace_processing.connector.TCPConnector;
 import explorviz.live_trace_processing.reader.TCPReader;
+import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
 
 public class WorkerStarter {
 
 	public static void main(final String[] args) {
-		final TCPReader tcpReader = new TCPReader(10133, null);
-		tcpReader.read();
+		final Configuration configuration = ConfigurationFactory.createSingletonConfiguration();
+
+		final boolean worker = configuration
+				.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
+
+		if (worker) {
+			final TCPConnector tcpConnector = new TCPConnector(
+					configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
+					configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT),
+					configuration);
+
+			configureLoadBalancerIfEnabled(configuration, tcpConnector);
+
+			new TCPReader(10133, tcpConnector).read();
+		} else { // testing purpose
+			new TCPReader(10133, null).read();
+		}
+
+	}
+
+	private static void configureLoadBalancerIfEnabled(final Configuration configuration,
+			final TCPConnector tcpConnector) {
+		final boolean loadBalancerEnabled = configuration
+				.getBooleanProperty(ConfigurationFactory.LOAD_BALANCER_ENABLED);
+
+		if (loadBalancerEnabled) {
+			new LoadBalancer(
+					configuration.getStringProperty(ConfigurationFactory.LOAD_BALANCER_IP),
+					configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_PORT),
+					configuration.getIntProperty(ConfigurationFactory.LOAD_BALANCER_WAIT_TIME),
+					configuration
+							.getStringProperty(ConfigurationFactory.LOAD_BALANCER_SCALING_GROUP),
+					configuration, tcpConnector);
+		} else {
+			try {
+				tcpConnector.connect();
+			} catch (final IOException e) {
+				e.printStackTrace();
+			}
+		}
 	}
 }
diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java
index b8b2329..a6cb83d 100644
--- a/src/explorviz/live_trace_processing/reader/TCPReader.java
+++ b/src/explorviz/live_trace_processing/reader/TCPReader.java
@@ -74,8 +74,8 @@ public final class TCPReader {
 	public final void terminate(final boolean error) {
 		System.out.println("Shutdown of TCPReader requested.");
 		active = false;
-		for (final TCPReaderOneClient thread : threads) {
-			thread.terminate();
-		}
+		// for (final TCPReaderOneClient thread : threads) {
+		// thread.terminate();
+		// }
 	}
 }
diff --git a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java
index f398afc..dade9e4 100644
--- a/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java
+++ b/src/explorviz/live_trace_processing/reader/TCPReaderOneClient.java
@@ -62,7 +62,9 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 
 	@Override
 	public void periodicTimeSignal(final long timestamp) {
-		// TODO flush out buffer!
+		synchronized (this) { // TODO better solution
+			flushOutputBuffer();
+		}
 
 		final long hiseq = ringBuffer.next();
 		final RecordArrayEvent valueEvent = ringBuffer.get(hiseq);
@@ -70,8 +72,6 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 		buffer[0] = new TimedPeriodRecord();
 
 		valueEvent.setValues(buffer);
-		valueEvent.setValuesLength(1);
-		valueEvent.setMetadata(hostApplicationMetadata);
 
 		ringBuffer.publish(hiseq);
 	}
@@ -195,7 +195,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 
 		if (operation != null) {
 			putInRingBuffer(new BeforeOperationEventRecord(timestamp, traceId, orderIndex,
-					operation));
+					operation, hostApplicationMetadata));
 		} else {
 			final byte[] message = new byte[BeforeOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
 			buffer.position(buffer.position()
@@ -217,7 +217,7 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 
 		if ((operation != null) && (cause != null)) {
 			putInRingBuffer(new AfterFailedOperationEventRecord(timestamp, traceId, orderIndex,
-					operation, cause));
+					operation, cause, hostApplicationMetadata));
 		} else {
 			final byte[] message = new byte[AfterFailedOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
 			buffer.position(buffer.position()
@@ -235,7 +235,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 
 		final String operation = getStringFromRegistry(operationId);
 		if (operation != null) {
-			putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex, operation));
+			putInRingBuffer(new AfterOperationEventRecord(timestamp, traceId, orderIndex,
+					operation, hostApplicationMetadata));
 		} else {
 			final byte[] message = new byte[AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID];
 			buffer.position(buffer.position() - AfterOperationEventRecord.BYTE_LENGTH_WITH_CLAZZ_ID);
@@ -249,7 +250,8 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 		final double cpuUtil = buffer.getDouble();
 		final long usedRAM = buffer.getLong();
 		final long absoluteRAM = buffer.getLong();
-		System.out.println(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM).toString());
+		System.out.println(new SystemMonitoringRecord(cpuUtil, usedRAM, absoluteRAM,
+				hostApplicationMetadata).toString());
 		// putInRingBuffer(new SystemMonitoringRecord(cpuUtil, usedRAM,
 		// absoluteRAM));
 	}
@@ -290,9 +292,11 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 	private final void putInRingBuffer(final IRecord message) {
 		counter.inputObjects(message);
 
-		outputBuffer[outputBufferIndex++] = message;
-		if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) {
-			flushOutputBuffer();
+		synchronized (this) { // TODO better solution
+			outputBuffer[outputBufferIndex++] = message;
+			if (outputBufferIndex == TCPReader.OUTPUT_MESSAGE_BUFFER_SIZE) {
+				flushOutputBuffer();
+			}
 		}
 	}
 
@@ -303,8 +307,6 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 			final IRecord[] oldValues = valueEvent.getValues();
 
 			valueEvent.setValues(outputBuffer);
-			valueEvent.setValuesLength(outputBufferIndex);
-			valueEvent.setMetadata(hostApplicationMetadata);
 			ringBuffer.publish(hiseq);
 
 			outputBuffer = oldValues;
@@ -322,8 +324,4 @@ public class TCPReaderOneClient extends Thread implements IPeriodicTimeSignalRec
 	private final String getStringFromRegistry(final int id) {
 		return stringRegistry.get(id);
 	}
-
-	public void terminate() {
-		// TODO
-	}
 }
-- 
GitLab