diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java index cc7daea791965673adf42d73bb725c67c34e2234..d402e4def68eed6c369aefe24d098942efdb98ba 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/EndStage.java @@ -11,6 +11,8 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; public class EndStage<T> implements StageWithPort<T, T> { + private final InputPort<T> inputPort = new InputPort<T>(this); + public int count; public ConstructorClosure<?> closure; public List<Object> list = new LinkedList<Object>(); @@ -46,8 +48,7 @@ public class EndStage<T> implements StageWithPort<T, T> { @Override public InputPort<T> getInputPort() { - // TODO Auto-generated method stub - return null; + return this.inputPort; } @Override diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java index 3296c6ffa2177d87a2bf31ecf767467a7db23148..87abfc8d810709912f60f1b5839556350d8c2a84 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ThroughputFilter.java @@ -35,10 +35,12 @@ public class ThroughputFilter<T> extends ConsumerStage<T, T> { private void computeThroughput() { long diffInNs = System.nanoTime() - this.timestamp; - long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); - long throughputPerMs = this.numPassedElements / diffInMs; - this.throughputs.add(throughputPerMs); - // this.logger.info("Throughput: " + throughputPerMs + " elements/ms"); + // long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); + // long throughputPerMs = this.numPassedElements / diffInMs; + long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs); + long throughputPerSec = this.numPassedElements / diffInSec; + this.throughputs.add(throughputPerSec); + this.logger.info("Throughput: " + throughputPerSec + " elements/s"); } private void resetTimestamp() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index 10b3c8b1f576bd10fe564bb9efd39908b6f0e792..9a775c4ead39e2f55cdce2c596a3558ccc07bb1d 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -19,14 +19,18 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import teetime.variant.methodcallWithPorts.framework.core.ProducerStage; import kieker.common.exception.MonitoringRecordException; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; +import kieker.common.record.misc.RegistryRecord; import kieker.common.util.registry.ILookup; import kieker.common.util.registry.Lookup; @@ -46,6 +50,8 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { private int port1 = 10133; private int port2 = 10134; + private TCPStringReader tcpStringReader; + // @Override // implement onStop // public void onPipelineStops() { // super.logger.info("Shutdown of TCPReader requested."); @@ -69,6 +75,13 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { this.port2 = port2; } + @Override + public void onStart() { + this.tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); + this.tcpStringReader.start(); + super.onStart(); + } + @Override protected void execute5(final Void element) { ServerSocketChannel serversocket = null; @@ -120,6 +133,82 @@ public class TCPReader extends ProducerStage<Void, IMonitoringRecord> { } } } + + this.setReschedulable(false); + this.tcpStringReader.terminate(); + } + } + + /** + * + * @author Jan Waller + * + * @since 1.8 + */ + private static class TCPStringReader extends Thread { + + private static final int MESSAGE_BUFFER_SIZE = 65535; + + private static final Log LOG = LogFactory.getLog(TCPStringReader.class); + + private final int port; + private final ILookup<String> stringRegistry; + private volatile boolean terminated = false; // NOPMD + private volatile Thread readerThread; + + public TCPStringReader(final int port, final ILookup<String> stringRegistry) { + this.port = port; + this.stringRegistry = stringRegistry; + } + + public void terminate() { + this.terminated = true; + this.readerThread.interrupt(); + } + + @Override + public void run() { + this.readerThread = Thread.currentThread(); + ServerSocketChannel serversocket = null; + try { + serversocket = ServerSocketChannel.open(); + serversocket.socket().bind(new InetSocketAddress(this.port)); + if (LOG.isDebugEnabled()) { + LOG.debug("Listening on port " + this.port); + } + // BEGIN also loop this one? + final SocketChannel socketChannel = serversocket.accept(); + final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + while ((socketChannel.read(buffer) != -1) && (!this.terminated)) { + buffer.flip(); + try { + while (buffer.hasRemaining()) { + buffer.mark(); + RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry); + } + buffer.clear(); + } catch (final BufferUnderflowException ex) { + buffer.reset(); + buffer.compact(); + } + } + socketChannel.close(); + // END also loop this one? + } catch (final ClosedByInterruptException ex) { + LOG.warn("Reader interrupted", ex); + } catch (final IOException ex) { + LOG.error("Error while reading", ex); + } finally { + if (null != serversocket) { + try { + serversocket.close(); + } catch (final IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to close TCP connection!", e); + } + } + } + } } } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index ed26c3e61a0183e0cb8dffc5e2549075c35eb000..1fddc4ccb73fb648e65c4fa81c264c1a7b36e9e3 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -79,7 +79,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); } @Test @@ -106,7 +106,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); assertThat(quintiles.get(0.5), is(both(greaterThan(1700l)).and(lessThan(1900l)))); } @@ -135,7 +135,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e1693a942e3d46d0e99394d1b1643ad233206622 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTcpTraceReconstructionAnalysisTest.java @@ -0,0 +1,76 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.examples.traceReconstruction; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import teetime.util.StatisticsUtil; +import teetime.util.StopWatch; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class ChwWorkTcpTraceReconstructionAnalysisTest { + + private StopWatch stopWatch; + + @Before + public void before() { + this.stopWatch = new StopWatch(); + } + + @After + public void after() { + long overallDurationInNs = this.stopWatch.getDurationInNs(); + System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms"); + } + + @Test + public void performAnalysis() { + final TcpTraceReconstructionAnalysis analysis = new TcpTraceReconstructionAnalysis(); + analysis.init(); + + this.stopWatch.start(); + try { + analysis.start(); + } finally { + this.stopWatch.end(); + analysis.onTerminate(); + } + + assertEquals(21001, analysis.getNumRecords()); + assertEquals(1000, analysis.getNumTraces()); + + // TraceEventRecords trace6884 = analysis.getElementCollection().get(0); + // assertEquals(6884, trace6884.getTraceMetadata().getTraceId()); + // + // TraceEventRecords trace6886 = analysis.getElementCollection().get(1); + // assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + + Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getRecordThroughputs()); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index 18e456c538e814e5ccba52889937ddcac12c73ec..87702a47a98969ad255585461ba15ee0584ca645 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -79,7 +79,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); } @Test @@ -106,7 +106,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); } @@ -135,7 +135,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { assertEquals(1, trace1.getTraceMetadata().getTraceId()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Mean throughput: " + quintiles.get(0.5) + " elements/ms"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java new file mode 100644 index 0000000000000000000000000000000000000000..c86e10c0fd8e6a5db29e927d511fc62f21b12779 --- /dev/null +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -0,0 +1,146 @@ +package teetime.variant.methodcallWithPorts.examples.traceReconstruction; + +import java.util.LinkedList; +import java.util.List; + +import teetime.variant.explicitScheduling.framework.core.Analysis; +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; +import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.stage.Clock; +import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.EndStage; +import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; +import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; +import teetime.variant.methodcallWithPorts.stage.io.TCPReader; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; +import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter; + +import kieker.analysis.plugin.filter.flow.TraceEventRecords; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.flow.IFlowRecord; + +public class TcpTraceReconstructionAnalysis extends Analysis { + + private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); + + private Thread clockThread; + private Thread clock2Thread; + private Thread workerThread; + + private ClassNameRegistryRepository classNameRegistryRepository; + + private CountingFilter<IMonitoringRecord> recordCounter; + + private CountingFilter<TraceEventRecords> traceCounter; + + private ThroughputFilter<IFlowRecord> recordThroughputFilter; + private ThroughputFilter<TraceEventRecords> traceThroughputFilter; + + @Override + public void init() { + super.init(); + StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); + this.clockThread = new Thread(new RunnableStage(clockStage)); + + StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline(); + this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); + + Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage); + this.workerThread = new Thread(new RunnableStage(pipeline)); + } + + private StageWithPort<Void, Long> buildClockPipeline() { + Clock clock = new Clock(); + clock.setIntervalDelayInMs(1000); + + return clock; + } + + private StageWithPort<Void, Long> buildClock2Pipeline() { + Clock clock = new Clock(); + clock.setIntervalDelayInMs(2000); + + return clock; + } + + private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { + this.classNameRegistryRepository = new ClassNameRegistryRepository(); + + // create stages + TCPReader tcpReader = new TCPReader(); + this.recordCounter = new CountingFilter<IMonitoringRecord>(); + final InstanceOfFilter<IMonitoringRecord, IFlowRecord> instanceOfFilter = new InstanceOfFilter<IMonitoringRecord, IFlowRecord>( + IFlowRecord.class); + this.recordThroughputFilter = new ThroughputFilter<IFlowRecord>(); + final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); + this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); + this.traceCounter = new CountingFilter<TraceEventRecords>(); + EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + + // connect stages + SpScPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort(), 1024); + SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); + SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); + + SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1); + SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1); + + // create and configure pipeline + Pipeline<Void, TraceEventRecords> pipeline = new Pipeline<Void, TraceEventRecords>(); + pipeline.setFirstStage(tcpReader); + pipeline.addIntermediateStage(this.recordCounter); + pipeline.addIntermediateStage(instanceOfFilter); + // pipeline.addIntermediateStage(this.recordThroughputFilter); + pipeline.addIntermediateStage(traceReconstructionFilter); + pipeline.addIntermediateStage(this.traceThroughputFilter); + pipeline.addIntermediateStage(this.traceCounter); + pipeline.setLastStage(endStage); + return pipeline; + } + + @Override + public void start() { + super.start(); + + this.clockThread.start(); + this.clock2Thread.start(); + this.workerThread.start(); + + try { + this.workerThread.join(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + this.clockThread.interrupt(); + this.clock2Thread.interrupt(); + } + + public List<TraceEventRecords> getElementCollection() { + return this.elementCollection; + } + + public int getNumRecords() { + return this.recordCounter.getNumElementsPassed(); + } + + public int getNumTraces() { + return this.traceCounter.getNumElementsPassed(); + } + + public List<Long> getRecordThroughputs() { + return this.recordThroughputFilter.getThroughputs(); + } + + public List<Long> getTraceThroughputFilter() { + return this.traceThroughputFilter.getThroughputs(); + } + +} diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 65b9a5628f0fe3a15db4e2b4ba83296213359910..76a10668248890349eb4c3b899b9bfd201eb49b6 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -31,8 +31,8 @@ public class TraceReconstructionAnalysis extends Analysis { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); - private Thread producerThread; private Thread clockThread; + private Thread workerThread; private ClassNameRegistryRepository classNameRegistryRepository; @@ -50,8 +50,8 @@ public class TraceReconstructionAnalysis extends Analysis { StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); this.clockThread = new Thread(new RunnableStage(clockStage)); - Pipeline<File, Void> producerPipeline = this.buildProducerPipeline(clockStage); - this.producerThread = new Thread(new RunnableStage(producerPipeline)); + Pipeline<?, ?> pipeline = this.buildPipeline(clockStage); + this.workerThread = new Thread(new RunnableStage(pipeline)); } private StageWithPort<Void, Long> buildClockPipeline() { @@ -61,7 +61,7 @@ public class TraceReconstructionAnalysis extends Analysis { return clock; } - private Pipeline<File, Void> buildProducerPipeline(final StageWithPort<Void, Long> clockStage) { + private Pipeline<File, Void> buildPipeline(final StageWithPort<Void, Long> clockStage) { this.classNameRegistryRepository = new ClassNameRegistryRepository(); // final IsIMonitoringRecordInRange isIMonitoringRecordInRange = new IsIMonitoringRecordInRange(0, 1000); @@ -124,10 +124,10 @@ public class TraceReconstructionAnalysis extends Analysis { super.start(); this.clockThread.start(); - this.producerThread.start(); + this.workerThread.start(); try { - this.producerThread.join(); + this.workerThread.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -151,10 +151,10 @@ public class TraceReconstructionAnalysis extends Analysis { } public File getInputDir() { - return inputDir; + return this.inputDir; } - public void setInputDir(File inputDir) { + public void setInputDir(final File inputDir) { this.inputDir = inputDir; } }