Skip to content
Snippets Groups Projects
Commit d68fc103 authored by Florian Fittkau's avatar Florian Fittkau
Browse files

pipeline now working again

parent 96ea03b0
No related branches found
No related tags found
No related merge requests found
......@@ -46,8 +46,8 @@ public class TraceBuffer {
}
if (!events.add(event)) {
// System.out.println("Duplicate entry for orderIndex " + orderIndex
// + " with traceId " + traceMetadata.getTraceId());
System.out.println("Duplicate entry for orderIndex " + orderIndex
+ " with traceId " + event.getTraceId());
damaged = true;
}
}
......@@ -69,8 +69,6 @@ public class TraceBuffer {
public void setTrace(final HostApplicationMetadata trace) {
if (traceMetadata != null) {
// System.out.println("Duplicate Trace entry for traceId "
// + trace.getTraceId());
damaged = true;
return;
}
......@@ -86,7 +84,7 @@ public class TraceBuffer {
|| (openEvents != 0) || (traceMetadata == null) || damaged);
}
public final Trace toTrace() { // TODO still slow?
public final Trace toTrace() {
final AbstractOperationEvent[] arrayEvents = new AbstractOperationEvent[events
.size()];
final Iterator<AbstractOperationEvent> iterator = events.iterator();
......
......@@ -40,12 +40,12 @@ public final class TraceReconstructionFilter implements
RecordEvent.EVENT_FACTORY, 16384, exec);
final EventHandler<RecordEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TracePatternSummarizationFilter(2 * 1000 * 1000,
eventHandlers[0] = new TracePatternSummarizationFilter(1 * 1000,
endReceiver);
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
new TimeReader(2 * 1000, this).start();
new TimeReader(1 * 1000, this).start();
}
@Override
......@@ -67,14 +67,14 @@ public final class TraceReconstructionFilter implements
private void sendOutValidTrace(final Trace trace) {
counter.inputObjects(trace);
// putInRingBuffer(trace);
putInRingBuffer(trace);
}
private void sendOutInvalidTrace(final Trace trace) {
// counter.inputObjects(trace);
// putInRingBuffer(trace); // TODO
// System.out.println("Invalid trace: "
// + trace.getTraceMetadata().getTraceId());
System.out.println("Invalid trace: "
+ trace.getTraceEvents()[0].getTraceId());
}
private void putInRingBuffer(final IRecord record) {
......@@ -88,17 +88,12 @@ public final class TraceReconstructionFilter implements
public void onEvent(final RecordEvent event, final long sequence,
final boolean endOfBatch) throws Exception {
final IRecord record = event.getValue();
if (record instanceof HostApplicationMetadata) {
final HostApplicationMetadata traceMetadata = ((HostApplicationMetadata) record);
// final long traceId = traceMetadata.getTraceId();
// final TraceBuffer traceBuffer = getBufferForTraceId(traceId);
// traceBuffer.setTrace(traceMetadata);
} else if (record instanceof AbstractOperationEvent) {
if (record instanceof AbstractOperationEvent) {
final AbstractOperationEvent abstractOperationEvent = ((AbstractOperationEvent) record);
final long traceId = abstractOperationEvent.getTraceId();
final TraceBuffer traceBuffer = getBufferForTraceId(traceId);
final TraceBuffer traceBuffer = getBufferForTraceId(traceId,
event.getMetadata());
traceBuffer.insertEvent(abstractOperationEvent);
if (traceBuffer.isFinished()) {
......@@ -108,10 +103,12 @@ public final class TraceReconstructionFilter implements
}
}
private TraceBuffer getBufferForTraceId(final long traceId) {
private TraceBuffer getBufferForTraceId(final long traceId,
final HostApplicationMetadata metadata) {
TraceBuffer traceBuffer = traceId2trace.get(traceId);
if (traceBuffer == null) {
traceBuffer = new TraceBuffer();
traceBuffer.setTrace(metadata);
traceId2trace.put(traceId, traceBuffer);
}
return traceBuffer;
......
......@@ -95,7 +95,7 @@ public class TracePatternSummarizationFilter implements
TraceAggregationBuffer traceAggregationBuffer = trace2buffer.get(trace);
if (traceAggregationBuffer == null) {
traceAggregationBuffer = new TraceAggregationBuffer(
System.nanoTime());
System.currentTimeMillis());
trace2buffer.put(trace, traceAggregationBuffer);
}
traceAggregationBuffer.insertTrace(trace);
......
......@@ -97,11 +97,11 @@ public final class TCPReader {
private void putInRingBuffer(final IRecord message) {
counter.inputObjects(message);
// System.out.println(message.toString());
// final long hiseq = ringBuffer.next();
// final RecordEvent valueEvent = ringBuffer.get(hiseq);
// valueEvent.setValue(message);
// ringBuffer.publish(hiseq);
final long hiseq = ringBuffer.next();
final RecordEvent valueEvent = ringBuffer.get(hiseq);
valueEvent.setValue(message);
valueEvent.setMetadata(hostApplicationMetadata);
ringBuffer.publish(hiseq);
}
public void terminate(final boolean error) {
......@@ -165,17 +165,16 @@ public final class TCPReader {
final String hostname = getStringFromRegistry(hostnameId);
final String application = getStringFromRegistry(applicationId);
// if ((hostname != null) && (application != null)) {
hostApplicationMetadata = new HostApplicationMetadata(hostname,
application);
// } else {
// final byte[] message = new
// byte[TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID, message, 0,
// TraceMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
if ((hostname != null) && (application != null)) {
hostApplicationMetadata = new HostApplicationMetadata(hostname,
application);
} else {
final byte[] message = new byte[HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position()
- HostApplicationMetadata.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
}
}
private final void readInBeforeOperationEvent(final ByteBuffer buffer) {
......@@ -186,17 +185,16 @@ public final class TCPReader {
final String operation = getStringFromRegistry(operationId);
// if (operation != null) {
putInRingBuffer(new BeforeOperationEvent(timestamp, traceId,
orderIndex, operation));
// } else {
// final byte[] message = new
// byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message,
// 0, BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
if (operation != null) {
putInRingBuffer(new BeforeOperationEvent(timestamp, traceId,
orderIndex, operation));
} else {
final byte[] message = new byte[BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position()
- BeforeOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
}
}
private final void readInAfterFailedOperationEvent(final ByteBuffer buffer) {
......@@ -209,18 +207,16 @@ public final class TCPReader {
final String operation = getStringFromRegistry(operationId);
final String cause = getStringFromRegistry(causeId);
// if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, operation, cause));
// } else {
// final byte[] message = new
// byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID,
// message, 0,
// AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
if ((operation != null) && (cause != null)) {
putInRingBuffer(new AfterFailedOperationEvent(timestamp, traceId,
orderIndex, operation, cause));
} else {
final byte[] message = new byte[AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position()
- AfterFailedOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
}
}
private final void readInAfterOperationEvent(final ByteBuffer buffer) {
......@@ -230,17 +226,16 @@ public final class TCPReader {
final int operationId = buffer.getInt();
final String operation = getStringFromRegistry(operationId);
// if (operation != null) {
putInRingBuffer(new AfterOperationEvent(timestamp, traceId, orderIndex,
operation));
// } else {
// final byte[] message = new
// byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
// System.arraycopy(b, offset
// - AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID, message,
// 0, AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
// putInWaitingMessages(message);
// }
if (operation != null) {
putInRingBuffer(new AfterOperationEvent(timestamp, traceId,
orderIndex, operation));
} else {
final byte[] message = new byte[AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID];
buffer.position(buffer.position()
- AfterOperationEvent.BYTE_LENGTH_WITH_CLAZZ_ID);
buffer.get(message);
putInWaitingMessages(message);
}
}
......@@ -255,26 +250,26 @@ public final class TCPReader {
}
waitingForStringMessages.clear();
// for (final byte[] waitingMessage : localWaitingList) {
// final int waitingMessageClazzId = UnsafeBits.getInt(waitingMessage,
// 0);
// switch (waitingMessageClazzId) {
// case TraceMetadata.CLAZZ_ID:
// readInTraceMetadata(waitingMessage, 4);
// break;
// case BeforeOperationEvent.CLAZZ_ID:
// readInBeforeOperationEvent(waitingMessage, 4);
// break;
// case AfterFailedOperationEvent.CLAZZ_ID:
// readInAfterFailedOperationEvent(waitingMessage, 4);
// break;
// case AfterOperationEvent.CLAZZ_ID:
// readInAfterOperationEvent(waitingMessage, 4);
// break;
// default:
// break;
// }
// }
for (final byte[] waitingMessage : localWaitingList) {
final ByteBuffer buffer = ByteBuffer.wrap(waitingMessage);
final byte waitingMessageClazzId = buffer.get();
switch (waitingMessageClazzId) {
case HostApplicationMetadata.CLAZZ_ID:
readInTraceMetadata(buffer);
break;
case BeforeOperationEvent.CLAZZ_ID:
readInBeforeOperationEvent(buffer);
break;
case AfterFailedOperationEvent.CLAZZ_ID:
readInAfterFailedOperationEvent(buffer);
break;
case AfterOperationEvent.CLAZZ_ID:
readInAfterOperationEvent(buffer);
break;
default:
break;
}
}
}
public void addToRegistry(final int key, final String value) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment