Skip to content
Snippets Groups Projects
Commit 88ec7509 authored by Christian Wulf's avatar Christian Wulf
Browse files

fixed termination bug

parent b7ea2b26
No related branches found
No related tags found
No related merge requests found
Showing
with 114 additions and 46 deletions
...@@ -83,6 +83,34 @@ ...@@ -83,6 +83,34 @@
<target>1.7</target> <target>1.7</target>
</configuration> </configuration>
</plugin> </plugin>
<!-- generates jar with tests classes -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- copies all dependencies as separate jars into the target folder -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
\ No newline at end of file
...@@ -6,7 +6,9 @@ import java.util.LinkedList; ...@@ -6,7 +6,9 @@ import java.util.LinkedList;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.WatchActor.WatchMe;
public class CollectorActor extends UntypedActor { public class CollectorActor extends UntypedActor {
...@@ -14,13 +16,15 @@ public class CollectorActor extends UntypedActor { ...@@ -14,13 +16,15 @@ public class CollectorActor extends UntypedActor {
private final Collection<Object> elements; private final Collection<Object> elements;
public CollectorActor() { public CollectorActor(ActorRef watchActor) {
this(new LinkedList<Object>()); this(new LinkedList<Object>(), watchActor);
} }
public CollectorActor(final Collection<Object> elements) { public CollectorActor(final Collection<Object> elements, ActorRef watchActor) {
super(); super();
this.elements = elements; this.elements = elements;
watchActor.tell(new WatchMe(getSelf()), getSelf());
} }
@Override @Override
......
...@@ -3,15 +3,18 @@ package experiment.fse15; ...@@ -3,15 +3,18 @@ package experiment.fse15;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.WatchActor.WatchMe;
public class CounterActor extends UntypedActor { public class CounterActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(CounterActor.class); private static final Logger LOGGER = LoggerFactory.getLogger(CounterActor.class);
private int numMessages; private int numMessages;
public CounterActor() { public CounterActor(ActorRef watchActor) {
super(); super();
watchActor.tell(new WatchMe(getSelf()), getSelf());
} }
@Override @Override
......
package experiment.fse15;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
public class ReaperActor extends UntypedActor {
public static class WatchMe {
private final ActorRef actorRef;
public WatchMe(ActorRef actorRef) {
this.actorRef = actorRef;
}
public ActorRef getActorRef() {
return actorRef;
}
}
@Override
public void onReceive(Object message) throws Exception {
// TODO Auto-generated method stub
}
}
package experiment.fse15;
import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
public class WatchActor extends UntypedActor {
public static final class WatchMe {
final ActorRef actor;
public WatchMe(ActorRef actor) {
this.actor = actor;
}
}
private int running;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof WatchMe) {
ActorRef actor = ((WatchMe) message).actor;
context().watch(actor);
running++;
} else if (message instanceof Terminated) {
running--;
if (running == 0) {
context().system().shutdown();
}
} else {
unhandled(message);
}
}
}
...@@ -19,6 +19,7 @@ import akka.actor.Props; ...@@ -19,6 +19,7 @@ import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.StartMessage; import experiment.fse15.StartMessage;
import experiment.fse15.StopMessage; import experiment.fse15.StopMessage;
import experiment.fse15.WatchActor.WatchMe;
public class TcpReaderActor extends UntypedActor { public class TcpReaderActor extends UntypedActor {
...@@ -35,9 +36,11 @@ public class TcpReaderActor extends UntypedActor { ...@@ -35,9 +36,11 @@ public class TcpReaderActor extends UntypedActor {
private final int port2 = 10134; private final int port2 = 10134;
private final int bufferCapacity = 65535; private final int bufferCapacity = 65535;
public TcpReaderActor(final Props tcpTraceReconstructionProps) { public TcpReaderActor(final Props tcpTraceReconstructionProps, ActorRef watchActor) {
super(); super();
watchActor.tell(new WatchMe(getSelf()), getSelf());
this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps); this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps);
this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, LOGGER, stringRegistry) { this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, LOGGER, stringRegistry) {
...@@ -67,7 +70,7 @@ public class TcpReaderActor extends UntypedActor { ...@@ -67,7 +70,7 @@ public class TcpReaderActor extends UntypedActor {
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
onTerminating(); onTerminating();
tcpTraceReconstructionActor.tell(message, getSelf()); tcpTraceReconstructionActor.tell(message, getSelf());
// terminate(); terminate();
} else { } else {
unhandled(message); unhandled(message);
} }
......
...@@ -8,6 +8,7 @@ import akka.actor.ActorSystem; ...@@ -8,6 +8,7 @@ import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import experiment.fse15.CounterActor; import experiment.fse15.CounterActor;
import experiment.fse15.StartMessage; import experiment.fse15.StartMessage;
import experiment.fse15.WatchActor;
public class TcpReaderAnalysisConfiguration { public class TcpReaderAnalysisConfiguration {
...@@ -22,9 +23,13 @@ public class TcpReaderAnalysisConfiguration { ...@@ -22,9 +23,13 @@ public class TcpReaderAnalysisConfiguration {
private void init() { private void init() {
ActorSystem system = ActorSystem.create(TcpReaderAnalysisConfiguration.class.getSimpleName()); ActorSystem system = ActorSystem.create(TcpReaderAnalysisConfiguration.class.getSimpleName());
// create watch actor for termination
Props watchProps = Props.create(WatchActor.class);
final ActorRef watchActor = system.actorOf(watchProps);
// specify actors // specify actors
Props counterProps = Props.create(CounterActor.class); Props counterProps = Props.create(CounterActor.class, watchActor);
Props tcpReaderProps = Props.create(TcpReaderActor.class, counterProps); Props tcpReaderProps = Props.create(TcpReaderActor.class, counterProps, watchActor);
// create actors // create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
......
...@@ -9,12 +9,13 @@ import kieker.common.record.flow.trace.TraceMetadata; ...@@ -9,12 +9,13 @@ import kieker.common.record.flow.trace.TraceMetadata;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import experiment.fse15.StopMessage;
import teetime.stage.trace.traceReconstruction.EventBasedTrace; import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.StopMessage;
import experiment.fse15.WatchActor.WatchMe;
public class TcpTraceReconstructionActor extends UntypedActor { public class TcpTraceReconstructionActor extends UntypedActor {
...@@ -30,11 +31,13 @@ public class TcpTraceReconstructionActor extends UntypedActor { ...@@ -30,11 +31,13 @@ public class TcpTraceReconstructionActor extends UntypedActor {
private final ActorRef invalidTraceReceiver; private final ActorRef invalidTraceReceiver;
public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace, public TcpTraceReconstructionActor(final ConcurrentHashMapWithDefault<Long, EventBasedTrace> traceId2trace,
final Props validTraceReceiverProps, final Props invalidTraceReceiverProps) { final Props validTraceReceiverProps, final Props invalidTraceReceiverProps, ActorRef watchActor) {
super(); super();
this.traceId2trace = traceId2trace; this.traceId2trace = traceId2trace;
this.validTraceReceiver = context().actorOf(validTraceReceiverProps); this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps); this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
watchActor.tell(new WatchMe(getSelf()), getSelf());
} }
@Override @Override
......
...@@ -15,6 +15,7 @@ import akka.actor.Props; ...@@ -15,6 +15,7 @@ import akka.actor.Props;
import experiment.fse15.CollectorActor; import experiment.fse15.CollectorActor;
import experiment.fse15.CounterActor; import experiment.fse15.CounterActor;
import experiment.fse15.StartMessage; import experiment.fse15.StartMessage;
import experiment.fse15.WatchActor;
import experiment.fse15.tcpreader.TcpReaderActor; import experiment.fse15.tcpreader.TcpReaderActor;
public class TraceReconstructionAnalysisConfiguration { public class TraceReconstructionAnalysisConfiguration {
...@@ -34,14 +35,18 @@ public class TraceReconstructionAnalysisConfiguration { ...@@ -34,14 +35,18 @@ public class TraceReconstructionAnalysisConfiguration {
private void init() { private void init() {
ActorSystem system = ActorSystem.create(TraceReconstructionAnalysisConfiguration.class.getSimpleName()); ActorSystem system = ActorSystem.create(TraceReconstructionAnalysisConfiguration.class.getSimpleName());
// create watch actor for termination
Props watchProps = Props.create(WatchActor.class);
final ActorRef watchActor = system.actorOf(watchProps);
// specify actors // specify actors
// Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it // Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it
// pollutes the heap // pollutes the heap
Props collectorValidProps = Props.create(CounterActor.class); Props collectorValidProps = Props.create(CounterActor.class, watchActor);
Props collectorInvalidProps = Props.create(CollectorActor.class); Props collectorInvalidProps = Props.create(CollectorActor.class, watchActor);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace,
collectorValidProps, collectorInvalidProps); collectorValidProps, collectorInvalidProps, watchActor);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps); Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps, watchActor);
// create actors // create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
......
...@@ -7,12 +7,13 @@ import java.util.NavigableMap; ...@@ -7,12 +7,13 @@ import java.util.NavigableMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import experiment.fse15.StopMessage;
import teetime.stage.trace.traceReconstruction.EventBasedTrace; import teetime.stage.trace.traceReconstruction.EventBasedTrace;
import teetime.stage.trace.traceReduction.TraceAggregationBuffer; import teetime.stage.trace.traceReduction.TraceAggregationBuffer;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import experiment.fse15.StopMessage;
import experiment.fse15.WatchActor.WatchMe;
public class TcpTraceReductionActor extends UntypedActor { public class TcpTraceReductionActor extends UntypedActor {
...@@ -24,10 +25,12 @@ public class TcpTraceReductionActor extends UntypedActor { ...@@ -24,10 +25,12 @@ public class TcpTraceReductionActor extends UntypedActor {
private long maxCollectionDurationInNs; private long maxCollectionDurationInNs;
public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer, public TcpTraceReductionActor(final NavigableMap<EventBasedTrace, TraceAggregationBuffer> trace2buffer,
final Props traceAggregationReceiverProps) { final Props traceAggregationReceiverProps, ActorRef watchActor) {
this.trace2buffer = trace2buffer; this.trace2buffer = trace2buffer;
// output "ports" // output "ports"
this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps); this.traceAggregationReceiver = context().actorOf(traceAggregationReceiverProps);
watchActor.tell(new WatchMe(getSelf()), getSelf());
} }
@Override @Override
......
...@@ -18,6 +18,7 @@ import akka.actor.ActorSystem; ...@@ -18,6 +18,7 @@ import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import experiment.fse15.CounterActor; import experiment.fse15.CounterActor;
import experiment.fse15.StartMessage; import experiment.fse15.StartMessage;
import experiment.fse15.WatchActor;
import experiment.fse15.tcpreader.TcpReaderActor; import experiment.fse15.tcpreader.TcpReaderActor;
import experiment.fse15.tcpreconstruction.TcpTraceReconstructionActor; import experiment.fse15.tcpreconstruction.TcpTraceReconstructionActor;
...@@ -40,11 +41,15 @@ public class TraceReductionAnalysisConfiguration { ...@@ -40,11 +41,15 @@ public class TraceReductionAnalysisConfiguration {
private void init() { private void init() {
ActorSystem system = ActorSystem.create(TraceReductionAnalysisConfiguration.class.getSimpleName()); ActorSystem system = ActorSystem.create(TraceReductionAnalysisConfiguration.class.getSimpleName());
// create watch actor for termination
Props watchProps = Props.create(WatchActor.class);
final ActorRef watchActor = system.actorOf(watchProps);
// specify actors // specify actors
Props counterProps = Props.create(CounterActor.class); Props counterProps = Props.create(CounterActor.class, watchActor);
Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps); Props tcpTraceReductionProps = Props.create(TcpTraceReductionActor.class, trace2Buffer, counterProps, watchActor);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, Props.empty()); Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, tcpTraceReductionProps, Props.empty(), watchActor);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps); Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps, watchActor);
// create actors // create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor"); final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment