Skip to content
Snippets Groups Projects
Commit 252d37fe authored by Florian Fittkau's avatar Florian Fittkau
Browse files

filters are now configured in FilterConfiguration.java

parent 5b4daebb
No related branches found
No related tags found
No related merge requests found
Showing
with 62 additions and 29 deletions
......@@ -11,5 +11,5 @@
</listAttribute>
<stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/>
<stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/>
<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/>
</launchConfiguration>
......@@ -3,7 +3,7 @@ explorviz.live_trace_processing.worker_enabled=true
explorviz.live_trace_processing.reader_listening_port=10133
explorviz.live_trace_processing.writer_target_ip=127.0.0.1
explorviz.live_trace_processing.writer_target_port=10134
explorviz.live_trace_processing.writer_target_port=10133
explorviz.live_trace_processing.writer_load_balancing_enabled=false
explorviz.live_trace_processing.writer_load_balancing_ip=10.50.0.2
......
......@@ -26,8 +26,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord
private final StringRegistry stringRegistry = new StringRegistry(this);
private final ByteBuffer buffer = ByteBuffer
.allocateDirect(Constants.SENDING_BUFFER_SIZE);
private final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.SENDING_BUFFER_SIZE);
private volatile boolean shouldDisconnect = false;
......
package explorviz.live_trace_processing.filter.reconstruction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
public interface ITraceReconstruction extends IPipeReceiver {
}
......@@ -5,12 +5,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord;
......@@ -18,15 +16,14 @@ import explorviz.live_trace_processing.record.misc.TerminateRecord;
import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
public final class TraceReconstructionFilter extends AbstractFilter {
public final class TraceReconstructionFilter extends AbstractFilter implements ITraceReconstruction {
private final long maxTraceTimeout;
private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>();
public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) {
super(
new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990),
sinkReceiver), Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
public TraceReconstructionFilter(final long maxTraceTimeout,
final ITraceReduction traceReduction) {
super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE,
Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec");
this.maxTraceTimeout = maxTraceTimeout;
}
......
package explorviz.live_trace_processing.filter.reduction;
import explorviz.live_trace_processing.filter.IPipeReceiver;
public interface ITraceReduction extends IPipeReceiver {
}
......@@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
import explorviz.live_trace_processing.reader.TimeProvider;
import explorviz.live_trace_processing.record.IRecord;
import explorviz.live_trace_processing.record.misc.TerminateRecord;
......@@ -15,7 +16,7 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord;
import explorviz.live_trace_processing.record.trace.Trace;
import explorviz.live_trace_processing.record.trace.TraceComperator;
public class TracePatternSummarizationFilter extends AbstractFilter {
public class TracePatternSummarizationFilter extends AbstractFilter implements ITraceReduction {
private final long maxCollectionDuration;
private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>(
......
package explorviz.live_trace_processing.main;
import java.util.concurrent.TimeUnit;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.filter.ITraceSink;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
import explorviz.live_trace_processing.filter.reduction.ITraceReduction;
import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter;
import explorviz.live_trace_processing.reader.TCPReader;
public class FilterConfiguration {
public static void configureAndStartFilters(final Configuration configuration,
final ITraceSink sink) {
final ITraceReduction traceReduction = new TracePatternSummarizationFilter(
TimeUnit.MILLISECONDS.toNanos(990), sink);
final ITraceReconstruction traceReconstruction = new TraceReconstructionFilter(
TimeUnit.SECONDS.toNanos(5), traceReduction);
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT),
traceReconstruction).read();
}
}
......@@ -5,7 +5,7 @@ import java.io.IOException;
import explorviz.live_trace_processing.configuration.Configuration;
import explorviz.live_trace_processing.configuration.ConfigurationFactory;
import explorviz.live_trace_processing.connector.TCPConnector;
import explorviz.live_trace_processing.reader.TCPReader;
import explorviz.live_trace_processing.filter.ITraceSink;
import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer;
public class WorkerStarter {
......@@ -13,24 +13,22 @@ public class WorkerStarter {
public static void main(final String[] args) {
final Configuration configuration = ConfigurationFactory.createSingletonConfiguration();
final boolean worker = configuration
final boolean isWorker = configuration
.getBooleanProperty(ConfigurationFactory.WORKER_ENABLED);
if (worker) {
final TCPConnector tcpConnector = new TCPConnector(
ITraceSink sink = null;
if (isWorker) {
final TCPConnector connector = new TCPConnector(
configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP),
configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT),
configuration);
configureLoadBalancerIfEnabled(configuration, tcpConnector);
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT),
tcpConnector).read();
} else { // testing purpose
new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT),
null).read();
configureLoadBalancerIfEnabled(configuration, connector);
sink = connector;
}
FilterConfiguration.configureAndStartFilters(configuration, sink);
}
private static void configureLoadBalancerIfEnabled(final Configuration configuration,
......
......@@ -5,7 +5,6 @@ import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
......@@ -13,10 +12,9 @@ import com.lmax.disruptor.dsl.Disruptor;
import explorviz.live_trace_processing.Constants;
import explorviz.live_trace_processing.filter.AbstractFilter;
import explorviz.live_trace_processing.filter.IPipeReceiver;
import explorviz.live_trace_processing.filter.RecordArrayEvent;
import explorviz.live_trace_processing.filter.RecordArrayEventFactory;
import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter;
import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction;
public final class TCPReader {
private final int listeningPort;
......@@ -28,7 +26,7 @@ public final class TCPReader {
private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>();
public TCPReader(final int listeningPort, final IPipeReceiver endReceiver) {
public TCPReader(final int listeningPort, final ITraceReconstruction traceReconstruction) {
this.listeningPort = listeningPort;
final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>(
......@@ -37,7 +35,7 @@ public final class TCPReader {
@SuppressWarnings("unchecked")
final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1];
eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(5), endReceiver);
eventHandlers[0] = traceReconstruction;
disruptor.handleEventsWith(eventHandlers);
ringBuffer = disruptor.start();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment