Skip to content
Snippets Groups Projects
Commit b9c82a63 authored by Christian Wulf's avatar Christian Wulf
Browse files

added dependencies;

removed copied resources from the new dependencies
parent a65bedbc
Branches
Tags
No related merge requests found
...@@ -18,6 +18,20 @@ ...@@ -18,6 +18,20 @@
<version>1.11-SNAPSHOT</version> <version>1.11-SNAPSHOT</version>
</dependency> </dependency>
<!-- Provides ConcurrentHashMapWithDefault -->
<dependency>
<groupId>net.sourceforge.teetime</groupId>
<artifactId>teetime</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
<!-- Provides AbstractTcpReader and TcpReader -->
<!-- <dependency> -->
<!-- <groupId>net.sourceforge.teetime-stages</groupId> -->
<!-- <artifactId>teetime-stages</artifactId> -->
<!-- <version>1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
......
...@@ -21,7 +21,7 @@ public class CounterActor extends UntypedActor { ...@@ -21,7 +21,7 @@ public class CounterActor extends UntypedActor {
@Override @Override
public void postStop() throws Exception { public void postStop() throws Exception {
LOGGER.info("stopped: " + numMessages); LOGGER.info("stopped: " + numMessages + " messages");
super.postStop(); super.postStop();
} }
} }
package kiekerdays.tcpreader; package kiekerdays.tcpreader;
import java.nio.ByteBuffer;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
import kiekerdays.StartMessage; import kiekerdays.StartMessage;
import kiekerdays.StopMessage; import kiekerdays.StopMessage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.util.TcpReader; import teetime.util.io.network.AbstractTcpReader;
import teetime.util.network.AbstractRecordTcpReader;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.PoisonPill; import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
...@@ -17,21 +23,36 @@ public class TcpReaderActor extends UntypedActor { ...@@ -17,21 +23,36 @@ public class TcpReaderActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderActor.class); private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderActor.class);
private ActorRef tcpTraceReconstructionActor; private final ILookup<String> stringRegistry = new Lookup<String>();
private final ActorRef tcpTraceReconstructionActor;
private final AbstractRecordTcpReader tcpMonitoringRecordReader;
private final AbstractTcpReader tcpStringRecordReader;
private TcpReader tcpReader = new TcpReader() { private Thread tcpStringRecordReaderThread;
// private int numProcessedRecords;
@Override private int port1 = 10133;
protected final void send(IMonitoringRecord record) { private int port2 = 10134;
tcpTraceReconstructionActor.tell(record, getSelf()); private int bufferCapacity = 65535;
// LOGGER.info("#processed records: " + numProcessedRecords++);
};
};
public TcpReaderActor(Props tcpTraceReconstructionProps) { public TcpReaderActor(Props tcpTraceReconstructionProps) {
super(); super();
this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps); this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps);
this.tcpMonitoringRecordReader = new AbstractRecordTcpReader(port1, bufferCapacity, LOGGER, stringRegistry) {
@Override
protected void onRecordReceived(final IMonitoringRecord record) {
tcpTraceReconstructionActor.tell(record, getSelf());
}
};
this.tcpStringRecordReader = new AbstractTcpReader(port2, bufferCapacity, LOGGER) {
@Override
protected boolean onBufferReceived(final ByteBuffer buffer) {
RegistryRecord.registerRecordInRegistry(buffer, stringRegistry);
return true;
}
};
} }
@Override @Override
...@@ -39,8 +60,8 @@ public class TcpReaderActor extends UntypedActor { ...@@ -39,8 +60,8 @@ public class TcpReaderActor extends UntypedActor {
LOGGER.debug("Message: " + message.getClass().getName()); LOGGER.debug("Message: " + message.getClass().getName());
try { try {
if (message instanceof StartMessage) { if (message instanceof StartMessage) {
tcpReader.onStarting(); onStarting();
tcpReader.execute(); tcpMonitoringRecordReader.run();
} else if (message instanceof StopMessage) { } else if (message instanceof StopMessage) {
this.onTerminating(); this.onTerminating();
tcpTraceReconstructionActor.tell(message, getSelf()); tcpTraceReconstructionActor.tell(message, getSelf());
...@@ -58,9 +79,16 @@ public class TcpReaderActor extends UntypedActor { ...@@ -58,9 +79,16 @@ public class TcpReaderActor extends UntypedActor {
// context().system().terminate(); // context().system().terminate();
} }
private void onStarting() {
LOGGER.debug("Starting...");
this.tcpStringRecordReaderThread = new Thread(tcpStringRecordReader);
this.tcpStringRecordReaderThread.start();
}
private void onTerminating() { private void onTerminating() {
LOGGER.debug("Terminating..."); LOGGER.debug("Terminating...");
// do nothing tcpStringRecordReader.terminate();
tcpStringRecordReaderThread.interrupt();
} }
@Override @Override
......
...@@ -22,8 +22,6 @@ import akka.actor.Props; ...@@ -22,8 +22,6 @@ import akka.actor.Props;
public class TraceReconstructionAnalysisConfiguration { public class TraceReconstructionAnalysisConfiguration {
private static final String START_MESSAGE = "start";
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class); private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
......
package teetime.stage.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractTcpReader<T> {
protected final Logger logger = LoggerFactory.getLogger(AbstractTcpReader.class.getName());
private final int port;
private final int bufferCapacity;
private int numBufferUnderflows;
public AbstractTcpReader(final int port, final int bufferCapacity) {
super();
this.port = port;
this.bufferCapacity = bufferCapacity;
}
protected void execute() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
logger.debug("Listening on port " + this.port);
final SocketChannel socketChannel = serversocket.accept();
try {
final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferCapacity);
while (socketChannel.read(buffer) != -1) {
process(buffer);
}
} finally {
socketChannel.close();
}
} catch (final IOException ex) {
logger.error("Error while reading.", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
logger.debug("Failed to close TCP connection.", e);
}
}
}
}
private void process(final ByteBuffer buffer) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
boolean success = this.read(buffer);
if (!success) {
buffer.reset();
buffer.compact();
return;
}
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
// logger.warn("Unexpected exception. Resetting and compacting buffer.", ex);
numBufferUnderflows++;
logger.warn("numBufferUnderflows: " + numBufferUnderflows);
buffer.reset();
buffer.compact();
}
}
/**
* @param buffer
* to be read from
* @return <ul>
* <li><code>true</code> when there were enough bytes to perform the read operation
* <li><code>false</code> otherwise. In this case, the buffer is reset, compacted, and filled with new content.
*/
protected abstract boolean read(final ByteBuffer buffer);
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import kieker.common.exception.RecordInstantiationException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.factory.CachedRecordFactoryCatalog;
import kieker.common.record.factory.IRecordFactory;
import kieker.common.record.factory.old.RecordFactoryWrapper;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.stage.io.AbstractTcpReader;
/**
* This is a reader which reads the records from a TCP port.
*
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf
*
*/
public class TcpReader extends AbstractTcpReader<IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReader.class);
private static final int INT_BYTES = AbstractMonitoringRecord.TYPE_SIZE_INT;
private static final int LONG_BYTES = AbstractMonitoringRecord.TYPE_SIZE_LONG;
private static final int UNKNOWN_SIZE = RecordFactoryWrapper.UNKNOWN_SIZE;
private final CachedRecordFactoryCatalog recordFactories = CachedRecordFactoryCatalog.getInstance();
// BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary.
private final ILookup<String> stringRegistry = new Lookup<String>();
private int port2 = 10134;
private TCPStringReader tcpStringReader;
/**
* Default constructor with <code>port=10133</code> and <code>bufferCapacity=65535</code>
*
* @param tcpTraceReconstructonActor
*/
public TcpReader() {
this(10133, 65535);
}
/**
*
* @param port
* accept connections on this port
* @param bufferCapacity
* capacity of the receiving buffer
*/
public TcpReader(final int port, final int bufferCapacity) {
super(port, bufferCapacity);
}
public void onStarting() throws Exception {
LOGGER.trace("onStarting");
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start();
}
@Override
public void execute() {
super.execute();
}
@Override
protected final boolean read(final ByteBuffer buffer) {
// identify record class
if (buffer.remaining() < INT_BYTES) {
return false;
}
final int clazzId = buffer.getInt();
final String recordClassName = this.stringRegistry.get(clazzId);
// identify logging timestamp
if (buffer.remaining() < LONG_BYTES) {
return false;
}
final long loggingTimestamp = buffer.getLong();
// identify record data
final IRecordFactory<? extends IMonitoringRecord> recordFactory = this.recordFactories.get(recordClassName);
int recordSizeInBytes = recordFactory.getRecordSizeInBytes();
if (recordSizeInBytes != UNKNOWN_SIZE && buffer.remaining() < recordSizeInBytes) {
return false;
}
try {
final IMonitoringRecord record = recordFactory.create(buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
send(record);
} catch (final RecordInstantiationException ex) {
super.logger.error("Failed to create: " + recordClassName, ex);
}
return true;
}
protected void send(IMonitoringRecord record) {};
public void onTerminating() throws Exception {
LOGGER.trace("onTerminating");
this.tcpStringReader.terminate();
}
/**
*
* @author Jan Waller
*
* @since 1.8
*/
private static class TCPStringReader extends Thread {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final Log LOG = LogFactory.getLog(TCPStringReader.class);
private final int port;
private final ILookup<String> stringRegistry;
private volatile boolean terminated = false; // NOPMD
private volatile Thread readerThread;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
public void terminate() {
this.terminated = true;
this.readerThread.interrupt();
}
@Override
public void run() {
this.readerThread = Thread.currentThread();
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
if (LOG.isDebugEnabled()) {
LOG.debug("Listening on port " + this.port);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while ((socketChannel.read(buffer) != -1) && (!this.terminated)) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
socketChannel.close();
// END also loop this one?
} catch (final ClosedByInterruptException ex) {
LOG.warn("Reader interrupted", ex);
} catch (final IOException ex) {
LOG.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to close TCP connection!", e);
}
}
}
}
}
}
public int getPort2() {
return this.port2;
}
public void setPort2(final int port2) {
this.port2 = port2;
}
}
package teetime.util.concurrent.hashmap;
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapWithDefault<K, V> extends ConcurrentHashMap<K, V> {
private static final long serialVersionUID = 199185976241037967L;
private final ValueFactory<V> valueFactory;
private int maxElements;
public ConcurrentHashMapWithDefault(final ValueFactory<V> valueFactory) {
this.valueFactory = valueFactory;
}
public V getOrCreate(final K key) {
V value = this.get(key);
if (value == null) {
synchronized (this) {
value = this.get(key);
if (value == null) { // NOCS (DCL)
value = this.valueFactory.create();
this.put(key, value);
this.maxElements++;
}
}
}
return value;
}
public int getMaxElements() {
return this.maxElements;
}
}
package teetime.util.concurrent.hashmap;
import java.io.Serializable;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.AfterOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
/**
* The TraceBuffer is synchronized to prevent problems with concurrent access.
*
* @author Jan Waller
*/
public final class TraceBuffer implements ValueFactory<TraceBuffer> {
private static final Log LOG = LogFactory.getLog(TraceBuffer.class);
private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator();
private TraceMetadata trace;
private final SortedSet<AbstractTraceEvent> events = new TreeSet<AbstractTraceEvent>(COMPARATOR);
private boolean closeable;
private boolean damaged;
private int openEvents;
private int maxOrderIndex = -1;
private long minLoggingTimestamp = Long.MAX_VALUE;
private long maxLoggingTimestamp = -1;
private long traceId = -1;
/**
* Creates a new instance of this class.
*/
public TraceBuffer() {
// default empty constructor
}
public void insertEvent(final AbstractTraceEvent event) {
final long myTraceId = event.getTraceId();
synchronized (this) {
if (this.traceId == -1) {
this.traceId = myTraceId;
} else if (this.traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in event " + event.toString());
this.damaged = true;
}
final long loggingTimestamp = event.getTimestamp();
if (loggingTimestamp > this.maxLoggingTimestamp) {
this.maxLoggingTimestamp = loggingTimestamp;
}
if (loggingTimestamp < this.minLoggingTimestamp) {
this.minLoggingTimestamp = loggingTimestamp;
}
final int orderIndex = event.getOrderIndex();
if (orderIndex > this.maxOrderIndex) {
this.maxOrderIndex = orderIndex;
}
if (event instanceof BeforeOperationEvent) {
if (orderIndex == 0) {
this.closeable = true;
}
this.openEvents++;
} else if (event instanceof AfterOperationEvent) {
this.openEvents--;
} else if (event instanceof AfterOperationFailedEvent) {
this.openEvents--;
}
if (!this.events.add(event)) {
LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + myTraceId);
this.damaged = true;
}
}
}
public void setTrace(final TraceMetadata trace) {
final long myTraceId = trace.getTraceId();
synchronized (this) {
if (this.traceId == -1) {
this.traceId = myTraceId;
} else if (this.traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in trace " + trace.toString());
this.damaged = true;
}
if (this.trace == null) {
this.trace = trace;
} else {
LOG.error("Duplicate Trace entry for traceId " + myTraceId);
this.damaged = true;
}
}
}
public boolean isFinished() {
synchronized (this) {
return this.closeable && !this.isInvalid();
}
}
public boolean isInvalid() {
synchronized (this) {
return (this.trace == null) || this.damaged || (this.openEvents != 0) || (((this.maxOrderIndex + 1) != this.events.size()) || this.events.isEmpty());
}
}
public TraceEventRecords toTraceEvents() {
synchronized (this) {
return new TraceEventRecords(this.trace, this.events.toArray(new AbstractTraceEvent[this.events.size()]));
}
}
public long getMaxLoggingTimestamp() {
synchronized (this) {
return this.maxLoggingTimestamp;
}
}
public long getMinLoggingTimestamp() {
synchronized (this) {
return this.minLoggingTimestamp;
}
}
/**
* @author Jan Waller
*/
private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable {
private static final long serialVersionUID = 8920737343446332517L;
/**
* Creates a new instance of this class.
*/
public TraceEventComperator() {
// default empty constructor
}
public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) {
return o1.getOrderIndex() - o2.getOrderIndex();
}
}
public TraceBuffer create() {
return new TraceBuffer();
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.util.concurrent.hashmap;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public interface ValueFactory<T> {
/**
* Create a new instance of the type <code>T</code>.
*
* @since 1.10
*/
public T create();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment