From fbf20247f0bc74e9c91f6c6c0d39326b8e9dfb49 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Tue, 20 Jan 2015 11:00:56 +0100
Subject: [PATCH] added TraceReductionAnalysisConfigurationTest

---
 .../TcpTraceReconstructionActor.java          |   6 +-
 .../TcpTraceReductionActor.java               | 106 ++++++++++++++++++
 .../TraceAggregationBuffer.java               |  36 ++++++
 .../tcptracereduction/TraceComperator.java    |  69 ++++++++++++
 .../TraceReductionAnalysisConfiguration.java  | 101 +++++++++++++++++
 ...constructionAnalysisConfigurationTest.java |  23 ++++
 ...aceReductionAnalysisConfigurationTest.java |   6 +-
 7 files changed, 342 insertions(+), 5 deletions(-)
 create mode 100644 src/main/java/kiekerdays/tcptracereduction/TcpTraceReductionActor.java
 create mode 100644 src/main/java/kiekerdays/tcptracereduction/TraceAggregationBuffer.java
 create mode 100644 src/main/java/kiekerdays/tcptracereduction/TraceComperator.java
 create mode 100644 src/main/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfiguration.java
 create mode 100644 src/test/java/kiekerdays/tcpreconstruction/TraceReconstructionAnalysisConfigurationTest.java
 rename src/test/java/kiekerdays/{tcpreconstruction => tcptracereduction}/TraceReductionAnalysisConfigurationTest.java (76%)

diff --git a/src/main/java/kiekerdays/tcpreconstruction/TcpTraceReconstructionActor.java b/src/main/java/kiekerdays/tcpreconstruction/TcpTraceReconstructionActor.java
index cc1b842..c3c0a37 100644
--- a/src/main/java/kiekerdays/tcpreconstruction/TcpTraceReconstructionActor.java
+++ b/src/main/java/kiekerdays/tcpreconstruction/TcpTraceReconstructionActor.java
@@ -33,7 +33,11 @@ public class TcpTraceReconstructionActor extends UntypedActor {
 		super();
 		this.traceId2trace = traceId2trace;
 		this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
-		this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
+		if (null != invalidTraceReceiverProps) {
+			this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
+		} else {
+			this.invalidTraceReceiver = validTraceReceiver;
+		}
 	}
 
 	@Override
diff --git a/src/main/java/kiekerdays/tcptracereduction/TcpTraceReductionActor.java b/src/main/java/kiekerdays/tcptracereduction/TcpTraceReductionActor.java
new file mode 100644
index 0000000..86f5298
--- /dev/null
+++ b/src/main/java/kiekerdays/tcptracereduction/TcpTraceReductionActor.java
@@ -0,0 +1,106 @@
+package kiekerdays.tcptracereduction;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import kieker.analysis.plugin.filter.flow.TraceEventRecords;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+
+public class TcpTraceReductionActor extends UntypedActor {
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);
+
+	private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
+	private ActorRef traceAggregationReceiver;
+
+	private long maxCollectionDurationInNs;
+
+	public TcpTraceReductionActor(final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer, Props traceAggregationProps) {
+		this.trace2buffer = trace2buffer;
+		// output "ports"
+		this.traceAggregationReceiver = context().actorOf(traceAggregationProps);
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof TraceEventRecords) {
+			TraceEventRecords traceEventRecords = (TraceEventRecords) message;
+			final long timestamp = System.nanoTime();
+			this.countSameTraces(traceEventRecords, timestamp);
+		} else if (message instanceof Long) {
+			Long timestampInNs = (Long) message;
+			if (timestampInNs != null) {
+				this.processTimeoutQueue(timestampInNs);
+			}
+		} else {
+			unhandled(message);
+		}
+	}
+
+	private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
+		synchronized (this.trace2buffer) {
+			TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
+			if (traceBuffer == null) {
+				traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
+				this.trace2buffer.put(traceEventRecords, traceBuffer);
+			}
+			traceBuffer.count();
+		}
+	}
+
+	private void processTimeoutQueue(final long timestampInNs) {
+		final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
+		synchronized (this.trace2buffer) {
+			for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
+				final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
+				// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
+				// + " (bufferTimeoutInNs)");
+				if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
+					final TraceEventRecords record = traceBuffer.getTraceEventRecords();
+					record.setCount(traceBuffer.getCount());
+					send(record);
+				}
+				iterator.remove();
+			}
+		}
+	}
+
+	private void send(final TraceEventRecords record) {
+		traceAggregationReceiver.tell(record, getSelf());
+	}
+
+	@Override
+	public void postStop() throws Exception {
+		LOGGER.info("stopped");
+		onTerminating();
+		super.postStop();
+	}
+
+	private void onTerminating() {
+		synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
+			for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
+				final TraceAggregationBuffer buffer = entry.getValue();
+				final TraceEventRecords record = buffer.getTraceEventRecords();
+				record.setCount(buffer.getCount());
+				send(record);
+			}
+			this.trace2buffer.clear();
+		}
+	}
+
+	public long getMaxCollectionDuration() {
+		return this.maxCollectionDurationInNs;
+	}
+
+	public void setMaxCollectionDuration(final long maxCollectionDuration) {
+		this.maxCollectionDurationInNs = maxCollectionDuration;
+	}
+
+}
diff --git a/src/main/java/kiekerdays/tcptracereduction/TraceAggregationBuffer.java b/src/main/java/kiekerdays/tcptracereduction/TraceAggregationBuffer.java
new file mode 100644
index 0000000..ed16e7a
--- /dev/null
+++ b/src/main/java/kiekerdays/tcptracereduction/TraceAggregationBuffer.java
@@ -0,0 +1,36 @@
+package kiekerdays.tcptracereduction;
+
+import kieker.analysis.plugin.filter.flow.TraceEventRecords;
+
+/**
+ * Buffer for similar traces that are to be aggregated into a single trace.
+ * 
+ * @author Jan Waller, Florian Biss
+ */
+public final class TraceAggregationBuffer {
+	private final long bufferCreatedTimestamp;
+	private final TraceEventRecords aggregatedTrace;
+
+	private int countOfAggregatedTraces;
+
+	public TraceAggregationBuffer(final long bufferCreatedTimestamp, final TraceEventRecords trace) {
+		this.bufferCreatedTimestamp = bufferCreatedTimestamp;
+		this.aggregatedTrace = trace;
+	}
+
+	public void count() {
+		this.countOfAggregatedTraces++;
+	}
+
+	public long getBufferCreatedTimestamp() {
+		return this.bufferCreatedTimestamp;
+	}
+
+	public TraceEventRecords getTraceEventRecords() {
+		return this.aggregatedTrace;
+	}
+
+	public int getCount() {
+		return this.countOfAggregatedTraces;
+	}
+}
diff --git a/src/main/java/kiekerdays/tcptracereduction/TraceComperator.java b/src/main/java/kiekerdays/tcptracereduction/TraceComperator.java
new file mode 100644
index 0000000..d539ee9
--- /dev/null
+++ b/src/main/java/kiekerdays/tcptracereduction/TraceComperator.java
@@ -0,0 +1,69 @@
+package kiekerdays.tcptracereduction;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import kieker.analysis.plugin.filter.flow.TraceEventRecords;
+import kieker.common.record.flow.trace.AbstractTraceEvent;
+import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
+import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
+
+/**
+ * @author Jan Waller, Florian Fittkau, Florian Biss
+ */
+public final class TraceComperator implements Comparator<TraceEventRecords>, Serializable {
+	private static final long serialVersionUID = 8920766818232517L;
+
+	/**
+	 * Creates a new instance of this class.
+	 */
+	public TraceComperator() {
+		// default empty constructor
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public int compare(final TraceEventRecords t1, final TraceEventRecords t2) {
+		final AbstractTraceEvent[] recordsT1 = t1.getTraceEvents();
+		final AbstractTraceEvent[] recordsT2 = t2.getTraceEvents();
+
+		if (recordsT1.length != recordsT2.length) {
+			return recordsT1.length - recordsT2.length;
+		}
+
+		final int cmpHostnames = t1.getTraceMetadata().getHostname()
+				.compareTo(t2.getTraceMetadata().getHostname());
+		if (cmpHostnames != 0) {
+			return cmpHostnames;
+		}
+
+		for (int i = 0; i < recordsT1.length; i++) {
+			final AbstractTraceEvent recordT1 = recordsT1[i];
+			final AbstractTraceEvent recordT2 = recordsT2[i];
+
+			final int cmpClass = recordT1.getClass().getName()
+					.compareTo(recordT2.getClass().getName());
+			if (cmpClass != 0) {
+				return cmpClass;
+			}
+			if (recordT1 instanceof AbstractOperationEvent) {
+				final int cmpSignature = ((AbstractOperationEvent) recordT1).getOperationSignature()
+						.compareTo(((AbstractOperationEvent) recordT2).getOperationSignature());
+				if (cmpSignature != 0) {
+					return cmpSignature;
+				}
+			}
+			if (recordT1 instanceof AfterOperationFailedEvent) {
+				final int cmpError = ((AfterOperationFailedEvent) recordT1).getCause().compareTo(
+						((AfterOperationFailedEvent) recordT2).getCause());
+				if (cmpError != 0) {
+					return cmpClass;
+				}
+			}
+		}
+		// All records match.
+		return 0;
+	}
+}
diff --git a/src/main/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfiguration.java b/src/main/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfiguration.java
new file mode 100644
index 0000000..2d86730
--- /dev/null
+++ b/src/main/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfiguration.java
@@ -0,0 +1,101 @@
+package kiekerdays.tcptracereduction;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import kieker.analysis.plugin.filter.flow.TraceEventRecords;
+import kiekerdays.CounterActor;
+import kiekerdays.tcpreader.TcpReaderActor;
+import kiekerdays.tcpreconstruction.TcpTraceReconstructionActor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
+import teetime.util.concurrent.hashmap.TraceBuffer;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+
+public class TraceReductionAnalysisConfiguration {
+
+	private static final String START_MESSAGE = "start";
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class);
+
+	private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
+	private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2Buffer;
+
+	private ActorRef tcpReaderActor;
+	private Collection<Object> validTraces = new LinkedList<>();
+
+	private ActorRef rootActor;
+
+	public TraceReductionAnalysisConfiguration() {
+		trace2Buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
+		init();
+	}
+
+	private void init() {
+		ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
+
+		// specify actors
+		Props counterProps = Props.create(CounterActor.class);
+		Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps);
+		Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, null);
+		Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);
+
+		// create actors
+		final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
+		// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
+		// final ActorRef tcpTraceReductionActor = system.actorOf(tcpTraceReductionProps, "TcpTraceReductionActor");
+		// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
+		// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
+		// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
+		// new Creator<TcpReaderActor>() {
+		// @Override
+		// public TcpReaderActor create() throws Exception {
+		// return new TcpReaderActor(tcpTraceReconstructonActor);
+		// }
+		// }));
+
+		// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
+		// Creator<TcpReaderActor>() {
+		// @Override
+		// public TcpReaderActor create() throws Exception {
+		// return new TcpReaderActor(tcpTraceReconstructonActor);
+		// }
+		// }));
+
+		this.tcpReaderActor = tcpReaderActor;
+
+		ActorSelection rootActorSelection = system.actorSelection(tcpReaderActor.path().root());
+		rootActor = rootActorSelection.anchor();
+
+		LOGGER.info("Configuration initialized.");
+	}
+
+	public void start() throws Exception {
+		LOGGER.info("Starting analysis...");
+		// tcpReader.onStarting();
+		// tcpReader.execute();
+		// tcpReader.onTerminating();
+		tcpReaderActor.tell(START_MESSAGE, ActorRef.noSender());
+		LOGGER.info("Analysis started.");
+
+		rootActor.tell(PoisonPill.getInstance(), rootActor);
+		// tcpReaderActor.parent().tell(PoisonPill.getInstance(), getSelf());
+		// Future<Terminated> future = system.terminate();
+		// Await.ready(future, Duration.Inf());
+		LOGGER.info("Analysis stopped.");
+	}
+
+	public Collection<Object> getValidTraces() {
+		return validTraces;
+	}
+
+}
diff --git a/src/test/java/kiekerdays/tcpreconstruction/TraceReconstructionAnalysisConfigurationTest.java b/src/test/java/kiekerdays/tcpreconstruction/TraceReconstructionAnalysisConfigurationTest.java
new file mode 100644
index 0000000..a809d9f
--- /dev/null
+++ b/src/test/java/kiekerdays/tcpreconstruction/TraceReconstructionAnalysisConfigurationTest.java
@@ -0,0 +1,23 @@
+package kiekerdays.tcpreconstruction;
+
+
+public class TraceReconstructionAnalysisConfigurationTest {
+
+	// @Test
+	public void testAnalysis() throws Exception {
+		TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration();
+
+		configuration.start();
+		// fail("Akka does not support being executed by JUnit.");
+	}
+
+	public static void main(String[] args) {
+		try {
+			TraceReconstructionAnalysisConfigurationTest test = new TraceReconstructionAnalysisConfigurationTest();
+			test.testAnalysis();
+		} catch (Exception e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+}
diff --git a/src/test/java/kiekerdays/tcpreconstruction/TraceReductionAnalysisConfigurationTest.java b/src/test/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfigurationTest.java
similarity index 76%
rename from src/test/java/kiekerdays/tcpreconstruction/TraceReductionAnalysisConfigurationTest.java
rename to src/test/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfigurationTest.java
index d8c8f9e..1dd04ed 100644
--- a/src/test/java/kiekerdays/tcpreconstruction/TraceReductionAnalysisConfigurationTest.java
+++ b/src/test/java/kiekerdays/tcptracereduction/TraceReductionAnalysisConfigurationTest.java
@@ -1,11 +1,9 @@
-package kiekerdays.tcpreconstruction;
-
+package kiekerdays.tcptracereduction;
 
 public class TraceReductionAnalysisConfigurationTest {
-
 	// @Test
 	public void testAnalysis() throws Exception {
-		TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration();
+		TraceReductionAnalysisConfiguration configuration = new TraceReductionAnalysisConfiguration();
 
 		configuration.start();
 		// fail("Akka does not support being executed by JUnit.");
-- 
GitLab