From 6b16890a812f35430b95e66a0ed62db94be47e4f Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 30 Jun 2014 09:23:36 +0200 Subject: [PATCH] fixed bug in Clock; added SysOutFilter --- conf/logging.properties | 10 +- .../SysOutFilter.java | 33 ++++++ .../methodcallWithPorts/stage/Clock.java | 4 +- .../methodcallWithPorts/stage/Relay.java | 4 +- .../TcpTraceReconstructionAnalysis.java | 23 ++-- ...ReconstructionAnalysisWithThreadsTest.java | 3 +- ...raceReconstructionAnalysisWithThreads.java | 107 +++++++++++------- 7 files changed, 125 insertions(+), 59 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java diff --git a/conf/logging.properties b/conf/logging.properties index 3e86eac8..19edacb8 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -1,8 +1,12 @@ .handlers = java.util.logging.ConsoleHandler -.level = ALL +.level = WARNING -java.util.logging.ConsoleHandler.level = WARNING +java.util.logging.ConsoleHandler.level = ALL #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n -#teetime.level = ALL \ No newline at end of file +#teetime.level = ALL + +teetime.variant.methodcallWithPorts.framework.core.level = ALL +#teetime.variant.methodcallWithPorts.stage.level = ALL +teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE \ No newline at end of file diff --git a/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java new file mode 100644 index 00000000..0d173381 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/SysOutFilter.java @@ -0,0 +1,33 @@ +package teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; + +import kieker.common.record.IMonitoringRecord; + +public class SysOutFilter<T> extends ConsumerStage<T, T> { + + private final InputPort<Long> triggerInputPort = new InputPort<Long>(this); + + private final IPipe<IMonitoringRecord> pipe; + + public SysOutFilter(final IPipe<IMonitoringRecord> pipe) { + this.pipe = pipe; + } + + @Override + protected void execute5(final T element) { + Long timestamp = this.triggerInputPort.receive(); + if (timestamp != null) { + // this.logger.info("pipe.size: " + this.pipe.size()); + System.out.println("pipe.size: " + this.pipe.size()); + } + this.send(element); + } + + public InputPort<Long> getTriggerInputPort() { + return this.triggerInputPort; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java index 1b7186d6..d5a462ef 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Clock.java @@ -25,8 +25,8 @@ public class Clock extends ProducerStage<Void, Long> { this.sleep(this.intervalDelayInMs); } - // System.out.println("Emitting timestamp"); - this.getOutputPort().send(this.getCurrentTimeInNs()); + this.logger.debug("Emitting timestamp"); + this.send(this.getCurrentTimeInNs()); } private void sleep(final long delayInMs) { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java index 0486733e..cec12ca5 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Relay.java @@ -15,7 +15,7 @@ public class Relay<T> extends AbstractStage<T, T> { if (null == element) { if (this.getInputPort().getPipe().isClosed()) { this.setReschedulable(false); - System.out.println("got end signal; pipe.size: " + this.getInputPort().getPipe().size()); + this.logger.debug("got end signal; pipe.size: " + this.getInputPort().getPipe().size()); assert 0 == this.getInputPort().getPipe().size(); } return; @@ -25,7 +25,7 @@ public class Relay<T> extends AbstractStage<T, T> { @Override public void onIsPipelineHead() { - System.out.println("onIsPipelineHead"); + this.logger.debug("onIsPipelineHead"); if (this.getInputPort().getPipe().isClosed()) { this.setReschedulable(false); } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java index 329cbcc0..544ac57b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TcpTraceReconstructionAnalysis.java @@ -11,6 +11,7 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; import teetime.variant.methodcallWithPorts.stage.Clock; import teetime.variant.methodcallWithPorts.stage.CountingFilter; +import teetime.variant.methodcallWithPorts.stage.Distributor; import teetime.variant.methodcallWithPorts.stage.EndStage; import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter; import teetime.variant.methodcallWithPorts.stage.ThroughputFilter; @@ -39,28 +40,28 @@ public class TcpTraceReconstructionAnalysis extends Analysis { @Override public void init() { super.init(); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); + StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline(); + StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); Pipeline<?, ?> pipeline = this.buildPipeline(clockStage, clock2Stage); this.workerThread = new Thread(new RunnableStage(pipeline)); } - private StageWithPort<Void, Long> buildClockPipeline() { + private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); - clock.setIntervalDelayInMs(1000); + clock.setIntervalDelayInMs(intervalDelayInMs); + Distributor<Long> distributor = new Distributor<Long>(); - return clock; - } - - private StageWithPort<Void, Long> buildClock2Pipeline() { - Clock clock = new Clock(); - clock.setIntervalDelayInMs(2000); + SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - return clock; + // create and configure pipeline + Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + pipeline.setFirstStage(clock); + pipeline.setLastStage(distributor); + return pipeline; } private Pipeline<Void, TraceEventRecords> buildPipeline(final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java index 2c1e1f86..579cefb7 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest.java @@ -50,6 +50,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { @Test public void performAnalysis() { final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads(); + analysis.setNumWorkerThreads(2); analysis.init(); this.stopWatch.start(); @@ -60,7 +61,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest { analysis.onTerminate(); } - System.out.println("Max size of pipe: " + analysis.getTcpRelayPipe().getMaxSize()); + System.out.println("Max size of tcp-relay pipe: " + analysis.getTcpRelayPipe().getMaxSize()); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceThroughputs()); System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/ms"); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java index f7fef31a..665bb47b 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstructionWithThreads/TcpTraceReconstructionAnalysisWithThreads.java @@ -25,6 +25,7 @@ import kieker.common.record.flow.IFlowRecord; public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { + private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors(); private static final int TCP_RELAY_MAX_SIZE = 500000; private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); @@ -32,7 +33,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private Thread tcpThread; private Thread clockThread; private Thread clock2Thread; - private Thread workerThread; + private Thread[] workerThreads; private CountingFilter<IMonitoringRecord> recordCounter; @@ -42,6 +43,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { private ThroughputFilter<TraceEventRecords> traceThroughputFilter; private SpScPipe<IMonitoringRecord> tcpRelayPipe; + private int numWorkerThreads; @Override public void init() { @@ -49,15 +51,19 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline(); this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); - StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); + StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000); this.clockThread = new Thread(new RunnableStage(clockStage)); - StageWithPort<Void, Long> clock2Stage = this.buildClock2Pipeline(); + StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(2000); this.clock2Thread = new Thread(new RunnableStage(clock2Stage)); - StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); - this.workerThread = new Thread(new RunnableStage(pipeline)); + this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads); + this.workerThreads = new Thread[this.numWorkerThreads]; + for (int i = 0; i < this.workerThreads.length; i++) { + StageWithPort<?, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage); + this.workerThreads[i] = new Thread(new RunnableStage(pipeline)); + } } private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() { @@ -73,21 +79,21 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return pipeline; } - private StageWithPort<Void, Long> buildClockPipeline() { + private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) { Clock clock = new Clock(); - clock.setIntervalDelayInMs(1000); + clock.setIntervalDelayInMs(intervalDelayInMs); + Distributor<Long> distributor = new Distributor<Long>(); - return clock; - } + SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort()); - private StageWithPort<Void, Long> buildClock2Pipeline() { - Clock clock = new Clock(); - clock.setIntervalDelayInMs(2000); - - return clock; + // create and configure pipeline + Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>(); + pipeline.setFirstStage(clock); + pipeline.setLastStage(distributor); + return pipeline; } - private Pipeline<IMonitoringRecord, TraceEventRecords> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, + private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline, final StageWithPort<Void, Long> clockStage, final StageWithPort<Void, Long> clock2Stage) { // create stages @@ -99,31 +105,43 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { final TraceReconstructionFilter traceReconstructionFilter = new TraceReconstructionFilter(); this.traceThroughputFilter = new ThroughputFilter<TraceEventRecords>(); this.traceCounter = new CountingFilter<TraceEventRecords>(); - EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + // EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>(); + EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); // connect stages this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE); - SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort()); - SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); - // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); - SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); - - SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 1); - SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 1); + + SysOutFilter<IMonitoringRecord> sysout = new SysOutFilter<IMonitoringRecord>(this.tcpRelayPipe); + + // // SingleElementPipe.connect(relay.getOutputPort(), this.recordCounter.getInputPort()); + // // SingleElementPipe.connect(this.recordCounter.getOutputPort(), instanceOfFilter.getInputPort()); + // SingleElementPipe.connect(relay.getOutputPort(), instanceOfFilter.getInputPort()); + // SingleElementPipe.connect(relay.getOutputPort(), sysout.getInputPort()); + // SingleElementPipe.connect(sysout.getOutputPort(), endStage.getInputPort()); + SingleElementPipe.connect(relay.getOutputPort(), endStage.getInputPort()); + // // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.recordThroughputFilter.getInputPort()); + // // SingleElementPipe.connect(this.recordThroughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), this.traceThroughputFilter.getInputPort()); + // // SingleElementPipe.connect(this.traceThroughputFilter.getOutputPort(), this.traceCounter.getInputPort()); + // // SingleElementPipe.connect(this.traceCounter.getOutputPort(), endStage.getInputPort()); + // SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), endStage.getInputPort()); + + SpScPipe.connect(clockStage.getOutputPort(), sysout.getTriggerInputPort(), 10); + // SpScPipe.connect(clockStage.getOutputPort(), this.recordThroughputFilter.getTriggerInputPort(), 10); + SpScPipe.connect(clock2Stage.getOutputPort(), this.traceThroughputFilter.getTriggerInputPort(), 10); // create and configure pipeline - Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + // Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>(); + Pipeline<IMonitoringRecord, IMonitoringRecord> pipeline = new Pipeline<IMonitoringRecord, IMonitoringRecord>(); pipeline.setFirstStage(relay); - pipeline.addIntermediateStage(this.recordCounter); - pipeline.addIntermediateStage(instanceOfFilter); + // pipeline.addIntermediateStage(this.recordCounter); + pipeline.addIntermediateStage(sysout); + // pipeline.addIntermediateStage(instanceOfFilter); // pipeline.addIntermediateStage(this.recordThroughputFilter); - pipeline.addIntermediateStage(traceReconstructionFilter); - pipeline.addIntermediateStage(this.traceThroughputFilter); - pipeline.addIntermediateStage(this.traceCounter); + // pipeline.addIntermediateStage(traceReconstructionFilter); + // pipeline.addIntermediateStage(this.traceThroughputFilter); + // pipeline.addIntermediateStage(this.traceCounter); pipeline.setLastStage(endStage); return pipeline; } @@ -134,17 +152,18 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { this.tcpThread.start(); // this.clockThread.start(); - this.clock2Thread.start(); - this.workerThread.start(); + // this.clock2Thread.start(); - try { - this.tcpThread.join(); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + for (Thread workerThread : this.workerThreads) { + workerThread.start(); } try { - this.workerThread.join(); + this.tcpThread.join(); + + for (Thread workerThread : this.workerThreads) { + workerThread.join(); + } } catch (InterruptedException e) { throw new IllegalStateException(e); } @@ -176,4 +195,12 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis { return this.tcpRelayPipe; } + public int getNumWorkerThreads() { + return this.numWorkerThreads; + } + + public void setNumWorkerThreads(final int numWorkerThreads) { + this.numWorkerThreads = numWorkerThreads; + } + } -- GitLab