Skip to content
Snippets Groups Projects
Commit 933b8786 authored by Christian Wulf's avatar Christian Wulf
Browse files

added IStage.setSchedulable();

added analysis to Experiment1
parent 439fa32f
No related branches found
No related tags found
No related merge requests found
Showing
with 179 additions and 81 deletions
...@@ -29,6 +29,7 @@ import teetime.framework.concurrent.WorkerThread; ...@@ -29,6 +29,7 @@ import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.Analysis; import teetime.framework.core.Analysis;
import teetime.framework.core.IStage; import teetime.framework.core.IStage;
import teetime.framework.core.Pipeline; import teetime.framework.core.Pipeline;
import teetime.framework.sequential.MethodCallPipe;
import teetime.framework.sequential.QueuePipe; import teetime.framework.sequential.QueuePipe;
import teetime.stage.NoopFilter; import teetime.stage.NoopFilter;
import teetime.util.StatisticsUtil; import teetime.util.StatisticsUtil;
...@@ -55,7 +56,7 @@ public class Experiment1 { ...@@ -55,7 +56,7 @@ public class Experiment1 {
private static final int NUMBER_OF_MAXIMAL_FILTERS = 1000; private static final int NUMBER_OF_MAXIMAL_FILTERS = 1000;
private static final int NUMBER_OF_FILTERS_PER_STEP = 50; private static final int NUMBER_OF_FILTERS_PER_STEP = 50;
private static final IAnalysis[] analyses = { new TeeTimeAnalysis(), new KiekerAnalysis() }; private static final IAnalysis[] analyses = { new TeeTimeMethodCallAnalysis(), new TeeTimeAnalysis(), new KiekerAnalysis() };
private static final List<Long> measuredTimes = new ArrayList<Long>(); private static final List<Long> measuredTimes = new ArrayList<Long>();
...@@ -122,6 +123,74 @@ public class Experiment1 { ...@@ -122,6 +123,74 @@ public class Experiment1 {
} }
private static final class TeeTimeMethodCallAnalysis extends Analysis implements IAnalysis {
private static final int SECONDS = 1000;
private Pipeline pipeline;
private WorkerThread workerThread;
public TeeTimeMethodCallAnalysis() {}
@Override
public void initialize(final int numberOfFilters, final int numberOfObjectsToSend) {
@SuppressWarnings("unchecked")
final NoopFilter<Object>[] noopFilters = new NoopFilter[numberOfFilters];
// create stages
final teetime.stage.basic.ObjectProducer<Object> objectProducer = new teetime.stage.basic.ObjectProducer<Object>(
numberOfObjectsToSend, new Callable<Object>() {
@Override
public Object call() throws Exception {
return new Object();
}
});
for (int i = 0; i < noopFilters.length; i++) {
noopFilters[i] = new NoopFilter<Object>();
noopFilters[i].setSchedulable(false);
}
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(objectProducer);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(objectProducer);
stages.addAll(Arrays.asList(noopFilters));
// connect stages by pipes
MethodCallPipe.connect(objectProducer.outputPort, noopFilters[0].inputPort);
for (int i = 1; i < noopFilters.length; i++) {
MethodCallPipe.connect(noopFilters[i - 1].outputPort, noopFilters[i].inputPort);
}
this.pipeline = new Pipeline();
this.pipeline.setStartStages(startStages);
this.pipeline.setStages(stages);
this.workerThread = new WorkerThread(this.pipeline, 0);
this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
}
@Override
public String getName() {
return "TeeTimeMethodCall";
}
@Override
public void execute() {
super.start();
this.workerThread.start();
try {
this.workerThread.join(60 * SECONDS);
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}
private static final class TeeTimeAnalysis extends Analysis implements IAnalysis { private static final class TeeTimeAnalysis extends Analysis implements IAnalysis {
private static final int SECONDS = 1000; private static final int SECONDS = 1000;
...@@ -167,7 +236,7 @@ public class Experiment1 { ...@@ -167,7 +236,7 @@ public class Experiment1 {
this.pipeline.setStages(stages); this.pipeline.setStages(stages);
this.workerThread = new WorkerThread(this.pipeline, 0); this.workerThread = new WorkerThread(this.pipeline, 0);
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
} }
@Override @Override
......
...@@ -70,7 +70,7 @@ public class ConcurrentCountWordsAnalysis extends Analysis { ...@@ -70,7 +70,7 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
final Distributor<File> distributor = (Distributor<File>) readerThreadPipeline.getStages().get(readerThreadPipeline.getStages().size() - 1); final Distributor<File> distributor = (Distributor<File>) readerThreadPipeline.getStages().get(readerThreadPipeline.getStages().size() - 1);
this.ioThreads[0] = new WorkerThread(readerThreadPipeline, 1); this.ioThreads[0] = new WorkerThread(readerThreadPipeline, 1);
this.ioThreads[0].setName("startThread"); this.ioThreads[0].setName("startThread");
this.ioThreads[0].terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.ioThreads[0].setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
final IPipeline printingThreadPipeline = this.printingThreadPipeline(); final IPipeline printingThreadPipeline = this.printingThreadPipeline();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
......
...@@ -61,7 +61,7 @@ public class QueuedCountWordsAnalysis extends Analysis { ...@@ -61,7 +61,7 @@ public class QueuedCountWordsAnalysis extends Analysis {
public void start() { public void start() {
super.start(); super.start();
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
this.workerThread.start(); this.workerThread.start();
try { try {
......
...@@ -19,7 +19,6 @@ import java.io.File; ...@@ -19,7 +19,6 @@ import java.io.File;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import kieker.common.record.IMonitoringRecord;
import teetime.framework.concurrent.StageTerminationPolicy; import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread; import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.AbstractFilter; import teetime.framework.core.AbstractFilter;
...@@ -38,6 +37,8 @@ import teetime.stage.CollectorSink; ...@@ -38,6 +37,8 @@ import teetime.stage.CollectorSink;
import teetime.stage.kieker.File2RecordFilter; import teetime.stage.kieker.File2RecordFilter;
import teetime.stage.kieker.className.ClassNameRegistryRepository; import teetime.stage.kieker.className.ClassNameRegistryRepository;
import kieker.common.record.IMonitoringRecord;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -65,7 +66,7 @@ public class RecordReaderAnalysis extends Analysis { ...@@ -65,7 +66,7 @@ public class RecordReaderAnalysis extends Analysis {
public void start() { public void start() {
super.start(); super.start();
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
this.workerThread.start(); this.workerThread.start();
try { try {
......
...@@ -53,7 +53,7 @@ public class ThroughputAnalysis<T> extends Analysis { ...@@ -53,7 +53,7 @@ public class ThroughputAnalysis<T> extends Analysis {
final IPipeline pipeline = this.buildPipeline(this.numNoopFilters); final IPipeline pipeline = this.buildPipeline(this.numNoopFilters);
this.workerThread = new WorkerThread(pipeline, 0); this.workerThread = new WorkerThread(pipeline, 0);
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
} }
/** /**
......
...@@ -63,7 +63,7 @@ public class ThroughputTimestampAnalysis extends Analysis { ...@@ -63,7 +63,7 @@ public class ThroughputTimestampAnalysis extends Analysis {
final IPipeline pipeline = this.buildPipeline(this.numNoopFilters); final IPipeline pipeline = this.buildPipeline(this.numNoopFilters);
this.workerThread = new WorkerThread(pipeline, 0); this.workerThread = new WorkerThread(pipeline, 0);
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
} }
/** /**
...@@ -89,12 +89,12 @@ public class ThroughputTimestampAnalysis extends Analysis { ...@@ -89,12 +89,12 @@ public class ThroughputTimestampAnalysis extends Analysis {
final List<IStage> stages = new LinkedList<IStage>(); final List<IStage> stages = new LinkedList<IStage>();
stages.add(objectProducer); stages.add(objectProducer);
if (this.shouldUseQueue) {
stages.add(startTimestampFilter); stages.add(startTimestampFilter);
stages.addAll(Arrays.asList(noopFilters)); stages.addAll(Arrays.asList(noopFilters));
stages.add(stopTimestampFilter); stages.add(stopTimestampFilter);
stages.add(collectorSink); stages.add(collectorSink);
if (this.shouldUseQueue) {
// connect stages by pipes // connect stages by pipes
QueuePipe.connect(objectProducer.outputPort, startTimestampFilter.inputPort); QueuePipe.connect(objectProducer.outputPort, startTimestampFilter.inputPort);
QueuePipe.connect(startTimestampFilter.outputPort, noopFilters[0].inputPort); QueuePipe.connect(startTimestampFilter.outputPort, noopFilters[0].inputPort);
...@@ -104,6 +104,12 @@ public class ThroughputTimestampAnalysis extends Analysis { ...@@ -104,6 +104,12 @@ public class ThroughputTimestampAnalysis extends Analysis {
QueuePipe.connect(noopFilters[noopFilters.length - 1].outputPort, stopTimestampFilter.inputPort); QueuePipe.connect(noopFilters[noopFilters.length - 1].outputPort, stopTimestampFilter.inputPort);
QueuePipe.connect(stopTimestampFilter.outputPort, collectorSink.objectInputPort); QueuePipe.connect(stopTimestampFilter.outputPort, collectorSink.objectInputPort);
} else { } else {
startTimestampFilter.setSchedulable(false);
for (NoopFilter<TimestampObject> noopFilter : noopFilters) {
noopFilter.setSchedulable(false);
}
stopTimestampFilter.setSchedulable(false);
collectorSink.setSchedulable(false);
// connect stages by pipes // connect stages by pipes
MethodCallPipe.connect(objectProducer.outputPort, startTimestampFilter.inputPort); MethodCallPipe.connect(objectProducer.outputPort, startTimestampFilter.inputPort);
MethodCallPipe.connect(startTimestampFilter.outputPort, noopFilters[0].inputPort); MethodCallPipe.connect(startTimestampFilter.outputPort, noopFilters[0].inputPort);
...@@ -131,7 +137,7 @@ public class ThroughputTimestampAnalysis extends Analysis { ...@@ -131,7 +137,7 @@ public class ThroughputTimestampAnalysis extends Analysis {
e.printStackTrace(); e.printStackTrace();
} }
List<Long> durationPer10000IterationsInNs = workerThread.getDurationPer10000IterationsInNs(); List<Long> durationPer10000IterationsInNs = this.workerThread.getDurationPer10000IterationsInNs();
long overallSumInNs = 0; long overallSumInNs = 0;
for (int i = 0; i < durationPer10000IterationsInNs.size(); i++) { for (int i = 0; i < durationPer10000IterationsInNs.size(); i++) {
...@@ -143,11 +149,11 @@ public class ThroughputTimestampAnalysis extends Analysis { ...@@ -143,11 +149,11 @@ public class ThroughputTimestampAnalysis extends Analysis {
sumInNs += durationPer10000IterationsInNs.get(i); sumInNs += durationPer10000IterationsInNs.get(i);
} }
System.out.println("Thread iterations: " + workerThread.getIterations() + " times"); System.out.println("Thread iterations: " + this.workerThread.getIterations() + " times");
System.out.println("Thread execution time: " + TimeUnit.NANOSECONDS.toMillis(overallSumInNs) + " ms"); System.out.println("Thread execution time: " + TimeUnit.NANOSECONDS.toMillis(overallSumInNs) + " ms");
System.out.println("Thread half duration/iterations: " + sumInNs / (workerThread.getIterations() / 2) System.out.println("Thread half duration/iterations: " + sumInNs / (this.workerThread.getIterations() / 2)
+ " ns/iteration"); + " ns/iteration");
System.out.println("Thread unsuccessfully executed stages: " + workerThread.getExecutedUnsuccessfullyCount() System.out.println("Thread unsuccessfully executed stages: " + this.workerThread.getExecutedUnsuccessfullyCount()
+ " times"); + " times");
} }
......
...@@ -67,21 +67,24 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -67,21 +67,24 @@ public class TraceReconstructionAnalysis extends Analysis {
final List<IStage> stages = new LinkedList<IStage>(); final List<IStage> stages = new LinkedList<IStage>();
final IPipeline pipeline = new IPipeline() { final IPipeline pipeline = new IPipeline() {
@SuppressWarnings("unchecked") @Override
public List<? extends IStage> getStartStages() { public List<? extends IStage> getStartStages() {
return startStages; return startStages;
} }
@Override
public List<IStage> getStages() { public List<IStage> getStages() {
return stages; return stages;
} }
@Override
public void fireStartNotification() throws Exception { public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) { for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts(); stage.notifyPipelineStarts();
} }
} }
@Override
public void fireStopNotification() { public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) { for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops(); stage.notifyPipelineStops();
...@@ -96,7 +99,7 @@ public class TraceReconstructionAnalysis extends Analysis { ...@@ -96,7 +99,7 @@ public class TraceReconstructionAnalysis extends Analysis {
public void start() { public void start() {
super.start(); super.start();
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
this.workerThread.start(); this.workerThread.start();
try { try {
......
...@@ -18,10 +18,6 @@ package teetime.examples.traceReconstruction; ...@@ -18,10 +18,6 @@ package teetime.examples.traceReconstruction;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.controlflow.OperationExecutionRecord;
import kieker.common.record.flow.IFlowRecord;
import teetime.framework.concurrent.StageTerminationPolicy; import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread; import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.Analysis; import teetime.framework.core.Analysis;
...@@ -46,6 +42,11 @@ import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler; ...@@ -46,6 +42,11 @@ import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler;
import teetime.stage.stringBuffer.handler.StringHandler; import teetime.stage.stringBuffer.handler.StringHandler;
import teetime.stage.util.TextLine; import teetime.stage.util.TextLine;
import kieker.analysis.plugin.filter.flow.TraceEventRecords;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.controlflow.OperationExecutionRecord;
import kieker.common.record.flow.IFlowRecord;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -150,7 +151,7 @@ public class TraceReconstructionAnalysis2 extends Analysis { ...@@ -150,7 +151,7 @@ public class TraceReconstructionAnalysis2 extends Analysis {
public void start() { public void start() {
super.start(); super.start();
this.workerThread.terminate(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION); this.workerThread.setTerminationPolicy(StageTerminationPolicy.TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION);
this.workerThread.start(); this.workerThread.start();
try { try {
......
...@@ -75,7 +75,7 @@ public class StageWorkArrayList implements IStageWorkList { ...@@ -75,7 +75,7 @@ public class StageWorkArrayList implements IStageWorkList {
} }
private void push(final IStage stage) { private void push(final IStage stage) {
if (this.isValid(stage)) { if (stage.isSchedulable() && this.isValid(stage)) {
this.firstIndex = Math.min(stage.getSchedulingIndex(), this.firstIndex); this.firstIndex = Math.min(stage.getSchedulingIndex(), this.firstIndex);
this.lastIndex = Math.max(stage.getSchedulingIndex(), this.lastIndex); this.lastIndex = Math.max(stage.getSchedulingIndex(), this.lastIndex);
this.stages[stage.getSchedulingIndex()].numToBeExecuted++; this.stages[stage.getSchedulingIndex()].numToBeExecuted++;
......
...@@ -243,18 +243,6 @@ public class WorkerThread extends Thread { ...@@ -243,18 +243,6 @@ public class WorkerThread extends Thread {
return this.pipeline; return this.pipeline;
} }
// BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically
// disables itself if it has no input ports
public void terminate(final StageTerminationPolicy terminationPolicyToUse) {
// for (final IStage startStage : this.pipeline.getStartStages()) {
// if (this.stageStateManager.areAllInputPortsClosed(startStage)) {
// startStage.fireSignalClosingToAllInputPorts();
// }
// }
this.setTerminationPolicy(terminationPolicyToUse);
}
/** /**
* If not set, this thread will run infinitely. * If not set, this thread will run infinitely.
* *
......
...@@ -53,6 +53,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp ...@@ -53,6 +53,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
private int depth = IStage.DEPTH_NOT_SET; private int depth = IStage.DEPTH_NOT_SET;
private int schedulingIndex; private int schedulingIndex;
private boolean schedulable = true;
private final List<IInputPort<S, ?>> inputPorts = new ArrayList<IInputPort<S, ?>>(); private final List<IInputPort<S, ?>> inputPorts = new ArrayList<IInputPort<S, ?>>();
private final List<IInputPort<S, ?>> readOnlyInputPorts = Collections.unmodifiableList(this.inputPorts); private final List<IInputPort<S, ?>> readOnlyInputPorts = Collections.unmodifiableList(this.inputPorts);
...@@ -271,4 +272,13 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp ...@@ -271,4 +272,13 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
public void setSchedulingIndex(final int schedulingIndex) { public void setSchedulingIndex(final int schedulingIndex) {
this.schedulingIndex = schedulingIndex; this.schedulingIndex = schedulingIndex;
} }
@Override
public boolean isSchedulable() {
return this.schedulable;
}
public void setSchedulable(final boolean schedulable) {
this.schedulable = schedulable;
}
} }
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
package teetime.framework.core; package teetime.framework.core;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* @author Christian Wulf * @author Christian Wulf
...@@ -27,7 +29,17 @@ public abstract class CompositeFilter implements IBaseStage { ...@@ -27,7 +29,17 @@ public abstract class CompositeFilter implements IBaseStage {
protected final List<IBaseStage> schedulableStages = new ArrayList<IBaseStage>(); protected final List<IBaseStage> schedulableStages = new ArrayList<IBaseStage>();
private final Map<IOutputPort<?, ?>, IInputPort<?, ?>> connections = new HashMap<IOutputPort<?, ?>, IInputPort<?, ?>>();
public List<IBaseStage> getSchedulableStages() { public List<IBaseStage> getSchedulableStages() {
return this.schedulableStages; return this.schedulableStages;
} }
protected <T, S1 extends ISink<S1>, S0 extends ISource> void connectWithPipe(final IOutputPort<S0, T> sourcePort, final IInputPort<S1, T> targetPort) {
this.connections.put(sourcePort, targetPort);
}
public Map<IOutputPort<?, ?>, IInputPort<?, ?>> getConnections() {
return this.connections;
}
} }
...@@ -157,4 +157,9 @@ public interface IStage extends IBaseStage { ...@@ -157,4 +157,9 @@ public interface IStage extends IBaseStage {
*/ */
public void setSchedulingIndex(int schedulingIndex); public void setSchedulingIndex(int schedulingIndex);
/**
* @since 1.10
*/
public boolean isSchedulable();
} }
...@@ -17,15 +17,11 @@ package teetime.stage.kieker; ...@@ -17,15 +17,11 @@ package teetime.stage.kieker;
import java.io.File; import java.io.File;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.filesystem.BinaryCompressionMethod;
import kieker.common.util.filesystem.FSUtil;
import teetime.framework.concurrent.ConcurrentWorkStealingPipe; import teetime.framework.concurrent.ConcurrentWorkStealingPipe;
import teetime.framework.concurrent.ConcurrentWorkStealingPipeFactory; import teetime.framework.concurrent.ConcurrentWorkStealingPipeFactory;
import teetime.framework.core.CompositeFilter; import teetime.framework.core.CompositeFilter;
import teetime.framework.core.IInputPort; import teetime.framework.core.IInputPort;
import teetime.framework.core.IOutputPort; import teetime.framework.core.IOutputPort;
import teetime.framework.sequential.MethodCallPipe;
import teetime.stage.FileExtensionFilter; import teetime.stage.FileExtensionFilter;
import teetime.stage.basic.merger.Merger; import teetime.stage.basic.merger.Merger;
import teetime.stage.kieker.className.ClassNameRegistryCreationFilter; import teetime.stage.kieker.className.ClassNameRegistryCreationFilter;
...@@ -36,6 +32,10 @@ import teetime.stage.kieker.fileToRecord.ZipFile2RecordFilter; ...@@ -36,6 +32,10 @@ import teetime.stage.kieker.fileToRecord.ZipFile2RecordFilter;
import teetime.stage.predicate.IsDirectoryPredicate; import teetime.stage.predicate.IsDirectoryPredicate;
import teetime.stage.predicate.PredicateFilter; import teetime.stage.predicate.PredicateFilter;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.filesystem.BinaryCompressionMethod;
import kieker.common.util.filesystem.FSUtil;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -55,7 +55,7 @@ public class File2RecordFilter extends CompositeFilter { ...@@ -55,7 +55,7 @@ public class File2RecordFilter extends CompositeFilter {
public File2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { public File2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
this.classNameRegistryRepository = classNameRegistryRepository; this.classNameRegistryRepository = classNameRegistryRepository;
// FIXME does not yet work with more than one thread due to classNameRegistryRepository // FIXME does not yet work with more than one thread due to classNameRegistryRepository (reason not comprehensible)
// create stages // create stages
final PredicateFilter<File> isDirectoryFilter = new PredicateFilter<File>(new IsDirectoryPredicate()); final PredicateFilter<File> isDirectoryFilter = new PredicateFilter<File>(new IsDirectoryPredicate());
final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository); final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository);
...@@ -76,12 +76,12 @@ public class File2RecordFilter extends CompositeFilter { ...@@ -76,12 +76,12 @@ public class File2RecordFilter extends CompositeFilter {
final IOutputPort<FileExtensionFilter, File> zipFileOutputPort = fileExtensionFilter.createOutputPortForFileExtension(FSUtil.ZIP_FILE_EXTENSION); final IOutputPort<FileExtensionFilter, File> zipFileOutputPort = fileExtensionFilter.createOutputPortForFileExtension(FSUtil.ZIP_FILE_EXTENSION);
// connect ports by pipes // connect ports by pipes
MethodCallPipe.connect(isDirectoryFilter.matchingOutputPort, classNameRegistryCreationFilter.directoryInputPort); this.connectWithPipe(isDirectoryFilter.matchingOutputPort, classNameRegistryCreationFilter.directoryInputPort);
MethodCallPipe.connect(isDirectoryFilter.mismatchingOutputPort, fileMerger.getNewInputPort()); // BETTER restructure pipeline this.connectWithPipe(isDirectoryFilter.mismatchingOutputPort, fileMerger.getNewInputPort()); // BETTER restructure pipeline
MethodCallPipe.connect(classNameRegistryCreationFilter.relayDirectoryOutputPort, directory2FilesFilter.directoryInputPort); this.connectWithPipe(classNameRegistryCreationFilter.relayDirectoryOutputPort, directory2FilesFilter.directoryInputPort);
MethodCallPipe.connect(classNameRegistryCreationFilter.filePrefixOutputPort, directory2FilesFilter.filePrefixInputPort); this.connectWithPipe(classNameRegistryCreationFilter.filePrefixOutputPort, directory2FilesFilter.filePrefixInputPort);
MethodCallPipe.connect(directory2FilesFilter.fileOutputPort, fileExtensionFilter.fileInputPort); this.connectWithPipe(directory2FilesFilter.fileOutputPort, fileExtensionFilter.fileInputPort);
MethodCallPipe.connect(zipFileOutputPort, fileMerger.getNewInputPort()); this.connectWithPipe(zipFileOutputPort, fileMerger.getNewInputPort());
final ConcurrentWorkStealingPipeFactory<File> concurrentWorkStealingPipeFactory0 = new ConcurrentWorkStealingPipeFactory<File>(); final ConcurrentWorkStealingPipeFactory<File> concurrentWorkStealingPipeFactory0 = new ConcurrentWorkStealingPipeFactory<File>();
final ConcurrentWorkStealingPipe<File> concurrentWorkStealingPipe0 = concurrentWorkStealingPipeFactory0.create(); final ConcurrentWorkStealingPipe<File> concurrentWorkStealingPipe0 = concurrentWorkStealingPipeFactory0.create();
......
...@@ -19,8 +19,6 @@ import java.util.ArrayList; ...@@ -19,8 +19,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import kieker.common.logging.LogFactory;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -28,6 +26,8 @@ import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis; ...@@ -28,6 +26,8 @@ import teetime.examples.throughput.methodcall.MethodCallThroughputAnalysis;
import teetime.util.StatisticsUtil; import teetime.util.StatisticsUtil;
import teetime.util.StopWatch; import teetime.util.StopWatch;
import kieker.common.logging.LogFactory;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
...@@ -36,6 +36,7 @@ import teetime.util.StopWatch; ...@@ -36,6 +36,7 @@ import teetime.util.StopWatch;
public class MethodCallThoughputTimestampAnalysisTest { public class MethodCallThoughputTimestampAnalysisTest {
private static final int NUM_OBJECTS_TO_CREATE = 100000; private static final int NUM_OBJECTS_TO_CREATE = 100000;
private static final int NUM_NOOP_FILTERS = 800;
@Before @Before
public void before() { public void before() {
...@@ -47,11 +48,13 @@ public class MethodCallThoughputTimestampAnalysisTest { ...@@ -47,11 +48,13 @@ public class MethodCallThoughputTimestampAnalysisTest {
@Test @Test
public void testWithManyObjects() { public void testWithManyObjects() {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final StopWatch stopWatch = new StopWatch(); final StopWatch stopWatch = new StopWatch();
final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE); final List<TimestampObject> timestampObjects = new ArrayList<TimestampObject>(NUM_OBJECTS_TO_CREATE);
final MethodCallThroughputAnalysis analysis = new MethodCallThroughputAnalysis(); final MethodCallThroughputAnalysis analysis = new MethodCallThroughputAnalysis();
analysis.setNumNoopFilters(800); analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setTimestampObjects(timestampObjects); analysis.setTimestampObjects(timestampObjects);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() { analysis.setInput(NUM_OBJECTS_TO_CREATE, new Callable<TimestampObject>() {
@Override @Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment