Skip to content
Snippets Groups Projects
Commit fbf20247 authored by Christian Wulf's avatar Christian Wulf
Browse files

added TraceReductionAnalysisConfigurationTest

parent 08cf885a
No related branches found
No related tags found
No related merge requests found
...@@ -33,7 +33,11 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -33,7 +33,11 @@ public class TcpTraceReconstructionActor extends UntypedActor {
super(); super();
this.traceId2trace = traceId2trace; this.traceId2trace = traceId2trace;
this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps); if (null != invalidTraceReceiverProps) {
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
} else {
this.invalidTraceReceiver = validTraceReceiver;
}
} }
@Override @Override
......
package kiekerdays.tcptracereduction;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class TcpTraceReductionActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReductionActor.class);
private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer;
private ActorRef traceAggregationReceiver;
private long maxCollectionDurationInNs;
public TcpTraceReductionActor(final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2buffer, Props traceAggregationProps) {
this.trace2buffer = trace2buffer;
// output "ports"
this.traceAggregationReceiver = context().actorOf(traceAggregationProps);
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof TraceEventRecords) {
TraceEventRecords traceEventRecords = (TraceEventRecords) message;
final long timestamp = System.nanoTime();
this.countSameTraces(traceEventRecords, timestamp);
} else if (message instanceof Long) {
Long timestampInNs = (Long) message;
if (timestampInNs != null) {
this.processTimeoutQueue(timestampInNs);
}
} else {
unhandled(message);
}
}
private void countSameTraces(final TraceEventRecords traceEventRecords, final long timestamp) {
synchronized (this.trace2buffer) {
TraceAggregationBuffer traceBuffer = this.trace2buffer.get(traceEventRecords);
if (traceBuffer == null) {
traceBuffer = new TraceAggregationBuffer(timestamp, traceEventRecords);
this.trace2buffer.put(traceEventRecords, traceBuffer);
}
traceBuffer.count();
}
}
private void processTimeoutQueue(final long timestampInNs) {
final long bufferTimeoutInNs = timestampInNs - this.maxCollectionDurationInNs;
synchronized (this.trace2buffer) {
for (final Iterator<Entry<TraceEventRecords, TraceAggregationBuffer>> iterator = this.trace2buffer.entrySet().iterator(); iterator.hasNext();) {
final TraceAggregationBuffer traceBuffer = iterator.next().getValue();
// this.logger.debug("traceBuffer.getBufferCreatedTimestamp(): " + traceBuffer.getBufferCreatedTimestamp() + " vs. " + bufferTimeoutInNs
// + " (bufferTimeoutInNs)");
if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeoutInNs) {
final TraceEventRecords record = traceBuffer.getTraceEventRecords();
record.setCount(traceBuffer.getCount());
send(record);
}
iterator.remove();
}
}
}
private void send(final TraceEventRecords record) {
traceAggregationReceiver.tell(record, getSelf());
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped");
onTerminating();
super.postStop();
}
private void onTerminating() {
synchronized (this.trace2buffer) { // BETTER hide and improve synchronization in the buffer
for (final Entry<TraceEventRecords, TraceAggregationBuffer> entry : this.trace2buffer.entrySet()) {
final TraceAggregationBuffer buffer = entry.getValue();
final TraceEventRecords record = buffer.getTraceEventRecords();
record.setCount(buffer.getCount());
send(record);
}
this.trace2buffer.clear();
}
}
public long getMaxCollectionDuration() {
return this.maxCollectionDurationInNs;
}
public void setMaxCollectionDuration(final long maxCollectionDuration) {
this.maxCollectionDurationInNs = maxCollectionDuration;
}
}
package kiekerdays.tcptracereduction;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
/**
* Buffer for similar traces that are to be aggregated into a single trace.
*
* @author Jan Waller, Florian Biss
*/
public final class TraceAggregationBuffer {
private final long bufferCreatedTimestamp;
private final TraceEventRecords aggregatedTrace;
private int countOfAggregatedTraces;
public TraceAggregationBuffer(final long bufferCreatedTimestamp, final TraceEventRecords trace) {
this.bufferCreatedTimestamp = bufferCreatedTimestamp;
this.aggregatedTrace = trace;
}
public void count() {
this.countOfAggregatedTraces++;
}
public long getBufferCreatedTimestamp() {
return this.bufferCreatedTimestamp;
}
public TraceEventRecords getTraceEventRecords() {
return this.aggregatedTrace;
}
public int getCount() {
return this.countOfAggregatedTraces;
}
}
package kiekerdays.tcptracereduction;
import java.io.Serializable;
import java.util.Comparator;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.operation.AbstractOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
/**
* @author Jan Waller, Florian Fittkau, Florian Biss
*/
public final class TraceComperator implements Comparator<TraceEventRecords>, Serializable {
private static final long serialVersionUID = 8920766818232517L;
/**
* Creates a new instance of this class.
*/
public TraceComperator() {
// default empty constructor
}
/**
* {@inheritDoc}
*/
@Override
public int compare(final TraceEventRecords t1, final TraceEventRecords t2) {
final AbstractTraceEvent[] recordsT1 = t1.getTraceEvents();
final AbstractTraceEvent[] recordsT2 = t2.getTraceEvents();
if (recordsT1.length != recordsT2.length) {
return recordsT1.length - recordsT2.length;
}
final int cmpHostnames = t1.getTraceMetadata().getHostname()
.compareTo(t2.getTraceMetadata().getHostname());
if (cmpHostnames != 0) {
return cmpHostnames;
}
for (int i = 0; i < recordsT1.length; i++) {
final AbstractTraceEvent recordT1 = recordsT1[i];
final AbstractTraceEvent recordT2 = recordsT2[i];
final int cmpClass = recordT1.getClass().getName()
.compareTo(recordT2.getClass().getName());
if (cmpClass != 0) {
return cmpClass;
}
if (recordT1 instanceof AbstractOperationEvent) {
final int cmpSignature = ((AbstractOperationEvent) recordT1).getOperationSignature()
.compareTo(((AbstractOperationEvent) recordT2).getOperationSignature());
if (cmpSignature != 0) {
return cmpSignature;
}
}
if (recordT1 instanceof AfterOperationFailedEvent) {
final int cmpError = ((AfterOperationFailedEvent) recordT1).getCause().compareTo(
((AfterOperationFailedEvent) recordT2).getCause());
if (cmpError != 0) {
return cmpClass;
}
}
}
// All records match.
return 0;
}
}
package kiekerdays.tcptracereduction;
import java.util.Collection;
import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.TreeMap;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kiekerdays.CounterActor;
import kiekerdays.tcpreader.TcpReaderActor;
import kiekerdays.tcpreconstruction.TcpTraceReconstructionActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
public class TraceReductionAnalysisConfiguration {
private static final String START_MESSAGE = "start";
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private final NavigableMap<TraceEventRecords, TraceAggregationBuffer> trace2Buffer;
private ActorRef tcpReaderActor;
private Collection<Object> validTraces = new LinkedList<>();
private ActorRef rootActor;
public TraceReductionAnalysisConfiguration() {
trace2Buffer = new TreeMap<TraceEventRecords, TraceAggregationBuffer>(new TraceComperator());
init();
}
private void init() {
ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
// specify actors
Props counterProps = Props.create(CounterActor.class);
Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, null);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
// final ActorRef tcpTraceReductionActor = system.actorOf(tcpTraceReductionProps, "TcpTraceReductionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
ActorSelection rootActorSelection = system.actorSelection(tcpReaderActor.path().root());
rootActor = rootActorSelection.anchor();
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(START_MESSAGE, ActorRef.noSender());
LOGGER.info("Analysis started.");
rootActor.tell(PoisonPill.getInstance(), rootActor);
// tcpReaderActor.parent().tell(PoisonPill.getInstance(), getSelf());
// Future<Terminated> future = system.terminate();
// Await.ready(future, Duration.Inf());
LOGGER.info("Analysis stopped.");
}
public Collection<Object> getValidTraces() {
return validTraces;
}
}
package kiekerdays.tcpreconstruction;
public class TraceReconstructionAnalysisConfigurationTest {
// @Test
public void testAnalysis() throws Exception {
TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration();
configuration.start();
// fail("Akka does not support being executed by JUnit.");
}
public static void main(String[] args) {
try {
TraceReconstructionAnalysisConfigurationTest test = new TraceReconstructionAnalysisConfigurationTest();
test.testAnalysis();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package kiekerdays.tcpreconstruction; package kiekerdays.tcptracereduction;
public class TraceReductionAnalysisConfigurationTest { public class TraceReductionAnalysisConfigurationTest {
// @Test // @Test
public void testAnalysis() throws Exception { public void testAnalysis() throws Exception {
TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration(); TraceReductionAnalysisConfiguration configuration = new TraceReductionAnalysisConfiguration();
configuration.start(); configuration.start();
// fail("Akka does not support being executed by JUnit."); // fail("Akka does not support being executed by JUnit.");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment