Skip to content
Snippets Groups Projects
Commit 410bfc24 authored by Christian Wulf's avatar Christian Wulf
Browse files

modularized code

parent a129601d
No related branches found
No related tags found
No related merge requests found
Showing
with 144 additions and 62 deletions
......@@ -20,12 +20,13 @@ import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import teetime.stage.kieker.className.ClassNameRegistry;
import teetime.stage.kieker.className.ClassNameRegistryRepository;
import kieker.common.exception.MonitoringRecordException;
import kieker.common.logging.Log;
import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord;
import teetime.stage.kieker.className.ClassNameRegistry;
import teetime.stage.kieker.className.ClassNameRegistryRepository;
/**
* @author Christian Wulf
......@@ -66,36 +67,9 @@ public class RecordFromBinaryFileCreator {
int idx = -1;
for (final Class<?> type : typeArray) {
idx++;
if (type == String.class) {
final Integer strId = inputStream.readInt();
final String str = classNameRegistry.get(strId);
if (str == null) {
this.logger.error("No String mapping found for id " + strId.toString());
objectArray[idx] = "";
} else {
objectArray[idx] = str;
}
} else if ((type == int.class) || (type == Integer.class)) {
objectArray[idx] = inputStream.readInt();
} else if ((type == long.class) || (type == Long.class)) {
objectArray[idx] = inputStream.readLong();
} else if ((type == float.class) || (type == Float.class)) {
objectArray[idx] = inputStream.readFloat();
} else if ((type == double.class) || (type == Double.class)) {
objectArray[idx] = inputStream.readDouble();
} else if ((type == byte.class) || (type == Byte.class)) {
objectArray[idx] = inputStream.readByte();
} else if ((type == short.class) || (type == Short.class)) { // NOPMD (short)
objectArray[idx] = inputStream.readShort();
} else if ((type == boolean.class) || (type == Boolean.class)) {
objectArray[idx] = inputStream.readBoolean();
} else {
if (inputStream.readByte() != 0) {
this.logger.error("Unexpected value for unsupported type: " + clazz.getName());
return null; // breaking error (break would not terminate the correct loop)
}
this.logger.warn("Unsupported type: " + clazz.getName());
objectArray[idx] = null;
boolean successful = this.writeToObjectArray(inputStream, classNameRegistry, clazz, objectArray, idx, type);
if (!successful) {
return null;
}
}
final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, objectArray);
......@@ -103,4 +77,41 @@ public class RecordFromBinaryFileCreator {
return record;
}
private boolean writeToObjectArray(final DataInputStream inputStream, final ClassNameRegistry classNameRegistry, final Class<? extends IMonitoringRecord> clazz,
final Object[] objectArray, final int idx, final Class<?> type) throws IOException {
if (type == String.class) {
final Integer strId = inputStream.readInt();
final String str = classNameRegistry.get(strId);
if (str == null) {
this.logger.error("No String mapping found for id " + strId.toString());
objectArray[idx] = "";
} else {
objectArray[idx] = str;
}
} else if ((type == int.class) || (type == Integer.class)) {
objectArray[idx] = inputStream.readInt();
} else if ((type == long.class) || (type == Long.class)) {
objectArray[idx] = inputStream.readLong();
} else if ((type == float.class) || (type == Float.class)) {
objectArray[idx] = inputStream.readFloat();
} else if ((type == double.class) || (type == Double.class)) {
objectArray[idx] = inputStream.readDouble();
} else if ((type == byte.class) || (type == Byte.class)) {
objectArray[idx] = inputStream.readByte();
} else if ((type == short.class) || (type == Short.class)) { // NOPMD (short)
objectArray[idx] = inputStream.readShort();
} else if ((type == boolean.class) || (type == Boolean.class)) {
objectArray[idx] = inputStream.readBoolean();
} else {
if (inputStream.readByte() != 0) {
this.logger.error("Unexpected value for unsupported type: " + clazz.getName());
return false; // breaking error (break would not terminate the correct loop)
}
this.logger.warn("Unsupported type: " + clazz.getName());
objectArray[idx] = null;
}
return true;
}
}
......@@ -29,7 +29,6 @@ import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.core.Pipeline;
import teetime.framework.sequential.MethodCallPipe;
import teetime.framework.sequential.QueuePipe;
import teetime.framework.sequential.ReservableQueuePipe;
import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter;
......
......@@ -45,8 +45,8 @@ public class WorkerThread extends Thread {
private IStageScheduler stageScheduler;
private StageStateManager stageStateManager;
private volatile StageTerminationPolicy terminationPolicy;
private volatile boolean shouldTerminate = false;
private StageTerminationPolicy terminationPolicy;
private boolean shouldTerminate = false;
private final int accessesDeviceId;
private int executedUnsuccessfullyCount;
......@@ -55,6 +55,10 @@ public class WorkerThread extends Thread {
private final List<Long> durationPerXIterationsInNs = new LinkedList<Long>();
private int iterations;
private final StopWatch afterStageExecutionStopWatch = new StopWatch();
private final List<Long> afterStageExecutions = new LinkedList<Long>();
public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) {
this.pipeline = pipeline;
this.accessesDeviceId = accessesDeviceId;
......@@ -147,14 +151,14 @@ public class WorkerThread extends Thread {
// stageExecutionStopWatch.end();
this.finishStageExecution(stage, executedSuccessfully);
// afterStageExecutionStopWatch.start();
// this.afterStageExecutionStopWatch.start();
if (this.shouldTerminate) {
this.executeTerminationPolicy(stage, executedSuccessfully);
}
this.stageScheduler.determineNextStage(stage, executedSuccessfully);
// afterStageExecutionStopWatch.end();
// this.afterStageExecutionStopWatch.end();
// this.iterationStopWatch.end();
......@@ -166,7 +170,11 @@ public class WorkerThread extends Thread {
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs();
// final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //327
// final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //1416
// final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //2450
// final long schedulingOverhead = this.afterStageExecutionStopWatch.getDurationInNs(); // 2450
// if (this.iterations > 50000 && this.iterations % 1000 == 0)
// this.afterStageExecutions.add(this.afterStageExecutionStopWatch.getDurationInNs());
// rest: ~2000 (measurement overhead?)
if ((this.iterations % NUM_ITERATIONS_TO_MEASURE) == 0) {
this.stopWatch.end();
......@@ -178,6 +186,8 @@ public class WorkerThread extends Thread {
this.stopWatch.end();
this.durationPerXIterationsInNs.add(this.stopWatch.getDurationInNs());
// System.out.println("avg: " + StatisticsUtil.calculateAverage(this.afterStageExecutions) + " ns");
this.cleanUpDatastructures();
}
......
......@@ -107,7 +107,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
try {
success = this.execute(this.context);
if (success) { // deprecated boolean return value
this.context.clear();
this.context.commit();
} else {
this.context.rollback();
}
......@@ -115,6 +115,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
this.context.rollback();
} catch (final Exception e) {
this.logger.error("Error in stage execution", e);
throw new IllegalStateException(e);
}
return success;
}
......
......@@ -29,6 +29,8 @@ public class Context<S extends IStage> {
private long numPushedElements = 0;
private long numTakenElements = 0;
private long numTakenElementsInCurrentTransaction = 0;
@SuppressWarnings("unchecked")
public Context(final IStage owningStage, final List<IInputPort<S, ?>> allTargetPorts) {
this.inputPortContainers = this.createInputPortLists(owningStage.getInputPorts());
......@@ -92,10 +94,9 @@ public class Context<S extends IStage> {
private final <T> void logTransaction(final IInputPort<S, T> inputPort, final T token) {
// final InputPortContainer inputPortContainer = this.inputPortContainers[inputPort.getIndex()];
// final List<Object> tokenList = this.pipesTakenFrom.get(inputPort);
// inputPortContainer.takenElements.add(token);
// this.numTakenElements++;
this.numTakenElementsInCurrentTransaction++;
}
/**
......@@ -110,31 +111,43 @@ public class Context<S extends IStage> {
return associatedPipe.read();
}
void clear() {
void commit() {
// for (final List<Object> takenElements : this.pipesTakenFrom.values()) {
for (final InputPortContainer inputPortContainer : this.inputPortContainers) {
// inputPortContainer.takenElements.clear();
IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) inputPortContainer.pipe;
reservablePipe.commit();
}
this.numTakenElements += this.numTakenElementsInCurrentTransaction;
this.numTakenElementsInCurrentTransaction = 0;
for (final IOutputPort<S, ?> outputPort : this.outputPorts) {
if (outputPort != null) {
@SuppressWarnings("unchecked")
IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) outputPort.getAssociatedPipe();
reservablePipe.commit();
}
}
}
void rollback() {
// for (final Entry<IPipe<Object>, List<Object>> entry : this.pipesTakenFrom.entrySet()) {
// final IPipe<Object> associatedPipe = entry.getKey();
// final List<Object> takenElements = entry.getValue();
for (final InputPortContainer inputPortContainer : this.inputPortContainers) {
// for (int k = inputPortContainer.takenElements.size() - 1; k >= 0; k--) {
// final Object element = inputPortContainer.takenElements.get(k);
// inputPortContainer.pipe.put(element);
// }
IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) inputPortContainer.pipe;
reservablePipe.rollback();
}
this.numTakenElements -= inputPortContainer.takenElements.size();
for (final IOutputPort<S, ?> outputPort : this.outputPorts) {
if (outputPort != null) {
@SuppressWarnings("unchecked")
IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) outputPort.getAssociatedPipe();
reservablePipe.rollback();
}
}
}
......
......@@ -101,5 +101,6 @@ public final class NextStageScheduler implements IStageScheduler {
if (this.workList.isEmpty()) {
this.workList.pushAll(this.highestPrioritizedEnabledStages);
}
}
}
......@@ -15,12 +15,14 @@
***************************************************************************/
package teetime.framework.sequential;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import teetime.framework.core.AbstractPipe;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IReservablePipe;
import teetime.framework.core.ISink;
import teetime.framework.core.ISource;
import teetime.util.concurrent.workstealing.CircularWorkStealingDeque;
......@@ -30,9 +32,10 @@ import teetime.util.concurrent.workstealing.CircularWorkStealingDeque;
*
* @since 1.10
*/
public class QueuePipe<T> extends AbstractPipe<T> {
public class QueuePipe<T> extends AbstractPipe<T> implements IReservablePipe<T> {
private final List<T> queue = new ArrayList<T>(10);
// private final List<T> queue = new ArrayList<T>(10);
private final Queue<T> queue = new LinkedList<T>();
// private final List<T> queue = new ReservableArrayList<T>(10);
......@@ -58,8 +61,8 @@ public class QueuePipe<T> extends AbstractPipe<T> {
@Override
public T tryTakeInternal() {
// return this.queue.poll();
return this.queue.remove(this.queue.size() - 1);
return this.queue.poll();
// return this.queue.remove(this.queue.size() - 1);
// return this.queue.pop();
}
......@@ -74,8 +77,8 @@ public class QueuePipe<T> extends AbstractPipe<T> {
@Override
public T read() {
// return this.queue.peek();
return this.queue.get(this.queue.size() - 1);
return this.queue.peek();
// return this.queue.get(this.queue.size() - 1);
// return this.queue.read();
}
......@@ -90,4 +93,16 @@ public class QueuePipe<T> extends AbstractPipe<T> {
// return this.queue.size() == 0;
}
@Override
public void commit() {
// TODO Auto-generated method stub
}
@Override
public void rollback() {
// TODO Auto-generated method stub
}
}
package teetime.framework.sequential;
import java.util.List;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IReservablePipe;
......@@ -47,4 +49,14 @@ public class ReservableQueuePipe<T> extends QueuePipe<T> implements IReservableP
return this.reservableQueue.isEmpty();
}
@Override
public void putMultiple(final List<T> elements) {
throw new IllegalStateException();
}
@Override
public List<?> tryTakeMultiple(final int numElementsToTake) {
throw new IllegalStateException();
}
}
......@@ -58,7 +58,7 @@ public class ObjectProducer<T> extends AbstractFilter<ObjectProducer<T>> {
final T newObject = this.objectCreator.call();
context.put(this.outputPort, newObject);
} catch (final Exception e) {
throw new IllegalStateException();
throw new IllegalStateException(e);
}
this.numObjectsToCreate--;
......
......@@ -17,6 +17,9 @@ public class ReservableArrayList<T> implements List<T> {
}
public void reservedAdd(final T element) {
if (this.lastFreeReservedIndex == this.elements.length) {
throw new IllegalStateException("not enough space");
}
this.elements[this.lastFreeReservedIndex++] = element;
}
......@@ -64,9 +67,8 @@ public class ReservableArrayList<T> implements List<T> {
@Override
public boolean add(final T e) {
this.elements[this.lastFreeIndex++] = e;
this.lastFreeReservedIndex = this.lastFreeIndex;
return true;
// TODO Auto-generated method stub
return false;
}
@Override
......@@ -111,7 +113,10 @@ public class ReservableArrayList<T> implements List<T> {
}
@Override
public final T get(final int index) {
public T get(final int index) {
if (index < 0) {
return null;
}
T element = this.elements[index];
return element;
}
......@@ -170,7 +175,7 @@ public class ReservableArrayList<T> implements List<T> {
}
public T reservedRemoveLast() {
T element = this.get(this.lastFreeReservedIndex--);
T element = this.get(--this.lastFreeReservedIndex);
return element;
}
}
......@@ -32,4 +32,19 @@ public class ReservableArrayListTest {
Assert.assertTrue(reservableArrayList.isEmpty());
// Assert.assertEquals(element, reservableArrayList.getLast());
}
@Test
public void testRemove() throws Exception {
ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
Object element = new Object();
reservableArrayList.reservedAdd(element);
reservableArrayList.commit();
Assert.assertEquals(element, reservableArrayList.reservedRemoveLast());
Assert.assertFalse(reservableArrayList.isEmpty());
reservableArrayList.commit();
Assert.assertTrue(reservableArrayList.isEmpty());
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment