Skip to content
Snippets Groups Projects
Commit 1843b5ab authored by Christian Wulf's avatar Christian Wulf
Browse files

initial commit

parents
No related branches found
No related tags found
No related merge requests found
Showing
with 1023 additions and 0 deletions
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/test/java"/>
<classpathentry kind="src" path="src/test/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>
/target
/*.log
\ No newline at end of file
.project 0 → 100644
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>Akka-Performancetest</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.7
activeProfiles=
eclipse.preferences.version=1
resolveWorkspaceProjects=true
version=1
pom.xml 0 → 100644
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Akka-Performancetest</groupId>
<artifactId>Akka-Performancetest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.4-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.kieker-monitoring</groupId>
<artifactId>kieker</artifactId>
<version>1.11-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.3.6</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>akka-snapshots</id>
<snapshots>
<enabled>true</enabled>
</snapshots>
<url>http://repo.akka.io/snapshots/</url>
</repository>
<repository>
<!-- for SNAPSHOT versions: teetime -->
<id>sonatype.oss.snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package kiekerdays;
import java.util.Collection;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.UntypedActor;
public class Collector extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(Collector.class);
private final Collection<Object> elements;
public Collector() {
this(new LinkedList<Object>());
}
public Collector(final Collection<Object> elements) {
super();
this.elements = elements;
}
@Override
public void onReceive(Object message) throws Exception {
elements.add(message.getClass());
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped: " + elements);
super.postStop();
}
}
package kiekerdays;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
public class Reaper extends UntypedActor {
public static class WatchMe {
private final ActorRef actorRef;
public WatchMe(ActorRef actorRef) {
this.actorRef = actorRef;
}
public ActorRef getActorRef() {
return actorRef;
}
}
@Override
public void onReceive(Object message) throws Exception {
// TODO Auto-generated method stub
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package kiekerdays.tcpreader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import kieker.common.exception.RecordInstantiationException;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.factory.CachedRecordFactoryCatalog;
import kieker.common.record.factory.IRecordFactory;
import kieker.common.record.misc.RegistryRecord;
import kieker.common.util.registry.ILookup;
import kieker.common.util.registry.Lookup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.stage.io.AbstractTcpReader;
/**
* This is a reader which reads the records from a TCP port.
*
* @author Jan Waller, Nils Christian Ehmke, Christian Wulf
*
*/
public class TcpReader extends AbstractTcpReader<IMonitoringRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReader.class);
private final CachedRecordFactoryCatalog recordFactories = CachedRecordFactoryCatalog.getInstance();
// BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary.
private final ILookup<String> stringRegistry = new Lookup<String>();
private int port2 = 10134;
private TCPStringReader tcpStringReader;
/**
* Default constructor with <code>port=10133</code> and <code>bufferCapacity=65535</code>
*
* @param tcpTraceReconstructonActor
*/
public TcpReader() {
this(10133, 65535);
}
/**
*
* @param port
* accept connections on this port
* @param bufferCapacity
* capacity of the receiving buffer
*/
public TcpReader(final int port, final int bufferCapacity) {
super(port, bufferCapacity);
}
public void onStarting() throws Exception {
LOGGER.trace("onStarting");
this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry);
this.tcpStringReader.start();
}
@Override
public void execute() {
super.execute();
}
@Override
protected final void read(final ByteBuffer buffer) {
final int clazzId = buffer.getInt();
final long loggingTimestamp = buffer.getLong();
final String recordClassName = this.stringRegistry.get(clazzId);
try {
final IRecordFactory<? extends IMonitoringRecord> recordFactory = this.recordFactories.get(recordClassName);
IMonitoringRecord record = recordFactory.create(buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp);
send(record);
} catch (final BufferUnderflowException ex) {
super.logger.error("Failed to create record.", ex);
} catch (final RecordInstantiationException ex) {
super.logger.error("Failed to create record.", ex);
}
}
protected void send(IMonitoringRecord record) {};
public void onTerminating() throws Exception {
LOGGER.trace("onTerminating");
this.tcpStringReader.terminate();
}
/**
*
* @author Jan Waller
*
* @since 1.8
*/
private static class TCPStringReader extends Thread {
private static final int MESSAGE_BUFFER_SIZE = 65535;
private static final Log LOG = LogFactory.getLog(TCPStringReader.class);
private final int port;
private final ILookup<String> stringRegistry;
private volatile boolean terminated = false; // NOPMD
private volatile Thread readerThread;
public TCPStringReader(final int port, final ILookup<String> stringRegistry) {
this.port = port;
this.stringRegistry = stringRegistry;
}
public void terminate() {
this.terminated = true;
this.readerThread.interrupt();
}
@Override
public void run() {
this.readerThread = Thread.currentThread();
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
if (LOG.isDebugEnabled()) {
LOG.debug("Listening on port " + this.port);
}
// BEGIN also loop this one?
final SocketChannel socketChannel = serversocket.accept();
final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE);
while ((socketChannel.read(buffer) != -1) && (!this.terminated)) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
socketChannel.close();
// END also loop this one?
} catch (final ClosedByInterruptException ex) {
LOG.warn("Reader interrupted", ex);
} catch (final IOException ex) {
LOG.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to close TCP connection!", e);
}
}
}
}
}
}
public int getPort2() {
return this.port2;
}
public void setPort2(final int port2) {
this.port2 = port2;
}
}
package kiekerdays.tcpreader;
import kieker.common.record.IMonitoringRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class TcpReaderActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpReaderActor.class);
private ActorRef tcpTraceReconstructionActor;
private TcpReader tcpReader = new TcpReader() {
@Override
protected final void send(IMonitoringRecord record) {
tcpTraceReconstructionActor.tell(record, getSelf());
};
};
public TcpReaderActor(Props tcpTraceReconstructionProps) {
super();
this.tcpTraceReconstructionActor = context().actorOf(tcpTraceReconstructionProps);
}
@Override
public void onReceive(Object message) throws Exception {
// if (message instanceof ) {
//
// } else if () {
//
// } else {
// unhandled(message);
// }
LOGGER.debug("Message: " + message.getClass().getName());
try {
tcpReader.onStarting();
tcpReader.execute();
tcpReader.onTerminating();
} catch (Exception e) {
LOGGER.error("Exception while executing TcpReader.", e);
}
// context().stop(getSelf());
// getSelf().tell(PoisonPill.getInstance(), getSelf());
// tcpTraceReconstructionActor.tell(PoisonPill.getInstance(), getSelf());
// context().parent().tell(PoisonPill.getInstance(), getSelf());
// context().system().terminate();
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped");
super.postStop();
}
}
package kiekerdays.tcpreconstruction;
import java.util.concurrent.TimeUnit;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
public class TcpTraceReconstructionActor extends UntypedActor {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpTraceReconstructionActor.class);
private TimeUnit timeunit;
private long maxTraceDuration = Long.MAX_VALUE;
private long maxTraceTimeout = Long.MAX_VALUE;
private long maxEncounteredLoggingTimestamp = -1;
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace;
private ActorRef validTraceReceiver;
private ActorRef invalidTraceReceiver;
public TcpTraceReconstructionActor(ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace,
Props validTraceReceiverProps, Props invalidTraceReceiverProps) {
super();
this.traceId2trace = traceId2trace;
this.validTraceReceiver = context().actorOf(validTraceReceiverProps);
this.invalidTraceReceiver = context().actorOf(invalidTraceReceiverProps);
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof IFlowRecord) {
final Long traceId = this.reconstructTrace((IFlowRecord) message);
if (traceId != null) {
this.put(traceId, true);
}
} else {
unhandled(message);
}
}
private void onTerminating() {
LOGGER.trace("traces left: " + traceId2trace.keySet());
for (Long traceId : this.traceId2trace.keySet()) {
this.put(traceId, false);
}
}
private Long reconstructTrace(final IFlowRecord record) {
Long traceId = null;
if (record instanceof TraceMetadata) {
traceId = ((TraceMetadata) record).getTraceId();
TraceBuffer traceBuffer = this.traceId2trace.getOrCreate(traceId);
traceBuffer.setTrace((TraceMetadata) record);
} else if (record instanceof AbstractTraceEvent) {
traceId = ((AbstractTraceEvent) record).getTraceId();
TraceBuffer traceBuffer = this.traceId2trace.getOrCreate(traceId);
traceBuffer.insertEvent((AbstractTraceEvent) record);
}
return traceId;
}
private void put(final Long traceId, final boolean onlyIfFinished) {
final TraceBuffer traceBuffer = this.traceId2trace.get(traceId);
if (traceBuffer != null) { // null-check to check whether the trace has already been sent and removed
boolean shouldSend;
if (onlyIfFinished) {
shouldSend = traceBuffer.isFinished();
} else {
shouldSend = true;
}
if (shouldSend) {
boolean removed = (null != this.traceId2trace.remove(traceId));
if (removed) {
this.sendTraceBuffer(traceBuffer);
}
}
}
}
@Override
public void postStop() throws Exception {
LOGGER.info("stopped");
onTerminating();
super.postStop();
}
private void sendTraceBuffer(final TraceBuffer traceBuffer) {
ActorRef receiver = (traceBuffer.isInvalid()) ? invalidTraceReceiver : validTraceReceiver;
receiver.tell(traceBuffer.toTraceEvents(), getSelf());
}
public TimeUnit getTimeunit() {
return timeunit;
}
public void setTimeunit(TimeUnit timeunit) {
this.timeunit = timeunit;
}
public long getMaxTraceDuration() {
return maxTraceDuration;
}
public void setMaxTraceDuration(long maxTraceDuration) {
this.maxTraceDuration = maxTraceDuration;
}
public long getMaxTraceTimeout() {
return maxTraceTimeout;
}
public void setMaxTraceTimeout(long maxTraceTimeout) {
this.maxTraceTimeout = maxTraceTimeout;
}
public long getMaxEncounteredLoggingTimestamp() {
return maxEncounteredLoggingTimestamp;
}
public void setMaxEncounteredLoggingTimestamp(long maxEncounteredLoggingTimestamp) {
this.maxEncounteredLoggingTimestamp = maxEncounteredLoggingTimestamp;
}
}
package kiekerdays.tcpreconstruction;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import kiekerdays.Collector;
import kiekerdays.tcpreader.TcpReaderActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
public class TraceReconstructionAnalysisConfiguration {
private static final String START_MESSAGE = "start";
private static final Logger LOGGER = LoggerFactory.getLogger(TraceReconstructionAnalysisConfiguration.class);
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
private ActorRef tcpReaderActor;
private Collection<Object> validTraces = new LinkedList<>();
private ActorRef rootActor;
public TraceReconstructionAnalysisConfiguration() {
init();
}
private void init() {
ActorSystem system = ActorSystem.create("TraceReductionAnalysisConfiguration");
// specify actors
Props collectorValidProps = Props.create(Collector.class, validTraces);
Props collectorInvalidProps = Props.create(Collector.class, Collections.emptyList());
Props tcpTraceReconstructionProps = Props.create(TcpTraceReconstructionActor.class, traceId2trace, collectorValidProps, collectorInvalidProps);
Props tcpReaderProps = Props.create(TcpReaderActor.class, tcpTraceReconstructionProps);
// create actors
final ActorRef tcpReaderActor = system.actorOf(tcpReaderProps, "TcpReaderActor");
// final ActorRef tcpTraceReconstructonActor = system.actorOf(tcpTraceReconstructionProps, "TcpTraceReconstructionActor");
// final ActorRef validTraceReceiver = system.actorOf(collectorValidProps, "CollectorValid");
// final ActorRef invalidTraceReceiver = system.actorOf(collectorInvalidProps, "CollectorInvalid");
// final ActorRef tcpReaderActor = system.actorOf(Props.create(TcpReaderActor.class,
// new Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
// tcpReader = TypedActor.get(system).typedActorOf(new TypedProps<TcpReaderActor>(ITcpReader.class, new
// Creator<TcpReaderActor>() {
// @Override
// public TcpReaderActor create() throws Exception {
// return new TcpReaderActor(tcpTraceReconstructonActor);
// }
// }));
this.tcpReaderActor = tcpReaderActor;
ActorSelection rootActorSelection = system.actorSelection(tcpReaderActor.path().root());
rootActor = rootActorSelection.anchor();
LOGGER.info("Configuration initialized.");
}
public void start() throws Exception {
LOGGER.info("Starting analysis...");
// tcpReader.onStarting();
// tcpReader.execute();
// tcpReader.onTerminating();
tcpReaderActor.tell(START_MESSAGE, ActorRef.noSender());
LOGGER.info("Analysis started.");
rootActor.tell(PoisonPill.getInstance(), rootActor);
// tcpReaderActor.parent().tell(PoisonPill.getInstance(), getSelf());
// Future<Terminated> future = system.terminate();
// Await.ready(future, Duration.Inf());
LOGGER.info("Analysis stopped.");
}
public Collection<Object> getValidTraces() {
return validTraces;
}
}
package teetime.stage.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractTcpReader<T> {
protected final Logger logger = LoggerFactory.getLogger(AbstractTcpReader.class.getName());
private final int port;
private final int bufferCapacity;
public AbstractTcpReader(final int port, final int bufferCapacity) {
super();
this.port = port;
this.bufferCapacity = bufferCapacity;
}
protected void execute() {
ServerSocketChannel serversocket = null;
try {
serversocket = ServerSocketChannel.open();
serversocket.socket().bind(new InetSocketAddress(this.port));
logger.debug("Listening on port " + this.port);
final SocketChannel socketChannel = serversocket.accept();
try {
final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferCapacity);
while (socketChannel.read(buffer) != -1) {
process(buffer);
}
} finally {
socketChannel.close();
}
} catch (final IOException ex) {
logger.error("Error while reading", ex);
} finally {
if (null != serversocket) {
try {
serversocket.close();
} catch (final IOException e) {
logger.debug("Failed to close TCP connection!", e);
}
}
}
}
private void process(final ByteBuffer buffer) {
buffer.flip();
try {
while (buffer.hasRemaining()) {
buffer.mark();
this.read(buffer);
}
buffer.clear();
} catch (final BufferUnderflowException ex) {
buffer.reset();
buffer.compact();
}
}
protected abstract void read(final ByteBuffer buffer);
}
package teetime.util.concurrent.hashmap;
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentHashMapWithDefault<K, V> extends ConcurrentHashMap<K, V> {
private static final long serialVersionUID = 199185976241037967L;
private final ValueFactory<V> valueFactory;
private int maxElements;
public ConcurrentHashMapWithDefault(final ValueFactory<V> valueFactory) {
this.valueFactory = valueFactory;
}
public V getOrCreate(final K key) {
V value = this.get(key);
if (value == null) {
synchronized (this) {
value = this.get(key);
if (value == null) { // NOCS (DCL)
value = this.valueFactory.create();
this.put(key, value);
this.maxElements++;
}
}
}
return value;
}
public int getMaxElements() {
return this.maxElements;
}
}
package teetime.util.concurrent.hashmap;
import java.io.Serializable;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
import kieker.common.record.flow.trace.AbstractTraceEvent;
import kieker.common.record.flow.trace.TraceMetadata;
import kieker.common.record.flow.trace.operation.AfterOperationEvent;
import kieker.common.record.flow.trace.operation.AfterOperationFailedEvent;
import kieker.common.record.flow.trace.operation.BeforeOperationEvent;
/**
* The TraceBuffer is synchronized to prevent problems with concurrent access.
*
* @author Jan Waller
*/
public final class TraceBuffer implements ValueFactory<TraceBuffer> {
private static final Log LOG = LogFactory.getLog(TraceBuffer.class);
private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator();
private TraceMetadata trace;
private final SortedSet<AbstractTraceEvent> events = new TreeSet<AbstractTraceEvent>(COMPARATOR);
private boolean closeable;
private boolean damaged;
private int openEvents;
private int maxOrderIndex = -1;
private long minLoggingTimestamp = Long.MAX_VALUE;
private long maxLoggingTimestamp = -1;
private long traceId = -1;
/**
* Creates a new instance of this class.
*/
public TraceBuffer() {
// default empty constructor
}
public void insertEvent(final AbstractTraceEvent event) {
final long myTraceId = event.getTraceId();
synchronized (this) {
if (this.traceId == -1) {
this.traceId = myTraceId;
} else if (this.traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in event " + event.toString());
this.damaged = true;
}
final long loggingTimestamp = event.getTimestamp();
if (loggingTimestamp > this.maxLoggingTimestamp) {
this.maxLoggingTimestamp = loggingTimestamp;
}
if (loggingTimestamp < this.minLoggingTimestamp) {
this.minLoggingTimestamp = loggingTimestamp;
}
final int orderIndex = event.getOrderIndex();
if (orderIndex > this.maxOrderIndex) {
this.maxOrderIndex = orderIndex;
}
if (event instanceof BeforeOperationEvent) {
if (orderIndex == 0) {
this.closeable = true;
}
this.openEvents++;
} else if (event instanceof AfterOperationEvent) {
this.openEvents--;
} else if (event instanceof AfterOperationFailedEvent) {
this.openEvents--;
}
if (!this.events.add(event)) {
LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + myTraceId);
this.damaged = true;
}
}
}
public void setTrace(final TraceMetadata trace) {
final long myTraceId = trace.getTraceId();
synchronized (this) {
if (this.traceId == -1) {
this.traceId = myTraceId;
} else if (this.traceId != myTraceId) {
LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in trace " + trace.toString());
this.damaged = true;
}
if (this.trace == null) {
this.trace = trace;
} else {
LOG.error("Duplicate Trace entry for traceId " + myTraceId);
this.damaged = true;
}
}
}
public boolean isFinished() {
synchronized (this) {
return this.closeable && !this.isInvalid();
}
}
public boolean isInvalid() {
synchronized (this) {
return (this.trace == null) || this.damaged || (this.openEvents != 0) || (((this.maxOrderIndex + 1) != this.events.size()) || this.events.isEmpty());
}
}
public TraceEventRecords toTraceEvents() {
synchronized (this) {
return new TraceEventRecords(this.trace, this.events.toArray(new AbstractTraceEvent[this.events.size()]));
}
}
public long getMaxLoggingTimestamp() {
synchronized (this) {
return this.maxLoggingTimestamp;
}
}
public long getMinLoggingTimestamp() {
synchronized (this) {
return this.minLoggingTimestamp;
}
}
/**
* @author Jan Waller
*/
private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable {
private static final long serialVersionUID = 8920737343446332517L;
/**
* Creates a new instance of this class.
*/
public TraceEventComperator() {
// default empty constructor
}
public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) {
return o1.getOrderIndex() - o2.getOrderIndex();
}
}
public TraceBuffer create() {
return new TraceBuffer();
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.util.concurrent.hashmap;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public interface ValueFactory<T> {
/**
* Create a new instance of the type <code>T</code>.
*
* @since 1.10
*/
public T create();
}
package kiekerdays.tcpreconstruction;
import org.junit.Test;
public class TraceReductionAnalysisTest {
@Test
public void testAnalysis() throws Exception {
TraceReconstructionAnalysisConfiguration configuration = new TraceReconstructionAnalysisConfiguration();
configuration.start();
}
public static void main(String[] args) {
try {
TraceReductionAnalysisTest test = new TraceReductionAnalysisTest();
test.testAnalysis();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
import ch.qos.logback.classic.filter.ThresholdFilter
statusListener(OnConsoleStatusListener)
appender("FILE", FileAppender) {
file = "akka.log"
append = false
filter(ThresholdFilter) {
level = DEBUG
}
encoder(PatternLayoutEncoder) {
pattern = "%msg%n"
}
}
appender("CONSOLE", ConsoleAppender) {
encoder(PatternLayoutEncoder) {
pattern = "%d{HH:mm:ss.SSS} %level %logger - %msg%n"
}
}
root DEBUG, ["CONSOLE"]
logger "kiekerdays.tcpreader", TRACE
logger "kiekerdays.tcpreconstruction", TRACE
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment