Skip to content
Snippets Groups Projects
Commit 5bd9c449 authored by Christian Wulf's avatar Christian Wulf
Browse files

restructuring

parent 88ec7509
No related branches found
No related tags found
No related merge requests found
Showing
with 25 additions and 377 deletions
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<projectDescription> <projectDescription>
<name>Akka-Performancetest</name> <name>akka-stages</name>
<comment></comment> <comment></comment>
<projects> <projects>
</projects> </projects>
......
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>Akka-Performancetest</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>Akka-Performancetest</artifactId> <artifactId>akka-stages</artifactId>
<version>0.0.1-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<dependencies> <dependencies>
<dependency> <dependency>
......
package experiment.fse15; package akka.common;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
...@@ -8,7 +8,7 @@ import org.slf4j.LoggerFactory; ...@@ -8,7 +8,7 @@ import org.slf4j.LoggerFactory;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.WatchActor.WatchMe; import akka.common.WatchActor.WatchMe;
public class CollectorActor extends UntypedActor { public class CollectorActor extends UntypedActor {
......
package experiment.fse15; package akka.common;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.WatchActor.WatchMe; import akka.common.WatchActor.WatchMe;
public class CounterActor extends UntypedActor { public class CounterActor extends UntypedActor {
......
package experiment.fse15; package akka.common;
import java.io.Serializable; import java.io.Serializable;
......
package experiment.fse15; package akka.common;
import java.io.Serializable; import java.io.Serializable;
......
package experiment.fse15; package akka.common;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Terminated; import akka.actor.Terminated;
......
package experiment.fse15.tcpreader; package akka.io.net;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
...@@ -17,13 +17,13 @@ import akka.actor.ActorSelection; ...@@ -17,13 +17,13 @@ import akka.actor.ActorSelection;
import akka.actor.PoisonPill; import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.StartMessage; import akka.common.StartMessage;
import experiment.fse15.StopMessage; import akka.common.StopMessage;
import experiment.fse15.WatchActor.WatchMe; import akka.common.WatchActor.WatchMe;
public class TcpReaderActor extends UntypedActor { public class TcpRecordReconstructionActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderActor.class); private static final Logger LOGGER = LoggerFactory.getLogger(TcpRecordReconstructionActor.class);
private final ILookup<String> stringRegistry = new Lookup<String>(); private final ILookup<String> stringRegistry = new Lookup<String>();
private final ActorRef tcpTraceReconstructionActor; private final ActorRef tcpTraceReconstructionActor;
...@@ -36,7 +36,7 @@ public class TcpReaderActor extends UntypedActor { ...@@ -36,7 +36,7 @@ public class TcpReaderActor extends UntypedActor {
private final int port2 = 10134; private final int port2 = 10134;
private final int bufferCapacity = 65535; private final int bufferCapacity = 65535;
public TcpReaderActor(final Props tcpTraceReconstructionProps, ActorRef watchActor) { public TcpRecordReconstructionActor(final Props tcpTraceReconstructionProps, ActorRef watchActor) {
super(); super();
watchActor.tell(new WatchMe(getSelf()), getSelf()); watchActor.tell(new WatchMe(getSelf()), getSelf());
......
package experiment.fse15.tcpreconstruction; package akka.io.net;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -14,8 +14,8 @@ import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; ...@@ -14,8 +14,8 @@ import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.StopMessage; import akka.common.StopMessage;
import experiment.fse15.WatchActor.WatchMe; import akka.common.WatchActor.WatchMe;
public class TcpTraceReconstructionActor extends UntypedActor { public class TcpTraceReconstructionActor extends UntypedActor {
......
package experiment.fse15.tcptracereduction; package akka.io.net;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map.Entry; import java.util.Map.Entry;
...@@ -12,8 +12,8 @@ import teetime.stage.trace.traceReduction.TraceAggregationBuffer; ...@@ -12,8 +12,8 @@ import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.StopMessage; import akka.common.StopMessage;
import experiment.fse15.WatchActor.WatchMe; import akka.common.WatchActor.WatchMe;
public class TcpTraceReductionActor extends UntypedActor { public class TcpTraceReductionActor extends UntypedActor {
......
package experiment.fse15.tcpreader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import experiment.fse15.CounterActor;
import experiment.fse15.StartMessage;
import experiment.fse15.WatchActor;
public class TcpReaderAnalysisConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderAnalysisConfiguration.class);
private ActorRef tcpReaderActor;
public TcpReaderAnalysisConfiguration() {
init();
}
private void init() {
ActorSystem system = ActorSystem.create(TcpReaderAnalysisConfiguration.class.getSimpleName());
// create watch actor for termination
Props watchProps = Props.create(WatchActor.class);
final ActorRef watchActor = system.actorOf(watchProps);
// specify actors
Props counterProps = Props.create(CounterActor.class, watchActor);
Props tcpReaderProps = Props.create(TcpReaderActor.class, counterProps, watchActor);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(new StartMessage(), ActorRef.noSender());
LOGGER.info("Analysis started.");
}
}
package experiment.fse15.tcpreconstruction;
import java.util.Collection;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import experiment.fse15.CollectorActor;
import experiment.fse15.CounterActor;
import experiment.fse15.StartMessage;
import experiment.fse15.WatchActor;
import experiment.fse15.tcpreader.TcpReaderActor;
public class TraceReconstructionAnalysisConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private ActorRef tcpReaderActor;
private final Collection<Object> validTraces = new LinkedList<>();
public TraceReconstructionAnalysisConfiguration() {
traceId2trace = new ConcurrentHashMapWithDefault<>(EventBasedTraceFactory.INSTANCE);
init();
}
private void init() {
ActorSystem system = ActorSystem.create(TraceReconstructionAnalysisConfiguration.class.getSimpleName());
// create watch actor for termination
Props watchProps = Props.create(WatchActor.class);
final ActorRef watchActor = system.actorOf(watchProps);
// specify actors
// Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it
// pollutes the heap
Props collectorValidProps = Props.create(CounterActor.class, watchActor);
Props collectorInvalidProps = Props.create(CollectorActor.class, watchActor);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace,
collectorValidProps, collectorInvalidProps, watchActor);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps, watchActor);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps,
// "TcpTraceReconstructionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(new StartMessage(), ActorRef.noSender());
LOGGER.info("Analysis started.");
}
public Collection<Object> getValidTraces() {
return validTraces;
}
}
package experiment.fse15.tcptracereduction;
import java.util.Collection;
import java.util.LinkedList;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReconstruction.EventBasedTraceFactory;
import teetime.stage.trace.traceReduction.EventBasedTraceComperator;
import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import experiment.fse15.CounterActor;
import experiment.fse15.StartMessage;
import experiment.fse15.WatchActor;
import experiment.fse15.tcpreader.TcpReaderActor;
import experiment.fse15.tcpreconstruction.TcpTraceReconstructionActor;
public class TraceReductionAnalysisConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReductionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace;
private final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2Buffer;
private ActorRef tcpReaderActor;
private final Collection<Object> validTraces = new LinkedList<>();
public TraceReductionAnalysisConfiguration() {
traceId2trace = new ConcurrentHashMapWithDefault<Long, EventBasedTrace>(EventBasedTraceFactory.INSTANCE);
trace2Buffer = new TreeMap<EventBasedTrace, TraceAggregationBuffer>(new EventBasedTraceComperator());
init();
}
private void init() {
ActorSystem system = ActorSystem.create(TraceReductionAnalysisConfiguration.class.getSimpleName());
// create watch actor for termination
Props watchProps = Props.create(WatchActor.class);
final ActorRef watchActor = system.actorOf(watchProps);
// specify actors
Props counterProps = Props.create(CounterActor.class, watchActor);
Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps, watchActor);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, Props.empty(), watchActor);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps, watchActor);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
// final ActorRef tcpTraceReductionActor = system.actorOf(tcpTraceReductionProps, "TcpTraceReductionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(new StartMessage(), ActorRef.noSender());
LOGGER.info("Analysis started.");
}
public Collection<Object> getValidTraces() {
return validTraces;
}
}
package experiment.fse15;
import experiment.fse15.tcpreader.TcpReaderAnalysisConfiguration;
public class TcpReaderAnalysisConfigurationTest {
// @Test
public void testAnalysis() throws Exception {
TcpReaderAnalysisConfiguration configuration = new TcpReaderAnalysisConfiguration();
configuration.start();
// fail("Akka does not support being executed by JUnit.");
}
public static void main(String[] args) {
try {
TcpReaderAnalysisConfigurationTest test = new TcpReaderAnalysisConfigurationTest();
test.testAnalysis();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package experiment.fse15;
import experiment.fse15.tcpreconstruction.TraceReconstructionAnalysisConfiguration;
public class TraceReconstructionAnalysisConfigurationTest {
// @Test
public void testAnalysis() throws Exception {
TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration();
configuration.start();
// fail("Akka does not support being executed by JUnit.");
}
public static void main(String[] args) {
try {
TraceReconstructionAnalysisConfigurationTest test = new TraceReconstructionAnalysisConfigurationTest();
test.testAnalysis();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package experiment.fse15;
import experiment.fse15.tcptracereduction.TraceReductionAnalysisConfiguration;
public class TraceReductionAnalysisConfigurationTest {
// @Test
public void testAnalysis() throws Exception {
TraceReductionAnalysisConfiguration configuration = new TraceReductionAnalysisConfiguration();
configuration.start();
// fail("Akka does not support being executed by JUnit.");
}
public static void main(String[] args) {
try {
TraceReductionAnalysisConfigurationTest test = new TraceReductionAnalysisConfigurationTest();
test.testAnalysis();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
import ch.qos.logback.classic.filter.ThresholdFilter
statusListener(OnConsoleStatusListener)
appender("FILE", FileAppender) {
file = "akka.log"
append = false
filter(ThresholdFilter) {
level = DEBUG
}
encoder(PatternLayoutEncoder) {
pattern = "%msg%n"
}
}
appender("CONSOLE", ConsoleAppender) {
encoder(PatternLayoutEncoder) {
pattern = "%d{HH:mm:ss.SSS} %level %logger - %msg%n"
}
}
root DEBUG, ["CONSOLE"]
logger "experiment.fse15", TRACE
logger "experiment.fse15.tcpreader", TRACE
logger "experiment.fse15.tcpreconstruction", TRACE
logger "experiment.fse15.tcptracereduction", TRACE
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment