Skip to content
Snippets Groups Projects
Commit 08cf885a authored by Christian Wulf's avatar Christian Wulf
Browse files

refactored package structure;

renamed some classes;
added TcpReaderAnalysisConfigurationTest
parent 1843b5ab
No related branches found
No related tags found
No related merge requests found
Showing
with 189 additions and 32 deletions
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<classpath> <classpath>
<classpathentry kind="src" path="src/main/java"/> <classpathentry kind="src" output="target/classes" path="src/main/java">
<classpathentry kind="src" path="src/test/java"/> <attributes>
<classpathentry kind="src" path="src/test/resources"/> <attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry including="**/*.java" kind="src" path="src/test/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes> <attributes>
<attribute name="maven.pomderived" value="true"/> <attribute name="maven.pomderived" value="true"/>
......
...@@ -8,30 +8,29 @@ import org.slf4j.LoggerFactory; ...@@ -8,30 +8,29 @@ import org.slf4j.LoggerFactory;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
public class Collector extends UntypedActor { public class CollectorActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(Collector.class); private static final Logger LOGGER = LoggerFactory.getLogger(CollectorActor.class);
private final Collection<Object> elements; private final Collection<Object> elements;
public Collector() { public CollectorActor() {
this(new LinkedList<Object>()); this(new LinkedList<Object>());
} }
public Collector(final Collection<Object> elements) { public CollectorActor(final Collection<Object> elements) {
super(); super();
this.elements = elements; this.elements = elements;
} }
@Override @Override
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
elements.add(message.getClass()); elements.add(message);
} }
@Override @Override
public void postStop() throws Exception { public void postStop() throws Exception {
LOGGER.info("stopped: " + elements); LOGGER.info("stopped: " + elements.size());
super.postStop(); super.postStop();
} }
} }
package kiekerdays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.UntypedActor;
public class CounterActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(CounterActor.class);
private int numMessages;
public CounterActor() {
super();
}
@Override
public void onReceive(Object message) throws Exception {
numMessages++;
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped: " + numMessages);
super.postStop();
}
}
...@@ -3,7 +3,7 @@ package kiekerdays; ...@@ -3,7 +3,7 @@ package kiekerdays;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
public class Reaper extends UntypedActor { public class ReaperActor extends UntypedActor {
public static class WatchMe { public static class WatchMe {
private final ActorRef actorRef; private final ActorRef actorRef;
......
...@@ -5,6 +5,7 @@ import kieker.common.record.IMonitoringRecord; ...@@ -5,6 +5,7 @@ import kieker.common.record.IMonitoringRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.util.TcpReader;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
...@@ -16,9 +17,12 @@ public class TcpReaderActor extends UntypedActor { ...@@ -16,9 +17,12 @@ public class TcpReaderActor extends UntypedActor {
private ActorRef tcpTraceReconstructionActor; private ActorRef tcpTraceReconstructionActor;
private TcpReader tcpReader = new TcpReader() { private TcpReader tcpReader = new TcpReader() {
// private int numProcessedRecords;
@Override @Override
protected final void send(IMonitoringRecord record) { protected final void send(IMonitoringRecord record) {
tcpTraceReconstructionActor.tell(record, getSelf()); tcpTraceReconstructionActor.tell(record, getSelf());
// LOGGER.info("#processed records: " + numProcessedRecords++);
}; };
}; };
......
package kiekerdays.tcpreader;
import kiekerdays.CounterActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
public class TcpReaderAnalysisConfiguration {
private static final String START_MESSAGE = "start";
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderAnalysisConfiguration.class);
private ActorRef tcpReaderActor;
private ActorRef rootActor;
public TcpReaderAnalysisConfiguration() {
init();
}
private void init() {
ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
// specify actors
Props counterProps = Props.create(CounterActor.class);
Props tcpReaderProps = Props.create(TcpReaderActor.class, counterProps);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
ActorSelection rootActorSelection = system.actorSelection(tcpReaderActor.path().root());
rootActor = rootActorSelection.anchor();
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(START_MESSAGE, ActorRef.noSender());
LOGGER.info("Analysis started.");
rootActor.tell(PoisonPill.getInstance(), rootActor);
// tcpReaderActor.parent().tell(PoisonPill.getInstance(), getSelf());
// Future<Terminated> future = system.terminate();
// Await.ready(future, Duration.Inf());
LOGGER.info("Analysis stopped.");
}
}
package kiekerdays.tcpreconstruction; package kiekerdays.tcpreconstruction;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import kiekerdays.Collector; import kiekerdays.CollectorActor;
import kiekerdays.CounterActor;
import kiekerdays.tcpreader.TcpReaderActor; import kiekerdays.tcpreader.TcpReaderActor;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -39,8 +39,9 @@ public class TraceReconstructionAnalysisConfiguration { ...@@ -39,8 +39,9 @@ public class TraceReconstructionAnalysisConfiguration {
ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration"); ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
// specify actors // specify actors
Props collectorValidProps = Props.create(Collector.class, validTraces); // Props collectorValidProps = Props.create(Collector.class, validTraces); // do not use a collector since it pollutes the heap
Props collectorInvalidProps = Props.create(Collector.class, Collections.emptyList()); Props collectorValidProps = Props.create(CounterActor.class);
Props collectorInvalidProps = Props.create(CollectorActor.class);
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, collectorValidProps, collectorInvalidProps); Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, collectorValidProps, collectorInvalidProps);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps); Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);
......
package teetime.stage.io; package teetime.stage.io;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException; import java.nio.BufferUnderflowException;
...@@ -18,6 +17,8 @@ public abstract class AbstractTcpReader<T> { ...@@ -18,6 +17,8 @@ public abstract class AbstractTcpReader<T> {
private final int port; private final int port;
private final int bufferCapacity; private final int bufferCapacity;
private int numBufferUnderflows;
public AbstractTcpReader(final int port, final int bufferCapacity) { public AbstractTcpReader(final int port, final int bufferCapacity) {
super(); super();
this.port = port; this.port = port;
...@@ -41,13 +42,13 @@ public abstract class AbstractTcpReader<T> { ...@@ -41,13 +42,13 @@ public abstract class AbstractTcpReader<T> {
socketChannel.close(); socketChannel.close();
} }
} catch (final IOException ex) { } catch (final IOException ex) {
logger.error("Error while reading", ex); logger.error("Error while reading.", ex);
} finally { } finally {
if (null != serversocket) { if (null != serversocket) {
try { try {
serversocket.close(); serversocket.close();
} catch (final IOException e) { } catch (final IOException e) {
logger.debug("Failed to close TCP connection!", e); logger.debug("Failed to close TCP connection.", e);
} }
} }
} }
...@@ -58,15 +59,30 @@ public abstract class AbstractTcpReader<T> { ...@@ -58,15 +59,30 @@ public abstract class AbstractTcpReader<T> {
try { try {
while (buffer.hasRemaining()) { while (buffer.hasRemaining()) {
buffer.mark(); buffer.mark();
this.read(buffer); boolean success = this.read(buffer);
if (!success) {
buffer.reset();
buffer.compact();
return;
}
} }
buffer.clear(); buffer.clear();
} catch (final BufferUnderflowException ex) { } catch (final BufferUnderflowException ex) {
// logger.warn("Unexpected exception. Resetting and compacting buffer.", ex);
numBufferUnderflows++;
logger.warn("numBufferUnderflows: " + numBufferUnderflows);
buffer.reset(); buffer.reset();
buffer.compact(); buffer.compact();
} }
} }
protected abstract void read(final ByteBuffer buffer); /**
* @param buffer
* to be read from
* @return <ul>
* <li><code>true</code> when there were enough bytes to perform the read operation
* <li><code>false</code> otherwise. In this case, the buffer is reset, compacted, and filled with new content.
*/
protected abstract boolean read(final ByteBuffer buffer);
} }
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package kiekerdays.tcpreader; package teetime.util;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -86,7 +86,7 @@ public class TcpReader extends AbstractTcpReader<IMonitoringRecord> { ...@@ -86,7 +86,7 @@ public class TcpReader extends AbstractTcpReader<IMonitoringRecord> {
} }
@Override @Override
protected final void read(final ByteBuffer buffer) { protected final boolean read(final ByteBuffer buffer) {
final int clazzId = buffer.getInt(); final int clazzId = buffer.getInt();
final long loggingTimestamp = buffer.getLong(); final long loggingTimestamp = buffer.getLong();
...@@ -97,11 +97,11 @@ public class TcpReader extends AbstractTcpReader<IMonitoringRecord> { ...@@ -97,11 +97,11 @@ public class TcpReader extends AbstractTcpReader<IMonitoringRecord> {
record.setLoggingTimestamp(loggingTimestamp); record.setLoggingTimestamp(loggingTimestamp);
send(record); send(record);
} catch (final BufferUnderflowException ex) {
super.logger.error("Failed to create record.", ex);
} catch (final RecordInstantiationException ex) { } catch (final RecordInstantiationException ex) {
super.logger.error("Failed to create record.", ex); super.logger.error("Failed to create record:", ex);
} }
return true;
} }
protected void send(IMonitoringRecord record) {}; protected void send(IMonitoringRecord record) {};
......
package kiekerdays.tcpreader;
public class TcpReaderAnalysisConfigurationTest {
// @Test
public void testAnalysis() throws Exception {
TcpReaderAnalysisConfiguration configuration = new TcpReaderAnalysisConfiguration();
configuration.start();
// fail("Akka does not support being executed by JUnit.");
}
public static void main(String[] args) {
try {
TcpReaderAnalysisConfigurationTest test = new TcpReaderAnalysisConfigurationTest();
test.testAnalysis();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
package kiekerdays.tcpreconstruction; package kiekerdays.tcpreconstruction;
import org.junit.Test;
public class TraceReductionAnalysisTest { public class TraceReductionAnalysisConfigurationTest {
@Test // @Test
public void testAnalysis() throws Exception { public void testAnalysis() throws Exception {
TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration(); TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration();
configuration.start(); configuration.start();
// fail("Akka does not support being executed by JUnit.");
} }
public static void main(String[] args) { public static void main(String[] args) {
try { try {
TraceReductionAnalysisTest test = new TraceReductionAnalysisTest(); TraceReductionAnalysisConfigurationTest test = new TraceReductionAnalysisConfigurationTest();
test.testAnalysis(); test.testAnalysis();
} catch (Exception e) { } catch (Exception e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment