From 88ec7509d276b3a542c5cfed79942f61a66d4490 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 9 Feb 2015 10:46:06 +0100 Subject: [PATCH] fixed termination bug --- pom.xml | 28 +++++++++++++++ .../java/experiment/fse15/CollectorActor.java | 10 ++++-- .../java/experiment/fse15/CounterActor.java | 5 ++- .../java/experiment/fse15/ReaperActor.java | 26 -------------- .../java/experiment/fse15/WatchActor.java | 35 +++++++++++++++++++ .../fse15/tcpreader/TcpReaderActor.java | 7 ++-- .../TcpReaderAnalysisConfiguration.java | 9 +++-- .../TcpTraceReconstructionActor.java | 7 ++-- ...ceReconstructionAnalysisConfiguration.java | 13 ++++--- .../TcpTraceReductionActor.java | 7 ++-- .../TraceReductionAnalysisConfiguration.java | 13 ++++--- 11 files changed, 114 insertions(+), 46 deletions(-) delete mode 100644 src/main/java/experiment/fse15/ReaperActor.java create mode 100644 src/main/java/experiment/fse15/WatchActor.java diff --git a/pom.xml b/pom.xml index 8e7e706..4fddf47 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,34 @@ <target>1.7</target> </configuration> </plugin> + <!-- generates jar with tests classes --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.5</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- copies all dependencies as separate jars into the target folder --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.10</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> \ No newline at end of file diff --git a/src/main/java/experiment/fse15/CollectorActor.java b/src/main/java/experiment/fse15/CollectorActor.java index b5ff29a..99d3e8a 100644 --- a/src/main/java/experiment/fse15/CollectorActor.java +++ b/src/main/java/experiment/fse15/CollectorActor.java @@ -6,7 +6,9 @@ import java.util.LinkedList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import akka.actor.ActorRef; import akka.actor.UntypedActor; +import experiment.fse15.WatchActor.WatchMe; public class CollectorActor extends UntypedActor { @@ -14,13 +16,15 @@ public class CollectorActor extends UntypedActor { private final Collection<Object> elements; - public CollectorActor() { - this(new LinkedList<Object>()); + public CollectorActor(ActorRef watchActor) { + this(new LinkedList<Object>(), watchActor); } - public CollectorActor(final Collection<Object> elements) { + public CollectorActor(final Collection<Object> elements, ActorRef watchActor) { super(); this.elements = elements; + + watchActor.tell(new WatchMe(getSelf()), getSelf()); } @Override diff --git a/src/main/java/experiment/fse15/CounterActor.java b/src/main/java/experiment/fse15/CounterActor.java index ca24bd8..56dcbb1 100644 --- a/src/main/java/experiment/fse15/CounterActor.java +++ b/src/main/java/experiment/fse15/CounterActor.java @@ -3,15 +3,18 @@ package experiment.fse15; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import akka.actor.ActorRef; import akka.actor.UntypedActor; +import experiment.fse15.WatchActor.WatchMe; public class CounterActor extends UntypedActor { private static final Logger LOGGER = LoggerFactory.getLogger(CounterActor.class); private int numMessages; - public CounterActor() { + public CounterActor(ActorRef watchActor) { super(); + watchActor.tell(new WatchMe(getSelf()), getSelf()); } @Override diff --git a/src/main/java/experiment/fse15/ReaperActor.java b/src/main/java/experiment/fse15/ReaperActor.java deleted file mode 100644 index 4e846f9..0000000 --- a/src/main/java/experiment/fse15/ReaperActor.java +++ /dev/null @@ -1,26 +0,0 @@ -package experiment.fse15; - -import akka.actor.ActorRef; -import akka.actor.UntypedActor; - -public class ReaperActor extends UntypedActor { - - public static class WatchMe { - private final ActorRef actorRef; - - public WatchMe(ActorRef actorRef) { - this.actorRef = actorRef; - } - - public ActorRef getActorRef() { - return actorRef; - } - } - - @Override - public void onReceive(Object message) throws Exception { - // TODO Auto-generated method stub - - } - -} diff --git a/src/main/java/experiment/fse15/WatchActor.java b/src/main/java/experiment/fse15/WatchActor.java new file mode 100644 index 0000000..fd344ff --- /dev/null +++ b/src/main/java/experiment/fse15/WatchActor.java @@ -0,0 +1,35 @@ +package experiment.fse15; + +import akka.actor.ActorRef; +import akka.actor.Terminated; +import akka.actor.UntypedActor; + +public class WatchActor extends UntypedActor { + + public static final class WatchMe { + final ActorRef actor; + + public WatchMe(ActorRef actor) { + this.actor = actor; + } + } + + private int running; + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof WatchMe) { + ActorRef actor = ((WatchMe) message).actor; + context().watch(actor); + running++; + } else if (message instanceof Terminated) { + running--; + if (running == 0) { + context().system().shutdown(); + } + } else { + unhandled(message); + } + } + +} diff --git a/src/main/java/experiment/fse15/tcpreader/TcpReaderActor.java b/src/main/java/experiment/fse15/tcpreader/TcpReaderActor.java index bf9829d..9ec229e 100644 --- a/src/main/java/experiment/fse15/tcpreader/TcpReaderActor.java +++ b/src/main/java/experiment/fse15/tcpreader/TcpReaderActor.java @@ -19,6 +19,7 @@ import akka.actor.Props; import akka.actor.UntypedActor; import experiment.fse15.StartMessage; import experiment.fse15.StopMessage; +import experiment.fse15.WatchActor.WatchMe; public class TcpReaderActor extends UntypedActor { @@ -35,9 +36,11 @@ public class TcpReaderActor extends UntypedActor { private final int port2 = 10134; private final int bufferCapacity = 65535; - public TcpReaderActor(final Props tcpTraceReconstructionProps) { + public TcpReaderActor(final Props tcpTraceReconstructionProps, ActorRef watchActor) { super(); + watchActor.tell(new WatchMe(getSelf()), getSelf()); + this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps); this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, LOGGER, stringRegistry) { @@ -67,7 +70,7 @@ public class TcpReaderActor extends UntypedActor { } else if (message instanceof StopMessage) { onTerminating(); tcpTraceReconstructionActor.tell(message, getSelf()); -// terminate(); + terminate(); } else { unhandled(message); } diff --git a/src/main/java/experiment/fse15/tcpreader/TcpReaderAnalysisConfiguration.java b/src/main/java/experiment/fse15/tcpreader/TcpReaderAnalysisConfiguration.java index ba95784..6700133 100644 --- a/src/main/java/experiment/fse15/tcpreader/TcpReaderAnalysisConfiguration.java +++ b/src/main/java/experiment/fse15/tcpreader/TcpReaderAnalysisConfiguration.java @@ -8,6 +8,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import experiment.fse15.CounterActor; import experiment.fse15.StartMessage; +import experiment.fse15.WatchActor; public class TcpReaderAnalysisConfiguration { @@ -22,9 +23,13 @@ public class TcpReaderAnalysisConfiguration { private void init() { ActorSystem system = ActorSystem.create(TcpReaderAnalysisConfiguration.class.getSimpleName()); + // create watch actor for termination + Props watchProps = Props.create(WatchActor.class); + final ActorRef watchActor = system.actorOf(watchProps); + // specify actors - Props counterProps = Props.create(CounterActor.class); - Props tcpReaderProps = Props.create(TcpReaderActor.class, counterProps); + Props counterProps = Props.create(CounterActor.class, watchActor); + Props tcpReaderProps = Props.create(TcpReaderActor.class, counterProps, watchActor); // create actors final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); diff --git a/src/main/java/experiment/fse15/tcpreconstruction/TcpTraceReconstructionActor.java b/src/main/java/experiment/fse15/tcpreconstruction/TcpTraceReconstructionActor.java index c5d75b4..60dbb70 100644 --- a/src/main/java/experiment/fse15/tcpreconstruction/TcpTraceReconstructionActor.java +++ b/src/main/java/experiment/fse15/tcpreconstruction/TcpTraceReconstructionActor.java @@ -9,12 +9,13 @@ import kieker.common.record.flow.trace.TraceMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import experiment.fse15.StopMessage; import teetime.stage.trace.traceReconstruction.EventBasedTrace; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; +import experiment.fse15.StopMessage; +import experiment.fse15.WatchActor.WatchMe; public class TcpTraceReconstructionActor extends UntypedActor { @@ -30,11 +31,13 @@ public class TcpTraceReconstructionActor extends UntypedActor { private final ActorRef invalidTraceReceiver; public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace, - final Props validTraceReceiverProps, final Props invalidTraceReceiverProps) { + final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) { super(); this.traceId2trace = traceId2trace; this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps); + + watchActor.tell(new WatchMe(getSelf()), getSelf()); } @Override diff --git a/src/main/java/experiment/fse15/tcpreconstruction/TraceReconstructionAnalysisConfiguration.java b/src/main/java/experiment/fse15/tcpreconstruction/TraceReconstructionAnalysisConfiguration.java index 66c09ed..e6ebb16 100644 --- a/src/main/java/experiment/fse15/tcpreconstruction/TraceReconstructionAnalysisConfiguration.java +++ b/src/main/java/experiment/fse15/tcpreconstruction/TraceReconstructionAnalysisConfiguration.java @@ -15,6 +15,7 @@ import akka.actor.Props; import experiment.fse15.CollectorActor; import experiment.fse15.CounterActor; import experiment.fse15.StartMessage; +import experiment.fse15.WatchActor; import experiment.fse15.tcpreader.TcpReaderActor; public class TraceReconstructionAnalysisConfiguration { @@ -34,14 +35,18 @@ public class TraceReconstructionAnalysisConfiguration { private void init() { ActorSystem system = ActorSystem.create(TraceReconstructionAnalysisConfiguration.class.getSimpleName()); + // create watch actor for termination + Props watchProps = Props.create(WatchActor.class); + final ActorRef watchActor = system.actorOf(watchProps); + // specify actors // Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it // pollutes the heap - Props collectorValidProps = Props.create(CounterActor.class); - Props collectorInvalidProps = Props.create(CollectorActor.class); + Props collectorValidProps = Props.create(CounterActor.class, watchActor); + Props collectorInvalidProps = Props.create(CollectorActor.class, watchActor); Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, - collectorValidProps, collectorInvalidProps); - Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps); + collectorValidProps, collectorInvalidProps, watchActor); + Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps, watchActor); // create actors final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); diff --git a/src/main/java/experiment/fse15/tcptracereduction/TcpTraceReductionActor.java b/src/main/java/experiment/fse15/tcptracereduction/TcpTraceReductionActor.java index 91aa8b1..90e03b7 100644 --- a/src/main/java/experiment/fse15/tcptracereduction/TcpTraceReductionActor.java +++ b/src/main/java/experiment/fse15/tcptracereduction/TcpTraceReductionActor.java @@ -7,12 +7,13 @@ import java.util.NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import experiment.fse15.StopMessage; import teetime.stage.trace.traceReconstruction.EventBasedTrace; import teetime.stage.trace.traceReduction.TraceAggregationBuffer; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; +import experiment.fse15.StopMessage; +import experiment.fse15.WatchActor.WatchMe; public class TcpTraceReductionActor extends UntypedActor { @@ -24,10 +25,12 @@ public class TcpTraceReductionActor extends UntypedActor { private long maxCollectionDurationInNs; public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer, - final Props traceAggregationReceiverProps) { + final Props traceAggregationReceiverProps, ActorRef watchActor) { this.trace2buffer = trace2buffer; // output "ports" this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps); + + watchActor.tell(new WatchMe(getSelf()), getSelf()); } @Override diff --git a/src/main/java/experiment/fse15/tcptracereduction/TraceReductionAnalysisConfiguration.java b/src/main/java/experiment/fse15/tcptracereduction/TraceReductionAnalysisConfiguration.java index e4f0777..e927b7c 100644 --- a/src/main/java/experiment/fse15/tcptracereduction/TraceReductionAnalysisConfiguration.java +++ b/src/main/java/experiment/fse15/tcptracereduction/TraceReductionAnalysisConfiguration.java @@ -18,6 +18,7 @@ import akka.actor.ActorSystem; import akka.actor.Props; import experiment.fse15.CounterActor; import experiment.fse15.StartMessage; +import experiment.fse15.WatchActor; import experiment.fse15.tcpreader.TcpReaderActor; import experiment.fse15.tcpreconstruction.TcpTraceReconstructionActor; @@ -40,11 +41,15 @@ public class TraceReductionAnalysisConfiguration { private void init() { ActorSystem system = ActorSystem.create(TraceReductionAnalysisConfiguration.class.getSimpleName()); + // create watch actor for termination + Props watchProps = Props.create(WatchActor.class); + final ActorRef watchActor = system.actorOf(watchProps); + // specify actors - Props counterProps = Props.create(CounterActor.class); - Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps); - Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, Props.empty()); - Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps); + Props counterProps = Props.create(CounterActor.class, watchActor); + Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps, watchActor); + Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, Props.empty(), watchActor); + Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps, watchActor); // create actors final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); -- GitLab