Skip to content
Snippets Groups Projects
Commit 0064e47f authored by Christian Wulf's avatar Christian Wulf
Browse files

refactored some tests to use the new analysis configuration concept

parent b55d8c18
No related branches found
No related tags found
No related merge requests found
Showing with 52 additions and 78 deletions
......@@ -10,12 +10,16 @@ public class Analysis {
private static final Logger LOGGER = LoggerFactory.getLogger(Analysis.class);
private Configuration configuration;
private final Configuration configuration;
private final List<Thread> consumerThreads = new LinkedList<Thread>();
private final List<Thread> finiteProducerThreads = new LinkedList<Thread>();
private final List<Thread> infiniteProducerThreads = new LinkedList<Thread>();
public Analysis(final Configuration configuration) {
this.configuration = configuration;
}
public void init() {
for (StageWithPort stage : this.configuration.getConsumerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
......@@ -76,8 +80,4 @@ public class Analysis {
public Configuration getConfiguration() {
return this.configuration;
}
public void setConfiguration(final Configuration configuration) {
this.configuration = configuration;
}
}
package teetime.variant.methodcallWithPorts.stage;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.ProducerStage;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
public class Relay<T> extends AbstractStage {
public class Relay<T> extends ProducerStage<T> {
private final InputPort<T> inputPort = this.createInputPort();
private final OutputPort<T> outputPort = this.createOutputPort();
private SpScPipe<T> cachedCastedInputPipe;
public Relay() {
this.setReschedulable(true);
}
@Override
public void executeWithPorts() {
public void execute() {
T element = this.inputPort.receive();
if (null == element) {
// if (this.getInputPort().getPipe().isClosed()) {
......@@ -48,8 +42,4 @@ public class Relay<T> extends AbstractStage {
public InputPort<T> getInputPort() {
return this.inputPort;
}
public OutputPort<T> getOutputPort() {
return this.outputPort;
}
}
......@@ -24,6 +24,7 @@ import org.junit.Before;
import org.junit.Test;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.controlflow.OperationExecutionRecord;
......@@ -51,7 +52,10 @@ public class RecordReaderAnalysisTest {
@Test
public void performAnalysis() {
final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
final RecordReaderConfiguration configuration = new RecordReaderConfiguration();
configuration.buildConfiguration();
Analysis analysis = new Analysis(configuration);
analysis.init();
this.stopWatch.start();
......@@ -61,19 +65,19 @@ public class RecordReaderAnalysisTest {
this.stopWatch.end();
}
assertEquals(6541, analysis.getElementCollection().size());
assertEquals(6541, configuration.getElementCollection().size());
KiekerMetadataRecord metadataRecord = (KiekerMetadataRecord) analysis.getElementCollection().get(0);
KiekerMetadataRecord metadataRecord = (KiekerMetadataRecord) configuration.getElementCollection().get(0);
assertEquals("1.9-SNAPSHOT", metadataRecord.getVersion());
assertEquals("NANOSECONDS", metadataRecord.getTimeUnit());
IMonitoringRecord monitoringRecord = analysis.getElementCollection().get(1);
IMonitoringRecord monitoringRecord = configuration.getElementCollection().get(1);
OperationExecutionRecord oer = (OperationExecutionRecord) monitoringRecord;
assertEquals("bookstoreTracing.Catalog.getBook(boolean)", oer.getOperationSignature());
assertEquals(1283156498771185344l, oer.getTin());
assertEquals(1283156498773323582l, oer.getTout());
monitoringRecord = analysis.getElementCollection().get(analysis.getElementCollection().size() - 1);
monitoringRecord = configuration.getElementCollection().get(configuration.getElementCollection().size() - 1);
oer = (OperationExecutionRecord) monitoringRecord;
assertEquals("bookstoreTracing.Bookstore.searchBook()", oer.getOperationSignature());
assertEquals(1283156499331233504l, oer.getTin());
......
......@@ -19,7 +19,6 @@ import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
......@@ -36,42 +35,28 @@ import kieker.common.record.IMonitoringRecord;
*
* @since 1.10
*/
public class RecordReaderAnalysis extends Analysis {
public class RecordReaderConfiguration extends Configuration {
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private ClassNameRegistryRepository classNameRegistryRepository;
@Override
public void init() {
Configuration configuration = this.buildConfiguration();
this.setConfiguration(configuration);
super.init();
}
private Configuration buildConfiguration() {
Configuration localConfiguration = new Configuration();
public void buildConfiguration() {
StageWithPort producerPipeline = this.buildProducerPipeline();
localConfiguration.getFiniteProducerStages().add(producerPipeline);
return localConfiguration;
this.getFiniteProducerStages().add(producerPipeline);
}
private StageWithPort buildProducerPipeline() {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
ClassNameRegistryRepository classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(this.classNameRegistryRepository);
Dir2RecordsFilter dir2RecordsFilter = new Dir2RecordsFilter(classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>> pipeline = new Pipeline<Dir2RecordsFilter, CollectorSink<IMonitoringRecord>>();
pipeline.setFirstStage(dir2RecordsFilter);
pipeline.setLastStage(collector);
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
dir2RecordsFilter.getInputPort().setPipe(new SpScPipe<File>(1));
dir2RecordsFilter.getInputPort().getPipe().add(new File("src/test/data/bookstore-logs"));
return pipeline;
......
......@@ -35,6 +35,7 @@ import org.junit.runners.MethodSorters;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import kieker.common.record.IMonitoringRecord;
......@@ -80,8 +81,11 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
}
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
analysis.setNumWorkerThreads(numWorkerThreads);
final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration();
configuration.setNumWorkerThreads(numWorkerThreads);
configuration.buildConfiguration();
Analysis analysis = new Analysis(configuration);
analysis.init();
this.stopWatch.start();
......@@ -92,7 +96,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
}
int maxNumWaits = 0;
for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) {
for (SpScPipe<IMonitoringRecord> pipe : configuration.getTcpRelayPipes()) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
......@@ -106,7 +110,7 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs());
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(configuration.getRecordThroughputs());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " elements/time unit");
......@@ -120,10 +124,10 @@ public class ChwHomeTcpTraceReconstructionAnalysisWithThreadsTest {
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces());
assertEquals("#records", EXPECTED_NUM_RECORDS, configuration.getNumRecords());
assertEquals("#traces", EXPECTED_NUM_TRACES, configuration.getNumTraces());
for (Integer count : analysis.getNumTraceMetadatas()) {
for (Integer count : configuration.getNumTraceMetadatas()) {
assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution
}
......
......@@ -30,6 +30,7 @@ import org.junit.runners.MethodSorters;
import teetime.util.ListUtil;
import teetime.util.StatisticsUtil;
import teetime.util.StopWatch;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import kieker.common.record.IMonitoringRecord;
......@@ -88,8 +89,11 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// Duration: 22373 ms
void performAnalysis(final int numWorkerThreads) {
final TcpTraceReconstructionAnalysisWithThreads analysis = new TcpTraceReconstructionAnalysisWithThreads();
analysis.setNumWorkerThreads(numWorkerThreads);
final TcpTraceReconstructionAnalysisWithThreadsConfiguration configuration = new TcpTraceReconstructionAnalysisWithThreadsConfiguration();
configuration.setNumWorkerThreads(numWorkerThreads);
configuration.buildConfiguration();
Analysis analysis = new Analysis(configuration);
analysis.init();
this.stopWatch.start();
......@@ -100,7 +104,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
}
int maxNumWaits = 0;
for (SpScPipe<IMonitoringRecord> pipe : analysis.getTcpRelayPipes()) {
for (SpScPipe<IMonitoringRecord> pipe : configuration.getTcpRelayPipes()) {
maxNumWaits = Math.max(maxNumWaits, pipe.getNumWaits());
}
System.out.println("max #waits of TcpRelayPipes: " + maxNumWaits);
......@@ -114,7 +118,7 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(analysis.getTraceDelays());
// System.out.println("Median trace delay: " + traceQuintiles.get(0.5) + " time units/trace");
List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(analysis.getTraceThroughputs());
List<Long> traceThroughputs = ListUtil.removeFirstHalfElements(configuration.getTraceThroughputs());
Map<Double, Long> traceQuintiles = StatisticsUtil.calculateQuintiles(traceThroughputs);
System.out.println("Median trace throughput: " + traceQuintiles.get(0.5) + " traces/time unit");
......@@ -124,13 +128,13 @@ public class ChwWorkTcpTraceReconstructionAnalysisWithThreadsTest {
// TraceEventRecords trace6886 = analysis.getElementCollection().get(1);
// assertEquals(6886, trace6886.getTraceMetadata().getTraceId());
assertEquals("#records", 21000001, analysis.getNumRecords());
assertEquals("#records", 21000001, configuration.getNumRecords());
for (Integer count : analysis.getNumTraceMetadatas()) {
for (Integer count : configuration.getNumTraceMetadatas()) {
assertEquals("#traceMetadata per worker thread", EXPECTED_NUM_TRACES / numWorkerThreads, count.intValue()); // even distribution
}
assertEquals("#traces", EXPECTED_NUM_TRACES, analysis.getNumTraces());
assertEquals("#traces", EXPECTED_NUM_TRACES, configuration.getNumTraces());
}
public static void main(final String[] args) {
......
......@@ -8,7 +8,6 @@ import java.util.List;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer;
import teetime.variant.methodcallWithPorts.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Configuration;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
......@@ -31,7 +30,7 @@ import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord;
import kieker.common.record.flow.trace.TraceMetadata;
public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
public class TcpTraceReconstructionAnalysisWithThreadsConfiguration extends Configuration {
private static final int NUM_VIRTUAL_CORES = Runtime.getRuntime().availableProcessors();
private static final int MIO = 1000000;
......@@ -54,7 +53,7 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
private final List<SpScPipe<IMonitoringRecord>> tcpRelayPipes = new LinkedList<SpScPipe<IMonitoringRecord>>();
@SuppressWarnings({ "rawtypes", "unchecked" })
public TcpTraceReconstructionAnalysisWithThreads() {
public TcpTraceReconstructionAnalysisWithThreadsConfiguration() {
super();
try {
......@@ -72,33 +71,21 @@ public class TcpTraceReconstructionAnalysisWithThreads extends Analysis {
}
}
@Override
public void init() {
Configuration configuration = this.buildConfiguration();
this.setConfiguration(configuration);
super.init();
}
private Configuration buildConfiguration() {
Configuration localConfiguration = new Configuration();
public void buildConfiguration() {
final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
localConfiguration.getFiniteProducerStages().add(tcpPipeline);
this.getFiniteProducerStages().add(tcpPipeline);
final Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
localConfiguration.getInfiniteProducerStages().add(clockStage);
this.getInfiniteProducerStages().add(clockStage);
final Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(2000);
localConfiguration.getInfiniteProducerStages().add(clock2Stage);
this.getInfiniteProducerStages().add(clock2Stage);
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
for (int i = 0; i < this.numWorkerThreads; i++) {
StageWithPort pipeline = this.buildPipeline(tcpPipeline.getLastStage(), clockStage.getLastStage(), clock2Stage.getLastStage());
localConfiguration.getConsumerStages().add(pipeline);
this.getConsumerStages().add(pipeline);
}
return localConfiguration;
}
private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment