Skip to content
Snippets Groups Projects
Commit ab0edfbd authored by Christian Wulf's avatar Christian Wulf
Browse files

added logging.properties;

changed successor concept
parent 23c2cba5
Branches
Tags
No related merge requests found
Showing
with 6646 additions and 127 deletions
...@@ -104,7 +104,7 @@ sp_cleanup.remove_unused_private_methods=true ...@@ -104,7 +104,7 @@ sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false sp_cleanup.sort_members_all=false
sp_cleanup.use_blocks=false sp_cleanup.use_blocks=true
sp_cleanup.use_blocks_only_for_return_and_throw=false sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_parentheses_in_expressions=false sp_cleanup.use_parentheses_in_expressions=false
sp_cleanup.use_this_for_non_static_field_access=true sp_cleanup.use_this_for_non_static_field_access=true
......
.handlers = java.util.logging.ConsoleHandler
.level= ALL
java.util.logging.ConsoleHandler.level = ALL
#java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
#teetime.level = ALL
\ No newline at end of file
...@@ -25,8 +25,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -25,8 +25,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
private int index; private int index;
private StageWithPort<?, ?> successor;
private boolean reschedulable; private boolean reschedulable;
public AbstractStage() { public AbstractStage() {
...@@ -70,18 +68,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -70,18 +68,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
return null; return null;
} }
// @Override
// public void executeWithPorts() {
// CommittableQueue execute;
// do {
// // execute = this.next().execute2(this.outputElements);
// // execute = this.next().execute2(this.getOutputPort().pipe.getElements());
// this.next().executeWithPorts();
// } while (this.next().isReschedulable());
// }
// protected abstract void execute4(CommittableQueue<I> elements);
protected void execute4(final CommittableQueue<I> elements) { protected void execute4(final CommittableQueue<I> elements) {
throw new IllegalStateException(); // default implementation throw new IllegalStateException(); // default implementation
} }
...@@ -89,22 +75,18 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -89,22 +75,18 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
protected abstract void execute5(I element); protected abstract void execute5(I element);
protected final void send(final O element) { protected final void send(final O element) {
// this.outputElements.addToTailUncommitted(element); this.send(this.getOutputPort(), element);
// this.outputElements.commit(); }
this.getOutputPort().send(element); protected final void send(final OutputPort<O> outputPort, final O element) {
outputPort.send(element);
// CommittableQueue execute; StageWithPort<O, ?> next = outputPort.getPipe().getTargetStage();
StageWithPort<?, ?> next = this.next(); // StageWithPort<?, ?> next = this.next();
do { do {
// execute = this.next().execute2(this.outputElements);
// execute = this.next().execute2(this.getOutputPort().pipe.getElements());
next.executeWithPorts(); next.executeWithPorts();
// System.out.println("Executed " + this.next().getClass().getSimpleName());
} while (next.isReschedulable()); } while (next.isReschedulable());
// } while (this.next().getInputPort().pipe.size() > 0);
// } while (execute.size() > 0);
} }
// @Override // @Override
...@@ -139,16 +121,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { ...@@ -139,16 +121,6 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
this.parentStage = parentStage; this.parentStage = parentStage;
} }
@Override
public StageWithPort<?, ?> next() {
return this.successor;
}
@Override
public void setSuccessor(final StageWithPort<? super O, ?> successor) {
this.successor = successor;
}
@Override @Override
public boolean isReschedulable() { public boolean isReschedulable() {
return this.reschedulable; return this.reschedulable;
......
...@@ -20,6 +20,7 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> { ...@@ -20,6 +20,7 @@ public abstract class ConsumerStage<I, O> extends AbstractStage<I, O> {
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
this.logger.debug("Executing stage...");
I element = this.getInputPort().receive(); I element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0); this.setReschedulable(this.getInputPort().getPipe().size() > 0);
......
...@@ -3,16 +3,27 @@ package teetime.variant.methodcallWithPorts.framework.core; ...@@ -3,16 +3,27 @@ package teetime.variant.methodcallWithPorts.framework.core;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.UUID;
import teetime.util.list.CommittableQueue; import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
public class Pipeline<I, O> implements StageWithPort<I, O> { public class Pipeline<I, O> implements StageWithPort<I, O> {
private final String id;
/**
* A unique logger instance per stage instance
*/
protected Log logger;
private StageWithPort<I, ?> firstStage; private StageWithPort<I, ?> firstStage;
private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>(); private final List<StageWithPort<?, ?>> intermediateStages = new LinkedList<StageWithPort<?, ?>>();
private StageWithPort<?, O> lastStage; private StageWithPort<?, O> lastStage;
private StageWithPort<?, ?> successor;
private StageWithPort<?, ?>[] stages; private StageWithPort<?, ?>[] stages;
private StageWithPort<?, ?> parentStage; private StageWithPort<?, ?> parentStage;
private int index; private int index;
...@@ -21,6 +32,15 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -21,6 +32,15 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
private boolean reschedulable; private boolean reschedulable;
private int firstStageIndex; private int firstStageIndex;
public Pipeline() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
this.logger = LogFactory.getLog(this.id);
}
public String getId() {
return this.id;
}
public void setFirstStage(final StageWithPort<I, ?> stage) { public void setFirstStage(final StageWithPort<I, ?> stage) {
this.firstStage = stage; this.firstStage = stage;
} }
...@@ -60,6 +80,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -60,6 +80,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
this.logger.debug("Executing stage...");
StageWithPort<?, ?> firstStage = this.stages[this.firstStageIndex]; StageWithPort<?, ?> firstStage = this.stages[this.firstStageIndex];
firstStage.executeWithPorts(); firstStage.executeWithPorts();
...@@ -71,7 +92,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -71,7 +92,8 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
StageWithPort<?, ?> currentStage = stage; StageWithPort<?, ?> currentStage = stage;
while (!currentStage.isReschedulable()) { while (!currentStage.isReschedulable()) {
this.firstStageIndex++; this.firstStageIndex++;
currentStage = currentStage.next(); // currentStage = currentStage.getOutputPort().getPipe().getTargetStage(); // FIXME what to do with a stage with more than one output port?
currentStage = this.stages[this.firstStageIndex];
if (currentStage == null) { // loop reaches the last stage if (currentStage == null) { // loop reaches the last stage
this.setReschedulable(false); this.setReschedulable(false);
this.cleanUp(); this.cleanUp();
...@@ -120,17 +142,17 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -120,17 +142,17 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
} }
this.stages[this.stages.length - 1] = this.lastStage; this.stages[this.stages.length - 1] = this.lastStage;
for (int i = 0; i < this.stages.length; i++) { // for (int i = 0; i < this.stages.length; i++) {
// StageWithPort<?, ?> stage = this.stages[i]; // StageWithPort<?, ?> stage = this.stages[i];
// stage.setParentStage(this, i); // stage.setParentStage(this, i);
// stage.setListener(this); // stage.setListener(this);
} // }
for (int i = 0; i < this.stages.length - 1; i++) { // for (int i = 0; i < this.stages.length - 1; i++) {
StageWithPort stage = this.stages[i]; // StageWithPort stage = this.stages[i];
stage.setSuccessor(this.stages[i + 1]); // stage.setSuccessor(this.stages[i + 1]);
} // }
this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>()); // this.stages[this.stages.length - 1].setSuccessor(new EndStage<Object>());
for (StageWithPort<?, ?> stage : this.stages) { for (StageWithPort<?, ?> stage : this.stages) {
stage.onStart(); stage.onStart();
...@@ -148,16 +170,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -148,16 +170,6 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
this.parentStage = parentStage; this.parentStage = parentStage;
} }
@Override
public StageWithPort<?, ?> next() {
throw new IllegalStateException();
}
@Override
public void setSuccessor(final StageWithPort<? super O, ?> successor) {
throw new IllegalStateException();
}
@Override @Override
public boolean isReschedulable() { public boolean isReschedulable() {
return this.reschedulable; return this.reschedulable;
...@@ -183,7 +195,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> { ...@@ -183,7 +195,7 @@ public class Pipeline<I, O> implements StageWithPort<I, O> {
StageWithPort<?, ?> stage = this.stages[i]; StageWithPort<?, ?> stage = this.stages[i];
stage.setParentStage(null, i); stage.setParentStage(null, i);
// stage.setListener(null); // stage.setListener(null);
stage.setSuccessor(null); // stage.setSuccessor(null);
} }
this.firstStage = null; this.firstStage = null;
......
...@@ -10,26 +10,14 @@ public interface StageWithPort<I, O> { ...@@ -10,26 +10,14 @@ public interface StageWithPort<I, O> {
void executeWithPorts(); void executeWithPorts();
// void executeWithPorts(Object element);
// O execute(Object element);
// CommittableQueue<O> execute2();
CommittableQueue<O> execute2(CommittableQueue<I> elements); CommittableQueue<O> execute2(CommittableQueue<I> elements);
// SchedulingInformation getSchedulingInformation();
StageWithPort<?, ?> getParentStage(); StageWithPort<?, ?> getParentStage();
void setParentStage(StageWithPort<?, ?> parentStage, int index); void setParentStage(StageWithPort<?, ?> parentStage, int index);
// void setListener(OnDisableListener listener); // void setListener(OnDisableListener listener);
StageWithPort<?, ?> next();
void setSuccessor(StageWithPort<? super O, ?> successor);
boolean isReschedulable(); boolean isReschedulable();
void onIsPipelineHead(); void onIsPipelineHead();
......
...@@ -2,9 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe; ...@@ -2,9 +2,12 @@ package teetime.variant.methodcallWithPorts.framework.core.pipe;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public abstract class AbstractPipe<T> implements IPipe<T> { public abstract class AbstractPipe<T> implements IPipe<T> {
private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicBoolean closed = new AtomicBoolean();
private StageWithPort<T, ?> targetStage;
@Override @Override
public boolean isClosed() { public boolean isClosed() {
...@@ -16,4 +19,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> { ...@@ -16,4 +19,13 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement this.closed.lazySet(true); // lazySet is legal due to our single-writer requirement
} }
@Override
public StageWithPort<T, ?> getTargetStage() {
return this.targetStage;
}
public void setTargetStage(StageWithPort<T, ?> targetStage) {
this.targetStage = targetStage;
}
} }
package teetime.variant.methodcallWithPorts.framework.core.pipe; package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
public interface IPipe<T> { public interface IPipe<T> {
public abstract void add(T element); public abstract void add(T element);
...@@ -16,4 +18,6 @@ public interface IPipe<T> { ...@@ -16,4 +18,6 @@ public interface IPipe<T> {
public abstract boolean isClosed(); public abstract boolean isClosed();
public abstract StageWithPort<T, ?> getTargetStage();
} }
...@@ -27,25 +27,13 @@ public class EndStage<T> implements StageWithPort<T, T> { ...@@ -27,25 +27,13 @@ public class EndStage<T> implements StageWithPort<T, T> {
} }
@Override @Override
public StageWithPort getParentStage() { public StageWithPort<?, ?> getParentStage() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override @Override
public void setParentStage(final StageWithPort parentStage, final int index) { public void setParentStage(final StageWithPort<?, ?> parentStage, final int index) {
// TODO Auto-generated method stub
}
@Override
public StageWithPort next() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setSuccessor(final StageWithPort<? super T, ?> successor) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
......
...@@ -16,13 +16,20 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> { ...@@ -16,13 +16,20 @@ public class FileExtensionSwitch extends ConsumerStage<File, File> {
@Override @Override
protected void execute5(final File file) { protected void execute5(final File file) {
String fileExtension = Files.getFileExtension(file.getAbsolutePath()); String fileExtension = Files.getFileExtension(file.getAbsolutePath());
this.logger.debug("fileExtension: " + fileExtension);
OutputPort<File> outputPort = this.fileExtensions.get(fileExtension); OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
outputPort.send(file); if (outputPort != null) {
this.send(outputPort, file);
}
} }
public OutputPort<File> addFileExtension(final String fileExtension) { public OutputPort<File> addFileExtension(String fileExtension) {
if (fileExtension.startsWith(".")) {
fileExtension = fileExtension.substring(1);
}
OutputPort<File> outputPort = new OutputPort<File>(); OutputPort<File> outputPort = new OutputPort<File>();
this.fileExtensions.put(fileExtension, outputPort); this.fileExtensions.put(fileExtension, outputPort);
this.logger.debug("SUCCESS: Registered output port for '" + fileExtension + "'");
return outputPort; return outputPort;
} }
......
This diff is collapsed.
$0=kieker.common.record.misc.KiekerMetadataRecord
$1=kieker.common.record.controlflow.OperationExecutionRecord
...@@ -19,12 +19,11 @@ import java.io.File; ...@@ -19,12 +19,11 @@ import java.io.File;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline; import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink; import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.kieker.File2RecordFilter; import teetime.variant.methodcallWithPorts.stage.kieker.File2RecordFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
...@@ -38,11 +37,7 @@ import kieker.common.record.IMonitoringRecord; ...@@ -38,11 +37,7 @@ import kieker.common.record.IMonitoringRecord;
*/ */
public class RecordReaderAnalysis extends Analysis { public class RecordReaderAnalysis extends Analysis {
private int numInputObjects; private final List<IMonitoringRecord> elementCollection = new LinkedList<IMonitoringRecord>();
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<IMonitoringRecord> timestampObjectsList = new LinkedList<IMonitoringRecord>();
private Thread producerThread; private Thread producerThread;
...@@ -51,15 +46,15 @@ public class RecordReaderAnalysis extends Analysis { ...@@ -51,15 +46,15 @@ public class RecordReaderAnalysis extends Analysis {
@Override @Override
public void init() { public void init() {
super.init(); super.init();
Pipeline<File, Object> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator); Pipeline<File, Object> producerPipeline = this.buildProducerPipeline();
this.producerThread = new Thread(new RunnableStage(producerPipeline)); this.producerThread = new Thread(new RunnableStage(producerPipeline));
} }
private Pipeline<File, Object> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { private Pipeline<File, Object> buildProducerPipeline() {
this.classNameRegistryRepository = new ClassNameRegistryRepository(); this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages // create stages
File2RecordFilter file2RecordFilter = new File2RecordFilter(this.classNameRegistryRepository); File2RecordFilter file2RecordFilter = new File2RecordFilter(this.classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.timestampObjectsList); CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.elementCollection);
final Pipeline<File, Object> pipeline = new Pipeline<File, Object>(); final Pipeline<File, Object> pipeline = new Pipeline<File, Object>();
pipeline.setFirstStage(file2RecordFilter); pipeline.setFirstStage(file2RecordFilter);
...@@ -67,6 +62,10 @@ public class RecordReaderAnalysis extends Analysis { ...@@ -67,6 +62,10 @@ public class RecordReaderAnalysis extends Analysis {
SingleElementPipe.connect(file2RecordFilter.getOutputPort(), collector.getInputPort()); SingleElementPipe.connect(file2RecordFilter.getOutputPort(), collector.getInputPort());
SpScPipe<File> dirInputPipe = new SpScPipe<File>(1);
dirInputPipe.add(new File("src/test/data/bookstore-logs"));
file2RecordFilter.getInputPort().setPipe(dirInputPipe);
return pipeline; return pipeline;
} }
...@@ -83,21 +82,8 @@ public class RecordReaderAnalysis extends Analysis { ...@@ -83,21 +82,8 @@ public class RecordReaderAnalysis extends Analysis {
} }
} }
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) { public List<IMonitoringRecord> getElementCollection() {
this.numInputObjects = numInputObjects; return this.elementCollection;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<IMonitoringRecord> getTimestampObjectsList() {
return this.timestampObjectsList;
} }
} }
...@@ -15,32 +15,30 @@ ...@@ -15,32 +15,30 @@
***************************************************************************/ ***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.recordReader; package teetime.variant.methodcallWithPorts.examples.recordReader;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import teetime.util.ConstructorClosure; import teetime.util.StopWatch;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import test.PerformanceTest;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.10
*/ */
public class RecordReaderAnalysisTest extends PerformanceTest { public class RecordReaderAnalysisTest {
@Test private StopWatch stopWatch;
public void performAnalysis(final int numThreads) {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final RecordReaderAnalysis analysis = new RecordReaderAnalysis(); @Before
analysis.setNumNoopFilters(NUM_NOOP_FILTERS); public void before() {
analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() { this.stopWatch = new StopWatch();
@Override
public TimestampObject create() {
return new TimestampObject();
} }
});
@Test
public void performAnalysis() {
final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
analysis.init(); analysis.init();
this.stopWatch.start(); this.stopWatch.start();
...@@ -51,7 +49,8 @@ public class RecordReaderAnalysisTest extends PerformanceTest { ...@@ -51,7 +49,8 @@ public class RecordReaderAnalysisTest extends PerformanceTest {
analysis.onTerminate(); analysis.onTerminate();
} }
// this.timestampObjects = analysis.getTimestampObjectsList(); long overallDurationInNs = this.stopWatch.getDurationInNs();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(overallDurationInNs) + " ms");
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment