diff --git a/conf/logging.properties b/conf/logging.properties index 7e34493daab251c1f3ec04fe34770320dfa97598..d8c2b84d8dfc04878acbd74aff21841bd821ca7e 100644 --- a/conf/logging.properties +++ b/conf/logging.properties @@ -1,13 +1,13 @@ .handlers = java.util.logging.ConsoleHandler -.level = ALL +.level = WARNING java.util.logging.ConsoleHandler.level = ALL #java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n -teetime.level = ALL +#teetime.level = ALL -teetime.variant.methodcallWithPorts.framework.level = ALL -teetime.variant.methodcallWithPorts.framework.core.level = ALL -teetime.variant.methodcallWithPorts.stage.level = FINE +#teetime.variant.methodcallWithPorts.framework.level = ALL +#teetime.variant.methodcallWithPorts.framework.core.level = ALL +#teetime.variant.methodcallWithPorts.stage.level = FINE #teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java index ed380b19e855b18b643fa06eee2ca5c78f6b2e42..a805089e6805be089657d71a33bd9626124ee20e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java @@ -20,10 +20,8 @@ public class SpScPipe<T> extends AbstractPipe<T> { public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) { SpScPipe<T> pipe = new SpScPipe<T>(capacity); targetPort.setPipe(pipe); - if (sourcePort != null) { - sourcePort.setPipe(pipe); - sourcePort.setCachedTargetStage(targetPort.getOwningStage()); - } + sourcePort.setPipe(pipe); + sourcePort.setCachedTargetStage(targetPort.getOwningStage()); return pipe; } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java index 44213e2910d02f75f3e37ae6ed03105b9751e8c3..b7904b19a2ec18a569c2ebff48453f7080e9a958 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java @@ -18,14 +18,14 @@ public class Cache<T> extends ConsumerStage<T, T> { @Override public void onIsPipelineHead() { - this.logger.debug("Emitting cached elements..."); + this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements..."); StopWatch stopWatch = new StopWatch(); stopWatch.start(); for (T cachedElement : this.cachedObjects) { this.send(cachedElement); } stopWatch.end(); - System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); + this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms"); super.onIsPipelineHead(); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java index 5e643e68b7ff57975a2ed20ad14ddc88f2a6b76b..b172d211f53696f5c031ac4c5a02b7c8ba2bbb29 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java @@ -27,12 +27,16 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; */ public class CollectorSink<T> extends ConsumerStage<T, Void> { - private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead - private final List<T> elements; + private final int threshold; - public CollectorSink(final List<T> list) { + public CollectorSink(final List<T> list, final int threshold) { this.elements = list; + this.threshold = threshold; + } + + public CollectorSink(final List<T> list) { + this(list, 100000); } @Override @@ -50,7 +54,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> { protected void execute5(final T element) { this.elements.add(element); - if ((this.elements.size() % THRESHOLD) == 0) { + if ((this.elements.size() % this.threshold) == 0) { System.out.println("size: " + this.elements.size()); } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java index 3f5c82aca9bf68232b6737c18211918f43f204d6..fd2bdfc19166306aa057bcd78c323af86363c896 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java @@ -35,13 +35,24 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> { private void computeElementThroughput(final Long timestampInNs) { long diffInNs = timestampInNs - this.lastTimestampInNs; - long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs); - if (diffInSec > 0) { - long throughputPerSec = this.numPassedElements / diffInSec; + + // BETTER use the TimeUnit of the clock + long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs); + if (diffInMs > 0) { + long throughputPerSec = this.numPassedElements / diffInMs; this.throughputs.add(throughputPerSec); - this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements); + this.logger.info("Throughput: " + throughputPerSec + " elements/ms" + " -> numPassedElements=" + this.numPassedElements); this.resetTimestamp(timestampInNs); + } else { + long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs); + if (diffInSec > 0) { + long throughputPerSec = this.numPassedElements / diffInSec; + this.throughputs.add(throughputPerSec); + this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements); + + this.resetTimestamp(timestampInNs); + } } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java index 5454e5322eadbe782a3ce05a92737dc1ec853c89..044a2ce0ba14b2dd0cc067fc36ab9f261864ee1b 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java @@ -5,12 +5,16 @@ import java.util.HashMap; import java.util.Map; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.framework.core.InputPort; import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; import com.google.common.io.Files; public class FileExtensionSwitch extends ConsumerStage<File, File> { + // BETTER do not extends from AbstractStage since it provides another unused output port + private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>(); @Override @@ -23,6 +27,24 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> { } } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); + + switch (signal) { + case FINISHED: + this.onFinished(); + break; + default: + this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); + break; + } + + for (OutputPort<File> op : this.fileExtensions.values()) { + op.sendSignal(signal); + } + } + public OutputPort<File> addFileExtension(String fileExtension) { if (fileExtension.startsWith(".")) { fileExtension = fileExtension.substring(1); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java index 70ec23873c60b18af3e79440e8f9d917afdd3f3f..99b69911238dfadd2d517fe126efd2df2b2f6c65 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java @@ -21,6 +21,7 @@ import java.util.List; import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; /** * @@ -40,6 +41,8 @@ public class Merger<T> extends ConsumerStage<T, T> { // BETTER use an array since a list always creates a new iterator when looping private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>(); + private int finishedInputPorts; + private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>(); public IMergerStrategy<T> getStrategy() { @@ -65,6 +68,29 @@ public class Merger<T> extends ConsumerStage<T, T> { this.setReschedulable(isReschedulable); } + @Override + public void onSignal(final Signal signal, final InputPort<?> inputPort) { + this.logger.debug("Got signal: " + signal + " from input port: " + inputPort); + + switch (signal) { + case FINISHED: + this.onFinished(); + break; + default: + this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal."); + break; + } + + if (this.finishedInputPorts == this.inputPortList.size()) { + this.getOutputPort().sendSignal(signal); + } + } + + @Override + protected void onFinished() { + this.finishedInputPorts++; + } + @Override protected void execute5(final T element) { final T token = this.strategy.getNextInput(this); diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java index 2660c06c303ebab2a79f0fa29d6a523bbb354e36..116e8b49c918e36419aac7e70dc74cf78bf71fc7 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java @@ -18,7 +18,7 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord; import java.io.File; import teetime.variant.methodcallWithPorts.framework.core.Pipeline; -import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine.TextLine2RecordFilter; @@ -40,6 +40,6 @@ public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> { this.setLastStage(textLine2RecordFilter); // BETTER let the framework choose the optimal pipe implementation - SpScPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort(), 1); + SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort()); } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index 82f3e6283b857c11f3f2334897575d974e57c8b5..1bfbc094e63976aea4fe68b90033dabf14069902 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import java.io.File; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -78,8 +80,9 @@ public class ChwWorkTraceReconstructionAnalysisTest { TraceEventRecords trace6886 = analysis.getElementCollection().get(1); assertEquals(6886, trace6886.getTraceMetadata().getTraceId()); + this.removeFirstZeroThroughputs(analysis); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); } @Test @@ -105,8 +108,9 @@ public class ChwWorkTraceReconstructionAnalysisTest { TraceEventRecords trace1 = analysis.getElementCollection().get(1); assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId()); + this.removeFirstZeroThroughputs(analysis); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l)))); } @@ -134,8 +138,21 @@ public class ChwWorkTraceReconstructionAnalysisTest { TraceEventRecords trace1 = analysis.getElementCollection().get(1); assertEquals(1, trace1.getTraceMetadata().getTraceId()); + this.removeFirstZeroThroughputs(analysis); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs()); - System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element"); + System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit"); + } + + private void removeFirstZeroThroughputs(final TraceReconstructionAnalysis analysis) { + List<Long> throughputs = analysis.getThroughputs(); + Iterator<Long> iterator = throughputs.iterator(); + while (iterator.hasNext()) { + if (iterator.next() == 0) { + iterator.remove(); + } else { + break; + } + } } } diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java index 8a13ff2d07b26a7468584b8459cc2c62edcb7f88..c85f066cbbe3a8b82027577d0f39ac0536deb964 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -50,10 +50,10 @@ public class TraceReconstructionAnalysis extends Analysis { public void init() { super.init(); StageWithPort<Void, Long> clockStage = this.buildClockPipeline(); - this.clockThread = new Thread(new RunnableStage(clockStage)); + this.clockThread = new Thread(new RunnableStage<Void>(clockStage)); - Pipeline<?, ?> pipeline = this.buildPipeline(clockStage); - this.workerThread = new Thread(new RunnableStage(pipeline)); + Pipeline<File, ?> pipeline = this.buildPipeline(clockStage); + this.workerThread = new Thread(new RunnableStage<File>(pipeline)); } private StageWithPort<Void, Long> buildClockPipeline() { @@ -84,7 +84,7 @@ public class TraceReconstructionAnalysis extends Analysis { stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); // connect stages - SpScPipe.connect(null, dir2RecordsFilter.getInputPort(), 1); + dir2RecordsFilter.getInputPort().setPipe(new SingleElementPipe<File>()); SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort()); SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); @@ -126,7 +126,13 @@ public class TraceReconstructionAnalysis extends Analysis { } catch (InterruptedException e) { throw new IllegalStateException(e); } + this.clockThread.interrupt(); + try { + this.clockThread.join(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } } public List<TraceEventRecords> getElementCollection() {