diff --git a/conf/logging.properties b/conf/logging.properties index 3e86eac8825fa96ce44b9eeae6883d9f55da6a5e..19edacb8c3100024c710d0c5fd1abe4103af6f16 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -1,8 +1,12 @@ .handlers = java.util.logging.ConsoleHandler -.level = ALL +.level = WARNING -java.util.logging.ConsoleHandler.level = WARNING +java.util.logging.ConsoleHandler.level = ALL #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n -#teetime.level = ALL \ No newline at end of file +#teetime.level = ALL + +teetime.variant.methodcallWithPorts.framework.core.level = ALL +#teetime.variant.methodcallWithPorts.stage.level = ALL +teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE \ No newline at end of file diff --git a/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..0d173381ca30c1acbde7a1bd0b7289deb98fc2aa --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java @@ -0,0 +1,33 @@ +package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; + +import kieker.common.record.IMonitoringRecord; + +public class SysOutFilter<T> extends ConsumerStage<T, T> { + + private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + + private final IPipe<IMonitoringRecord> pipe; + + public SysOutFilter(final IPipe<IMonitoringRecord> pipe) { + this.pipe = pipe; + } + + @Override + protected void execute5(final T element) { + Long timestamp = this.triggerInputPort.receive(); + if (timestamp != null) { + // this.logger.info("pipe.size: " + this.pipe.size()); + System.out.println("pipe.size: " + this.pipe.size()); + } + this.send(element); + } + + public InputPort<Long> getTriggerInputPort() { + return this.triggerInputPort; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index 1b7186d6fd80f7d6e77465faa66da85a3df3c55b..d5a462efc9933d8eff88a7e389739c086a89c447 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -25,8 +25,8 @@ public class Clock extends ProducerStage<Void, Long> { this.sleep(this.intervalDelayInMs); } - // System.out.println("Emitting timestamp"); - this.getOutputPort().send(this.getCurrentTimeInNs()); + this.logger.debug("Emitting timestamp"); + this.send(this.getCurrentTimeInNs()); } private void sleep(final long delayInMs) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 0486733ea08bb6c7fbf598f95808365a645866c7..cec12ca5ea92132d5b11d770d2a6572616a57c5b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -15,7 +15,7 @@ public class Relay<T> extends AbstractStage<T, T> { if (null == element) { if (this.getInputPort().getPipe().isClosed()) { this.setReschedulable(false); - System.out.println("got end signal; pipe.size: " + this.getInputPort().getPipe().size()); + this.logger.debug("got end signal; pipe.size: " + this.getInputPort().getPipe().size()); assert 0 == this.getInputPort().getPipe().size(); } return; @@ -25,7 +25,7 @@ public class Relay<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - System.out.println("onIsPipelineHead"); + this.logger.debug("onIsPipelineHead"); if (this.getInputPort().getPipe().isClosed()) { this.setReschedulable(false); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 329cbcc06a9258e8f409ebc02158bdf7901d971a..544ac57b6a8c44d95211794b8d5d543a21938ca7 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -11,6 +11,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Distributor; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; @@ -39,28 +40,28 @@ public class TcpTraceReconstructionAnalysis extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); + StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline(); + StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage); this.workerThread = new Thread(new RunnableStage(pipeline)); } - private StageWithPort<Void, Long> buildClockPipeline() { + private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); - clock.setIntervalDelayInMs(1000); + clock.setIntervalDelayInMs(intervalDelayInMs); + Distributor<Long> distributor = new Distributor<Long>(); - return clock; - } - - private StageWithPort<Void, Long> buildClock2Pipeline() { - Clock clock = new Clock(); - clock.setIntervalDelayInMs(2000); + SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - return clock; + // create and configure pipeline + Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + pipeline.setFirstStage(clock); + pipeline.setLastStage(distributor); + return pipeline; } private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 2c1e1f86cce45bf3f1467f3da4c17086b1ac0ff4..579cefb7aef0933b054bff13ec19c98e00a88d67 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -50,6 +50,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { @Test public void performAnalysis() { final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads(); + analysis.setNumWorkerThreads(2); analysis.init(); this.stopWatch.start(); @@ -60,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { analysis.onTerminate(); } - System.out.println("Max size of pipe: " + analysis.getTcpRelayPipe().getMaxSize()); + System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index f7fef31a4f64aceaa4f41cfa7593800fe8a2fce5..665bb47b16817ecd27be3155a5ed8933f4becf74 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -25,6 +25,7 @@ import kieker.common.record.flow.IFlowRecord; public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { + private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int TCP_RELAY_MAX_SIZE = 500000; private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); @@ -32,7 +33,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private Thread tcpThread; private Thread clockThread; private Thread clock2Thread; - private Thread workerThread; + private Thread[] workerThreads; private CountingFilter<IMonitoringRecord> recordCounter; @@ -42,6 +43,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private ThroughputFilter<TraceEventRecords> traceThroughputFilter; private SpScPipe<IMonitoringRecord> tcpRelayPipe; + private int numWorkerThreads; @Override public void init() { @@ -49,15 +51,19 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); + StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline(); + StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); - StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); - this.workerThread = new Thread(new RunnableStage(pipeline)); + this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); + this.workerThreads = new Thread[this.numWorkerThreads]; + for (int i = 0; i < this.workerThreads.length; i++) { + StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + } } private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { @@ -73,21 +79,21 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return pipeline; } - private StageWithPort<Void, Long> buildClockPipeline() { + private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); - clock.setIntervalDelayInMs(1000); + clock.setIntervalDelayInMs(intervalDelayInMs); + Distributor<Long> distributor = new Distributor<Long>(); - return clock; - } + SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - private StageWithPort<Void, Long> buildClock2Pipeline() { - Clock clock = new Clock(); - clock.setIntervalDelayInMs(2000); - - return clock; + // create and configure pipeline + Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + pipeline.setFirstStage(clock); + pipeline.setLastStage(distributor); + return pipeline; } - private Pipeline<IMonitoringRecord, TraceEventRecords> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, + private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { // create stages @@ -99,31 +105,43 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>(); - EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + // EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); - SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort()); - SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); - SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); - - SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1); - SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1); + + SysOutFilter<IMonitoringRecord> sysout = new SysOutFilter<IMonitoringRecord>(this.tcpRelayPipe); + + // // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort()); + // // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); + // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); + // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort()); + // // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); + // // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + // // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); + // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); + + SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); + // SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + // Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(this.recordCounter); - pipeline.addIntermediateStage(instanceOfFilter); + // pipeline.addIntermediateStage(this.recordCounter); + pipeline.addIntermediateStage(sysout); + // pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); - pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(this.traceThroughputFilter); - pipeline.addIntermediateStage(this.traceCounter); + // pipeline.addIntermediateStage(traceReconstructionFilter); + // pipeline.addIntermediateStage(this.traceThroughputFilter); + // pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; } @@ -134,17 +152,18 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { this.tcpThread.start(); // this.clockThread.start(); - this.clock2Thread.start(); - this.workerThread.start(); + // this.clock2Thread.start(); - try { - this.tcpThread.join(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + for (Thread workerThread : this.workerThreads) { + workerThread.start(); } try { - this.workerThread.join(); + this.tcpThread.join(); + + for (Thread workerThread : this.workerThreads) { + workerThread.join(); + } } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -176,4 +195,12 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return this.tcpRelayPipe; } + public int getNumWorkerThreads() { + return this.numWorkerThreads; + } + + public void setNumWorkerThreads(final int numWorkerThreads) { + this.numWorkerThreads = numWorkerThreads; + } + }