Skip to content
Snippets Groups Projects
Commit b25581f7 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

WiP

parent e88c7f79
No related branches found
No related tags found
No related merge requests found
...@@ -36,6 +36,7 @@ IRecordSender { ...@@ -36,6 +36,7 @@ IRecordSender {
private final ByteBuffer stringBuffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE); private final ByteBuffer stringBuffer = ByteBuffer.allocate(Constants.SENDING_BUFFER_SIZE);
private volatile boolean shouldDisconnect = false; private volatile boolean shouldDisconnect = false;
private volatile boolean finishedSendingStrings = false;
private final SinglePipeConnector<IRecord> tcpConnectorConnector; private final SinglePipeConnector<IRecord> tcpConnectorConnector;
...@@ -73,16 +74,19 @@ IRecordSender { ...@@ -73,16 +74,19 @@ IRecordSender {
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
} }
} }
finishedSendingStrings = false;
socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(), socketChannel = SocketChannel.open(new InetSocketAddress(getProviderURL().getHost(),
getProviderURL().getPort())); getProviderURL().getPort()));
stringRegistry.sendOutAllStringRegistryRecords(); stringRegistry.sendOutAllStringRegistryRecords();
finishedSendingStrings = true;
} }
@Override @Override
public void sendOutStringRecord(final StringRegistryRecord record) { public void sendOutStringRecord(final StringRegistryRecord record) {
record.putIntoByteBuffer(stringBuffer, stringRegistry, this); record.putIntoByteBuffer(stringBuffer, stringRegistry, this);
send(stringBuffer); prioritizedSend(stringBuffer);
} }
@Override @Override
...@@ -113,7 +117,8 @@ IRecordSender { ...@@ -113,7 +117,8 @@ IRecordSender {
@Override @Override
public void send(final ByteBuffer buffer) { public void send(final ByteBuffer buffer) {
while ((socketChannel == null) || (!socketChannel.isConnected())) { while ((socketChannel == null) || (!socketChannel.isConnected())
|| (!finishedSendingStrings)) {
try { try {
Thread.sleep(1); Thread.sleep(1);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
...@@ -137,6 +142,32 @@ IRecordSender { ...@@ -137,6 +142,32 @@ IRecordSender {
} }
} }
public void prioritizedSend(final ByteBuffer buffer) {
while ((socketChannel == null) || (!socketChannel.isConnected())) {
try {
Thread.sleep(1);
} catch (final InterruptedException e) {
}
}
try {
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
doDisconnectIfNessecary();
} catch (final IOException e) {
System.out
.println("WARNING: Connection was closed during String sending - possible data loss");
try {
socketChannel.close();
} catch (final IOException e1) {
}
} finally {
buffer.clear();
}
}
private void doDisconnectIfNessecary() { private void doDisconnectIfNessecary() {
if (shouldDisconnect) { if (shouldDisconnect) {
if ((socketChannel != null) && socketChannel.isConnected()) { if ((socketChannel != null) && socketChannel.isConnected()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment