diff --git a/Start Master.launch b/Start Master (testing only).launch similarity index 91% rename from Start Master.launch rename to Start Master (testing only).launch index 92ef4480469f95b69dddcabcc9929f542185459e..803bd3b4486e8ea4853f474e0ca60b1012026170 100644 --- a/Start Master.launch +++ b/Start Master (testing only).launch @@ -11,5 +11,5 @@ </listAttribute> <stringAttribute key="org.eclipse.jdt.launching.MAIN_TYPE" value="explorviz.live_trace_processing.main.WorkerStarter"/> <stringAttribute key="org.eclipse.jdt.launching.PROJECT_ATTR" value="worker"/> -<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10133"/> +<stringAttribute key="org.eclipse.jdt.launching.VM_ARGUMENTS" value="-Xmx4G -Dexplorviz.live_trace_processing.worker_enabled=false -Dexplorviz.live_trace_processing.reader_listening_port=10134"/> </launchConfiguration> diff --git a/src/META-INF/explorviz.live_trace_processing.default.properties b/src/META-INF/explorviz.live_trace_processing.default.properties index c52ffc7d6e571e360de8913820b895272b7a0f5b..26467df60b85e4e605546524c6f270210e85e163 100644 --- a/src/META-INF/explorviz.live_trace_processing.default.properties +++ b/src/META-INF/explorviz.live_trace_processing.default.properties @@ -3,7 +3,7 @@ explorviz.live_trace_processing.worker_enabled=true explorviz.live_trace_processing.reader_listening_port=10133 explorviz.live_trace_processing.writer_target_ip=127.0.0.1 -explorviz.live_trace_processing.writer_target_port=10134 +explorviz.live_trace_processing.writer_target_port=10133 explorviz.live_trace_processing.writer_load_balancing_enabled=false explorviz.live_trace_processing.writer_load_balancing_ip=10.50.0.2 diff --git a/src/explorviz/live_trace_processing/connector/TCPConnector.java b/src/explorviz/live_trace_processing/connector/TCPConnector.java index af8041444ff9a623c75d16d6a29a10959e763fec..b0e79b48677ef9d48e29c1eac85c9df33503c607 100644 --- a/src/explorviz/live_trace_processing/connector/TCPConnector.java +++ b/src/explorviz/live_trace_processing/connector/TCPConnector.java @@ -26,8 +26,7 @@ public class TCPConnector extends AbstractSink implements IWriter, IStringRecord private final StringRegistry stringRegistry = new StringRegistry(this); - private final ByteBuffer buffer = ByteBuffer - .allocateDirect(Constants.SENDING_BUFFER_SIZE); + private final ByteBuffer buffer = ByteBuffer.allocateDirect(Constants.SENDING_BUFFER_SIZE); private volatile boolean shouldDisconnect = false; diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/ITraceReconstruction.java b/src/explorviz/live_trace_processing/filter/reconstruction/ITraceReconstruction.java new file mode 100644 index 0000000000000000000000000000000000000000..077bb319c81e98d2883d026fda77d64b9568fa6a --- /dev/null +++ b/src/explorviz/live_trace_processing/filter/reconstruction/ITraceReconstruction.java @@ -0,0 +1,7 @@ +package explorviz.live_trace_processing.filter.reconstruction; + +import explorviz.live_trace_processing.filter.IPipeReceiver; + +public interface ITraceReconstruction extends IPipeReceiver { + +} diff --git a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java index ec030d49214795e3951cdf414c9e338274db407a..fa5a6841e7d024ac5bd13b5e239a22e682ea556d 100644 --- a/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java +++ b/src/explorviz/live_trace_processing/filter/reconstruction/TraceReconstructionFilter.java @@ -5,12 +5,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.TimeUnit; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; -import explorviz.live_trace_processing.filter.IPipeReceiver; -import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter; +import explorviz.live_trace_processing.filter.reduction.ITraceReduction; import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.event.AbstractOperationEventRecord; @@ -18,15 +16,14 @@ import explorviz.live_trace_processing.record.misc.TerminateRecord; import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.Trace; -public final class TraceReconstructionFilter extends AbstractFilter { +public final class TraceReconstructionFilter extends AbstractFilter implements ITraceReconstruction { private final long maxTraceTimeout; private final Map<Long, TraceReconstructionBuffer> traceId2trace = new ConcurrentSkipListMap<Long, TraceReconstructionBuffer>(); - public TraceReconstructionFilter(final long maxTraceTimeout, final IPipeReceiver sinkReceiver) { - super( - new TracePatternSummarizationFilter(TimeUnit.MILLISECONDS.toNanos(990), - sinkReceiver), Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE, + public TraceReconstructionFilter(final long maxTraceTimeout, + final ITraceReduction traceReduction) { + super(traceReduction, Constants.TRACE_RECONSTRUCTION_DISRUPTOR_SIZE, Constants.TRACE_RECONSTRUCTION_OUTPUT_BUFFER_SIZE, "Reconstructed traces/sec"); this.maxTraceTimeout = maxTraceTimeout; } diff --git a/src/explorviz/live_trace_processing/filter/reduction/ITraceReduction.java b/src/explorviz/live_trace_processing/filter/reduction/ITraceReduction.java new file mode 100644 index 0000000000000000000000000000000000000000..5623979d4c2eabd767a6961e43253cca9756825d --- /dev/null +++ b/src/explorviz/live_trace_processing/filter/reduction/ITraceReduction.java @@ -0,0 +1,7 @@ +package explorviz.live_trace_processing.filter.reduction; + +import explorviz.live_trace_processing.filter.IPipeReceiver; + +public interface ITraceReduction extends IPipeReceiver { + +} diff --git a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java index ea95b53f7822d91b4a7f0040d69b7c59f746abe9..6683064f919c951f22771d9201214c861df47545 100644 --- a/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java +++ b/src/explorviz/live_trace_processing/filter/reduction/summarization/TracePatternSummarizationFilter.java @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; import explorviz.live_trace_processing.filter.IPipeReceiver; +import explorviz.live_trace_processing.filter.reduction.ITraceReduction; import explorviz.live_trace_processing.reader.TimeProvider; import explorviz.live_trace_processing.record.IRecord; import explorviz.live_trace_processing.record.misc.TerminateRecord; @@ -15,7 +16,7 @@ import explorviz.live_trace_processing.record.misc.TimedPeriodRecord; import explorviz.live_trace_processing.record.trace.Trace; import explorviz.live_trace_processing.record.trace.TraceComperator; -public class TracePatternSummarizationFilter extends AbstractFilter { +public class TracePatternSummarizationFilter extends AbstractFilter implements ITraceReduction { private final long maxCollectionDuration; private final Map<Trace, TracePatternSummarizationBuffer> trace2buffer = new ConcurrentSkipListMap<Trace, TracePatternSummarizationBuffer>( diff --git a/src/explorviz/live_trace_processing/main/FilterConfiguration.java b/src/explorviz/live_trace_processing/main/FilterConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..977ca2c97183a337f7fcf23572a6a19bcc34aca0 --- /dev/null +++ b/src/explorviz/live_trace_processing/main/FilterConfiguration.java @@ -0,0 +1,26 @@ +package explorviz.live_trace_processing.main; + +import java.util.concurrent.TimeUnit; + +import explorviz.live_trace_processing.configuration.Configuration; +import explorviz.live_trace_processing.configuration.ConfigurationFactory; +import explorviz.live_trace_processing.filter.ITraceSink; +import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction; +import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; +import explorviz.live_trace_processing.filter.reduction.ITraceReduction; +import explorviz.live_trace_processing.filter.reduction.summarization.TracePatternSummarizationFilter; +import explorviz.live_trace_processing.reader.TCPReader; + +public class FilterConfiguration { + public static void configureAndStartFilters(final Configuration configuration, + final ITraceSink sink) { + final ITraceReduction traceReduction = new TracePatternSummarizationFilter( + TimeUnit.MILLISECONDS.toNanos(990), sink); + + final ITraceReconstruction traceReconstruction = new TraceReconstructionFilter( + TimeUnit.SECONDS.toNanos(5), traceReduction); + + new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT), + traceReconstruction).read(); + } +} diff --git a/src/explorviz/live_trace_processing/main/WorkerStarter.java b/src/explorviz/live_trace_processing/main/WorkerStarter.java index 5219793169cc58ddbea98fb9555cbaf9dc0f990b..5d191fc9b88cd8c12c65327e7378a5ecc01dc7b4 100644 --- a/src/explorviz/live_trace_processing/main/WorkerStarter.java +++ b/src/explorviz/live_trace_processing/main/WorkerStarter.java @@ -5,7 +5,7 @@ import java.io.IOException; import explorviz.live_trace_processing.configuration.Configuration; import explorviz.live_trace_processing.configuration.ConfigurationFactory; import explorviz.live_trace_processing.connector.TCPConnector; -import explorviz.live_trace_processing.reader.TCPReader; +import explorviz.live_trace_processing.filter.ITraceSink; import explorviz.live_trace_processing.writer.load_balancer.LoadBalancer; public class WorkerStarter { @@ -13,24 +13,22 @@ public class WorkerStarter { public static void main(final String[] args) { final Configuration configuration = ConfigurationFactory.createSingletonConfiguration(); - final boolean worker = configuration + final boolean isWorker = configuration .getBooleanProperty(ConfigurationFactory.WORKER_ENABLED); - if (worker) { - final TCPConnector tcpConnector = new TCPConnector( + ITraceSink sink = null; + + if (isWorker) { + final TCPConnector connector = new TCPConnector( configuration.getStringProperty(ConfigurationFactory.WRITER_TARGET_IP), configuration.getIntProperty(ConfigurationFactory.WRITER_TARGET_PORT), configuration); - configureLoadBalancerIfEnabled(configuration, tcpConnector); - - new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT), - tcpConnector).read(); - } else { // testing purpose - new TCPReader(configuration.getIntProperty(ConfigurationFactory.READER_LISTENING_PORT), - null).read(); + configureLoadBalancerIfEnabled(configuration, connector); + sink = connector; } + FilterConfiguration.configureAndStartFilters(configuration, sink); } private static void configureLoadBalancerIfEnabled(final Configuration configuration, diff --git a/src/explorviz/live_trace_processing/reader/TCPReader.java b/src/explorviz/live_trace_processing/reader/TCPReader.java index c449d95981908ecfb367b6694bae30be8f3fce95..9fe1d35deebf7a5f1d60c2282ce3832398249aaf 100644 --- a/src/explorviz/live_trace_processing/reader/TCPReader.java +++ b/src/explorviz/live_trace_processing/reader/TCPReader.java @@ -5,7 +5,6 @@ import java.net.InetSocketAddress; import java.nio.channels.ServerSocketChannel; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; @@ -13,10 +12,9 @@ import com.lmax.disruptor.dsl.Disruptor; import explorviz.live_trace_processing.Constants; import explorviz.live_trace_processing.filter.AbstractFilter; -import explorviz.live_trace_processing.filter.IPipeReceiver; import explorviz.live_trace_processing.filter.RecordArrayEvent; import explorviz.live_trace_processing.filter.RecordArrayEventFactory; -import explorviz.live_trace_processing.filter.reconstruction.TraceReconstructionFilter; +import explorviz.live_trace_processing.filter.reconstruction.ITraceReconstruction; public final class TCPReader { private final int listeningPort; @@ -28,7 +26,7 @@ public final class TCPReader { private final List<TCPReaderOneClient> threads = new ArrayList<TCPReaderOneClient>(); - public TCPReader(final int listeningPort, final IPipeReceiver endReceiver) { + public TCPReader(final int listeningPort, final ITraceReconstruction traceReconstruction) { this.listeningPort = listeningPort; final Disruptor<RecordArrayEvent> disruptor = new Disruptor<RecordArrayEvent>( @@ -37,7 +35,7 @@ public final class TCPReader { @SuppressWarnings("unchecked") final EventHandler<RecordArrayEvent>[] eventHandlers = new EventHandler[1]; - eventHandlers[0] = new TraceReconstructionFilter(TimeUnit.SECONDS.toNanos(5), endReceiver); + eventHandlers[0] = traceReconstruction; disruptor.handleEventsWith(eventHandlers); ringBuffer = disruptor.start(); }