Skip to content
Snippets Groups Projects
Commit ab6e659d authored by Christian Wulf's avatar Christian Wulf
Browse files

migrated TcpTraceLoggingExtAnalysis to configuration

parent 3539ee89
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,7 @@ import java.util.List; ...@@ -21,6 +21,7 @@ import java.util.List;
import teetime.framework.AnalysisConfiguration; import teetime.framework.AnalysisConfiguration;
import teetime.framework.IStage; import teetime.framework.IStage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
...@@ -38,14 +39,16 @@ import kieker.common.record.IMonitoringRecord; ...@@ -38,14 +39,16 @@ import kieker.common.record.IMonitoringRecord;
public class RecordReaderConfiguration extends AnalysisConfiguration { public class RecordReaderConfiguration extends AnalysisConfiguration {
private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>(); private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private final IPipeFactory intraThreadPipeFactory;
public RecordReaderConfiguration() { public RecordReaderConfiguration() {
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
this.buildConfiguration(); this.buildConfiguration();
} }
private void buildConfiguration() { private void buildConfiguration() {
IStage producerPipeline = this.buildProducerPipeline(); IStage producerPipeline = this.buildProducerPipeline();
this.getFiniteProducerStages().add(producerPipeline); addThreadableStage(producerPipeline);
} }
private IStage buildProducerPipeline() { private IStage buildProducerPipeline() {
...@@ -57,11 +60,8 @@ public class RecordReaderConfiguration extends AnalysisConfiguration { ...@@ -57,11 +60,8 @@ public class RecordReaderConfiguration extends AnalysisConfiguration {
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
// connect stages // connect stages
PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false) intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false)
.create(dir2RecordsFilter.getOutputPort(), collector.getInputPort());
return initialElementProducer; return initialElementProducer;
} }
......
...@@ -32,6 +32,7 @@ import org.junit.FixMethodOrder; ...@@ -32,6 +32,7 @@ import org.junit.FixMethodOrder;
import org.junit.Test; import org.junit.Test;
import org.junit.runners.MethodSorters; import org.junit.runners.MethodSorters;
import teetime.framework.Analysis;
import teetime.util.ListUtil; import teetime.util.ListUtil;
import teetime.util.StopWatch; import teetime.util.StopWatch;
import util.test.StatisticsUtil; import util.test.StatisticsUtil;
...@@ -63,7 +64,9 @@ public class ChwHomeTcpTraceReadingTest { ...@@ -63,7 +64,9 @@ public class ChwHomeTcpTraceReadingTest {
@Test @Test
public void performAnalysis() { public void performAnalysis() {
final TcpTraceLoggingExtAnalysis analysis = new TcpTraceLoggingExtAnalysis(); final TcpTraceLoggingExtAnalysisConfiguration configuration = new TcpTraceLoggingExtAnalysisConfiguration();
final Analysis analysis = new Analysis(configuration);
analysis.init(); analysis.init();
this.stopWatch.start(); this.stopWatch.start();
...@@ -73,11 +76,11 @@ public class ChwHomeTcpTraceReadingTest { ...@@ -73,11 +76,11 @@ public class ChwHomeTcpTraceReadingTest {
this.stopWatch.end(); this.stopWatch.end();
} }
List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(analysis.getRecordThroughputs()); List<Long> recordThroughputs = ListUtil.removeFirstHalfElements(configuration.getRecordThroughputs());
Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs); Map<Double, Long> recordQuintiles = StatisticsUtil.calculateQuintiles(recordThroughputs);
System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit"); System.out.println("Median record throughput: " + recordQuintiles.get(0.5) + " records/time unit");
assertEquals("#records", EXPECTED_NUM_RECORDS, analysis.getNumRecords()); assertEquals("#records", EXPECTED_NUM_RECORDS, configuration.getNumRecords());
// 08.07.2014 (incl.) // 08.07.2014 (incl.)
assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3500L)))); assertThat(recordQuintiles.get(0.5), is(both(greaterThan(3000L)).and(lessThan(3500L))));
......
...@@ -2,36 +2,47 @@ package teetime.examples.traceReading; ...@@ -2,36 +2,47 @@ package teetime.examples.traceReading;
import java.util.List; import java.util.List;
import teetime.framework.AnalysisConfiguration;
import teetime.framework.IStage; import teetime.framework.IStage;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.SpScPipe; import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Clock; import teetime.stage.Clock;
import teetime.stage.Counter; import teetime.stage.Counter;
import teetime.stage.ElementThroughputMeasuringStage; import teetime.stage.ElementThroughputMeasuringStage;
import teetime.stage.Pipeline;
import teetime.stage.basic.Sink; import teetime.stage.basic.Sink;
import teetime.stage.basic.distributor.Distributor; import teetime.stage.basic.distributor.Distributor;
import teetime.stage.io.network.TcpReader; import teetime.stage.io.network.TcpReader;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
public class TcpTraceLoggingExtAnalysis { public class TcpTraceLoggingExtAnalysisConfiguration extends AnalysisConfiguration {
private Thread clockThread; private Thread clockThread;
private Thread tcpThread; private Thread tcpThread;
private Counter<IMonitoringRecord> recordCounter; private Counter<IMonitoringRecord> recordCounter;
private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage; private ElementThroughputMeasuringStage<IMonitoringRecord> recordThroughputStage;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private IStage buildClockPipeline(final long intervalDelayInMs) { public TcpTraceLoggingExtAnalysisConfiguration() {
intraThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PipeFactoryRegistry.INSTANCE.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
}
private Pipeline<Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clockStage = new Clock(); Clock clockStage = new Clock();
clockStage.setInitialDelayInMs(intervalDelayInMs); clockStage.setInitialDelayInMs(intervalDelayInMs);
clockStage.setIntervalDelayInMs(intervalDelayInMs); clockStage.setIntervalDelayInMs(intervalDelayInMs);
Distributor<Long> distributor = new Distributor<Long>(); Distributor<Long> distributor = new Distributor<Long>();
SingleElementPipe.connect(clockStage.getOutputPort(), distributor.getInputPort()); intraThreadPipeFactory.create(clockStage.getOutputPort(), distributor.getInputPort());
return clockStage; return new Pipeline<Distributor<Long>>(clockStage, distributor);
} }
private IStage buildTcpPipeline(final Distributor<Long> previousClockStage) { private IStage buildTcpPipeline(final Distributor<Long> previousClockStage) {
...@@ -40,38 +51,24 @@ public class TcpTraceLoggingExtAnalysis { ...@@ -40,38 +51,24 @@ public class TcpTraceLoggingExtAnalysis {
this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>(); this.recordThroughputStage = new ElementThroughputMeasuringStage<IMonitoringRecord>();
Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>(); Sink<IMonitoringRecord> endStage = new Sink<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), this.recordCounter.getInputPort()); intraThreadPipeFactory.create(tcpReader.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort()); intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), this.recordThroughputStage.getInputPort());
SingleElementPipe.connect(this.recordThroughputStage.getOutputPort(), endStage.getInputPort()); intraThreadPipeFactory.create(this.recordThroughputStage.getOutputPort(), endStage.getInputPort());
// SingleElementPipe.connect(this.recordCounter.getOutputPort(), endStage.getInputPort()); // intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10); interThreadPipeFactory.create(previousClockStage.getNewOutputPort(), this.recordThroughputStage.getTriggerInputPort(), 10);
return tcpReader; return tcpReader;
} }
public void init() { public void init() {
IStage clockPipeline = this.buildClockPipeline(1000); Pipeline<Distributor<Long>> clockPipeline = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockPipeline)); this.clockThread = new Thread(new RunnableStage(clockPipeline));
IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage()); IStage tcpPipeline = this.buildTcpPipeline(clockPipeline.getLastStage());
this.tcpThread = new Thread(new RunnableStage(tcpPipeline)); this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
} }
public void start() {
this.tcpThread.start();
this.clockThread.start();
try {
this.tcpThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
this.clockThread.interrupt();
}
public int getNumRecords() { public int getNumRecords() {
return this.recordCounter.getNumElementsPassed(); return this.recordCounter.getNumElementsPassed();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment