From 4539b5ccfce91beb3d1d9de0b9f0b6303fe21fa5 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 4 Jul 2014 12:32:49 +0200
Subject: [PATCH] fixed test
---
conf/logging.properties | 10 +++----
.../framework/core/pipe/SpScPipe.java | 6 ++---
.../methodcallWithPorts/stage/Cache.java | 4 +--
.../stage/CollectorSink.java | 12 ++++++---
.../ElementThroughputMeasuringStage.java | 19 +++++++++++---
.../stage/FileExtensionSwitch.java | 22 ++++++++++++++++
.../stage/basic/merger/Merger.java | 26 +++++++++++++++++++
.../fileToRecord/DatFile2RecordFilter.java | 4 +--
...hwWorkTraceReconstructionAnalysisTest.java | 23 +++++++++++++---
.../TraceReconstructionAnalysis.java | 14 +++++++---
10 files changed, 112 insertions(+), 28 deletions(-)
diff --git a/conf/logging.properties b/conf/logging.properties
index 7e34493d..d8c2b84d 100644
--- a/conf/logging.properties
+++ b/conf/logging.properties
@@ -1,13 +1,13 @@
.handlers = java.util.logging.ConsoleHandler
-.level = ALL
+.level = WARNING
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
-teetime.level = ALL
+#teetime.level = ALL
-teetime.variant.methodcallWithPorts.framework.level = ALL
-teetime.variant.methodcallWithPorts.framework.core.level = ALL
-teetime.variant.methodcallWithPorts.stage.level = FINE
+#teetime.variant.methodcallWithPorts.framework.level = ALL
+#teetime.variant.methodcallWithPorts.framework.core.level = ALL
+#teetime.variant.methodcallWithPorts.stage.level = FINE
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
index ed380b19..a805089e 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/SpScPipe.java
@@ -20,10 +20,8 @@ public class SpScPipe<T> extends AbstractPipe<T> {
public static <T> SpScPipe<T> connect(final OutputPort<T> sourcePort, final InputPort<T> targetPort, final int capacity) {
SpScPipe<T> pipe = new SpScPipe<T>(capacity);
targetPort.setPipe(pipe);
- if (sourcePort != null) {
- sourcePort.setPipe(pipe);
- sourcePort.setCachedTargetStage(targetPort.getOwningStage());
- }
+ sourcePort.setPipe(pipe);
+ sourcePort.setCachedTargetStage(targetPort.getOwningStage());
return pipe;
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java
index 44213e29..b7904b19 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/Cache.java
@@ -18,14 +18,14 @@ public class Cache<T> extends ConsumerStage<T, T> {
@Override
public void onIsPipelineHead() {
- this.logger.debug("Emitting cached elements...");
+ this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements...");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (T cachedElement : this.cachedObjects) {
this.send(cachedElement);
}
stopWatch.end();
- System.out.println("dur: " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
+ this.logger.debug("Emitting took " + TimeUnit.NANOSECONDS.toMillis(stopWatch.getDurationInNs()) + " ms");
super.onIsPipelineHead();
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
index 5e643e68..b172d211 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/CollectorSink.java
@@ -27,12 +27,16 @@ import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
*/
public class CollectorSink<T> extends ConsumerStage<T, Void> {
- private static final int THRESHOLD = 10000; // TODO make configurable or use an sysout stage instead
-
private final List<T> elements;
+ private final int threshold;
- public CollectorSink(final List<T> list) {
+ public CollectorSink(final List<T> list, final int threshold) {
this.elements = list;
+ this.threshold = threshold;
+ }
+
+ public CollectorSink(final List<T> list) {
+ this(list, 100000);
}
@Override
@@ -50,7 +54,7 @@ public class CollectorSink<T> extends ConsumerStage<T, Void> {
protected void execute5(final T element) {
this.elements.add(element);
- if ((this.elements.size() % THRESHOLD) == 0) {
+ if ((this.elements.size() % this.threshold) == 0) {
System.out.println("size: " + this.elements.size());
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
index 3f5c82ac..fd2bdfc1 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/ElementThroughputMeasuringStage.java
@@ -35,13 +35,24 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
private void computeElementThroughput(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
- long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
- if (diffInSec > 0) {
- long throughputPerSec = this.numPassedElements / diffInSec;
+
+ // BETTER use the TimeUnit of the clock
+ long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
+ if (diffInMs > 0) {
+ long throughputPerSec = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerSec);
- this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
+ this.logger.info("Throughput: " + throughputPerSec + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
+ } else {
+ long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
+ if (diffInSec > 0) {
+ long throughputPerSec = this.numPassedElements / diffInSec;
+ this.throughputs.add(throughputPerSec);
+ this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
+
+ this.resetTimestamp(timestampInNs);
+ }
}
}
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java
index 5454e532..044a2ce0 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/FileExtensionSwitch.java
@@ -5,12 +5,16 @@ import java.util.HashMap;
import java.util.Map;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
+import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
+import teetime.variant.methodcallWithPorts.framework.core.Signal;
import com.google.common.io.Files;
public class FileExtensionSwitch extends ConsumerStage<File, File> {
+ // BETTER do not extends from AbstractStage since it provides another unused output port
+
private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
@Override
@@ -23,6 +27,24 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> {
}
}
+ @Override
+ public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+ this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
+
+ switch (signal) {
+ case FINISHED:
+ this.onFinished();
+ break;
+ default:
+ this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
+ break;
+ }
+
+ for (OutputPort<File> op : this.fileExtensions.values()) {
+ op.sendSignal(signal);
+ }
+ }
+
public OutputPort<File> addFileExtension(String fileExtension) {
if (fileExtension.startsWith(".")) {
fileExtension = fileExtension.substring(1);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
index 70ec2387..99b69911 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/basic/merger/Merger.java
@@ -21,6 +21,7 @@ import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
+import teetime.variant.methodcallWithPorts.framework.core.Signal;
/**
*
@@ -40,6 +41,8 @@ public class Merger<T> extends ConsumerStage<T, T> {
// BETTER use an array since a list always creates a new iterator when looping
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
+ private int finishedInputPorts;
+
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
public IMergerStrategy<T> getStrategy() {
@@ -65,6 +68,29 @@ public class Merger<T> extends ConsumerStage<T, T> {
this.setReschedulable(isReschedulable);
}
+ @Override
+ public void onSignal(final Signal signal, final InputPort<?> inputPort) {
+ this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
+
+ switch (signal) {
+ case FINISHED:
+ this.onFinished();
+ break;
+ default:
+ this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
+ break;
+ }
+
+ if (this.finishedInputPorts == this.inputPortList.size()) {
+ this.getOutputPort().sendSignal(signal);
+ }
+ }
+
+ @Override
+ protected void onFinished() {
+ this.finishedInputPorts++;
+ }
+
@Override
protected void execute5(final T element) {
final T token = this.strategy.getNextInput(this);
diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
index 2660c06c..116e8b49 100644
--- a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
+++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java
@@ -18,7 +18,7 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord;
import java.io.File;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
-import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
+import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine.TextLine2RecordFilter;
@@ -40,6 +40,6 @@ public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> {
this.setLastStage(textLine2RecordFilter);
// BETTER let the framework choose the optimal pipe implementation
- SpScPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort(), 1);
+ SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort());
}
}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java
index 82f3e628..1bfbc094 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import java.io.File;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -78,8 +80,9 @@ public class ChwWorkTraceReconstructionAnalysisTest {
TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
+ this.removeFirstZeroThroughputs(analysis);
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
- System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
+ System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
}
@Test
@@ -105,8 +108,9 @@ public class ChwWorkTraceReconstructionAnalysisTest {
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(8974347286117089281l, trace1.getTraceMetadata().getTraceId());
+ this.removeFirstZeroThroughputs(analysis);
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
- System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
+ System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
assertThat(quintiles.get(0.5), is(both(greaterThan(1100l)).and(lessThan(1400l))));
}
@@ -134,8 +138,21 @@ public class ChwWorkTraceReconstructionAnalysisTest {
TraceEventRecords trace1 = analysis.getElementCollection().get(1);
assertEquals(1, trace1.getTraceMetadata().getTraceId());
+ this.removeFirstZeroThroughputs(analysis);
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(analysis.getThroughputs());
- System.out.println("Median throughput: " + quintiles.get(0.5) + " time units/element");
+ System.out.println("Median throughput: " + quintiles.get(0.5) + " elements/time unit");
+ }
+
+ private void removeFirstZeroThroughputs(final TraceReconstructionAnalysis analysis) {
+ List<Long> throughputs = analysis.getThroughputs();
+ Iterator<Long> iterator = throughputs.iterator();
+ while (iterator.hasNext()) {
+ if (iterator.next() == 0) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
}
}
diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
index 8a13ff2d..c85f066c 100644
--- a/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
+++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/traceReconstruction/TraceReconstructionAnalysis.java
@@ -50,10 +50,10 @@ public class TraceReconstructionAnalysis extends Analysis {
public void init() {
super.init();
StageWithPort<Void, Long> clockStage = this.buildClockPipeline();
- this.clockThread = new Thread(new RunnableStage(clockStage));
+ this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
- Pipeline<?, ?> pipeline = this.buildPipeline(clockStage);
- this.workerThread = new Thread(new RunnableStage(pipeline));
+ Pipeline<File, ?> pipeline = this.buildPipeline(clockStage);
+ this.workerThread = new Thread(new RunnableStage<File>(pipeline));
}
private StageWithPort<Void, Long> buildClockPipeline() {
@@ -84,7 +84,7 @@ public class TraceReconstructionAnalysis extends Analysis {
stringBufferFilter.getDataTypeHandlers().add(new StringHandler());
// connect stages
- SpScPipe.connect(null, dir2RecordsFilter.getInputPort(), 1);
+ dir2RecordsFilter.getInputPort().setPipe(new SingleElementPipe<File>());
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort());
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort());
@@ -126,7 +126,13 @@ public class TraceReconstructionAnalysis extends Analysis {
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
+
this.clockThread.interrupt();
+ try {
+ this.clockThread.join();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
}
public List<TraceEventRecords> getElementCollection() {
--
GitLab