diff --git a/src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java b/src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java index 2889e9328d1f288294f86e1be230dcbcc48d4d69..e86bf1e0aee660f7d3692ccbc931a72b321f9cbf 100644 --- a/src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java +++ b/src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java @@ -20,12 +20,13 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; +import teetime.stage.kieker.className.ClassNameRegistry; +import teetime.stage.kieker.className.ClassNameRegistryRepository; + import kieker.common.exception.MonitoringRecordException; import kieker.common.logging.Log; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; -import teetime.stage.kieker.className.ClassNameRegistry; -import teetime.stage.kieker.className.ClassNameRegistryRepository; /** * @author Christian Wulf @@ -66,36 +67,9 @@ public class RecordFromBinaryFileCreator { int idx = -1; for (final Class<?> type : typeArray) { idx++; - if (type == String.class) { - final Integer strId = inputStream.readInt(); - final String str = classNameRegistry.get(strId); - if (str == null) { - this.logger.error("No String mapping found for id " + strId.toString()); - objectArray[idx] = ""; - } else { - objectArray[idx] = str; - } - } else if ((type == int.class) || (type == Integer.class)) { - objectArray[idx] = inputStream.readInt(); - } else if ((type == long.class) || (type == Long.class)) { - objectArray[idx] = inputStream.readLong(); - } else if ((type == float.class) || (type == Float.class)) { - objectArray[idx] = inputStream.readFloat(); - } else if ((type == double.class) || (type == Double.class)) { - objectArray[idx] = inputStream.readDouble(); - } else if ((type == byte.class) || (type == Byte.class)) { - objectArray[idx] = inputStream.readByte(); - } else if ((type == short.class) || (type == Short.class)) { // NOPMD (short) - objectArray[idx] = inputStream.readShort(); - } else if ((type == boolean.class) || (type == Boolean.class)) { - objectArray[idx] = inputStream.readBoolean(); - } else { - if (inputStream.readByte() != 0) { - this.logger.error("Unexpected value for unsupported type: " + clazz.getName()); - return null; // breaking error (break would not terminate the correct loop) - } - this.logger.warn("Unsupported type: " + clazz.getName()); - objectArray[idx] = null; + boolean successful = this.writeToObjectArray(inputStream, classNameRegistry, clazz, objectArray, idx, type); + if (!successful) { + return null; } } final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, objectArray); @@ -103,4 +77,41 @@ public class RecordFromBinaryFileCreator { return record; } + + private boolean writeToObjectArray(final DataInputStream inputStream, final ClassNameRegistry classNameRegistry, final Class<? extends IMonitoringRecord> clazz, + final Object[] objectArray, final int idx, final Class<?> type) throws IOException { + if (type == String.class) { + final Integer strId = inputStream.readInt(); + final String str = classNameRegistry.get(strId); + if (str == null) { + this.logger.error("No String mapping found for id " + strId.toString()); + objectArray[idx] = ""; + } else { + objectArray[idx] = str; + } + } else if ((type == int.class) || (type == Integer.class)) { + objectArray[idx] = inputStream.readInt(); + } else if ((type == long.class) || (type == Long.class)) { + objectArray[idx] = inputStream.readLong(); + } else if ((type == float.class) || (type == Float.class)) { + objectArray[idx] = inputStream.readFloat(); + } else if ((type == double.class) || (type == Double.class)) { + objectArray[idx] = inputStream.readDouble(); + } else if ((type == byte.class) || (type == Byte.class)) { + objectArray[idx] = inputStream.readByte(); + } else if ((type == short.class) || (type == Short.class)) { // NOPMD (short) + objectArray[idx] = inputStream.readShort(); + } else if ((type == boolean.class) || (type == Boolean.class)) { + objectArray[idx] = inputStream.readBoolean(); + } else { + if (inputStream.readByte() != 0) { + this.logger.error("Unexpected value for unsupported type: " + clazz.getName()); + return false; // breaking error (break would not terminate the correct loop) + } + this.logger.warn("Unsupported type: " + clazz.getName()); + objectArray[idx] = null; + } + + return true; + } } diff --git a/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java b/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java index 54c150d7f587cb4bb4d6a594f57119debe54314f..30faf7b8746e8a1f0b8c6eb1e0470164d2e6d10d 100644 --- a/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java +++ b/src/main/java/teetime/examples/throughput/ThroughputTimestampAnalysis.java @@ -29,7 +29,6 @@ import teetime.framework.core.IPipeline; import teetime.framework.core.IStage; import teetime.framework.core.Pipeline; import teetime.framework.sequential.MethodCallPipe; -import teetime.framework.sequential.QueuePipe; import teetime.framework.sequential.ReservableQueuePipe; import teetime.stage.CollectorSink; import teetime.stage.NoopFilter; diff --git a/src/main/java/teetime/framework/concurrent/WorkerThread.java b/src/main/java/teetime/framework/concurrent/WorkerThread.java index 3b419628eebb43e57274931dcfa25c82a03d3058..e45b3ed829501f7e48ebdd73a21f1614735b24d9 100644 --- a/src/main/java/teetime/framework/concurrent/WorkerThread.java +++ b/src/main/java/teetime/framework/concurrent/WorkerThread.java @@ -45,8 +45,8 @@ public class WorkerThread extends Thread { private IStageScheduler stageScheduler; private StageStateManager stageStateManager; - private volatile StageTerminationPolicy terminationPolicy; - private volatile boolean shouldTerminate = false; + private StageTerminationPolicy terminationPolicy; + private boolean shouldTerminate = false; private final int accessesDeviceId; private int executedUnsuccessfullyCount; @@ -55,6 +55,10 @@ public class WorkerThread extends Thread { private final List<Long> durationPerXIterationsInNs = new LinkedList<Long>(); private int iterations; + private final StopWatch afterStageExecutionStopWatch = new StopWatch(); + + private final List<Long> afterStageExecutions = new LinkedList<Long>(); + public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) { this.pipeline = pipeline; this.accessesDeviceId = accessesDeviceId; @@ -147,14 +151,14 @@ public class WorkerThread extends Thread { // stageExecutionStopWatch.end(); this.finishStageExecution(stage, executedSuccessfully); - // afterStageExecutionStopWatch.start(); + // this.afterStageExecutionStopWatch.start(); if (this.shouldTerminate) { this.executeTerminationPolicy(stage, executedSuccessfully); } this.stageScheduler.determineNextStage(stage, executedSuccessfully); - // afterStageExecutionStopWatch.end(); + // this.afterStageExecutionStopWatch.end(); // this.iterationStopWatch.end(); @@ -166,7 +170,11 @@ public class WorkerThread extends Thread { // final long schedulingOverhead = this.iterationStopWatch.getDurationInNs(); // final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //327 // final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //1416 - // final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //2450 + // final long schedulingOverhead = this.afterStageExecutionStopWatch.getDurationInNs(); // 2450 + + // if (this.iterations > 50000 && this.iterations % 1000 == 0) + // this.afterStageExecutions.add(this.afterStageExecutionStopWatch.getDurationInNs()); + // rest: ~2000 (measurement overhead?) if ((this.iterations % NUM_ITERATIONS_TO_MEASURE) == 0) { this.stopWatch.end(); @@ -178,6 +186,8 @@ public class WorkerThread extends Thread { this.stopWatch.end(); this.durationPerXIterationsInNs.add(this.stopWatch.getDurationInNs()); + // System.out.println("avg: " + StatisticsUtil.calculateAverage(this.afterStageExecutions) + " ns"); + this.cleanUpDatastructures(); } diff --git a/src/main/java/teetime/framework/core/AbstractFilter.java b/src/main/java/teetime/framework/core/AbstractFilter.java index 50dd6e9d485815566ea522ff975858314eb63043..916a8985ff10d345c1d5c03296a5cd6419b67eb8 100644 --- a/src/main/java/teetime/framework/core/AbstractFilter.java +++ b/src/main/java/teetime/framework/core/AbstractFilter.java @@ -107,7 +107,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp try { success = this.execute(this.context); if (success) { // deprecated boolean return value - this.context.clear(); + this.context.commit(); } else { this.context.rollback(); } @@ -115,6 +115,7 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp this.context.rollback(); } catch (final Exception e) { this.logger.error("Error in stage execution", e); + throw new IllegalStateException(e); } return success; } diff --git a/src/main/java/teetime/framework/core/Context.java b/src/main/java/teetime/framework/core/Context.java index 86c23b8d791d131acb154bf581a38f5b092ef7bb..5a6e40fa39de6f79e17e1b539d4d729121078efb 100644 --- a/src/main/java/teetime/framework/core/Context.java +++ b/src/main/java/teetime/framework/core/Context.java @@ -29,6 +29,8 @@ public class Context<S extends IStage> { private long numPushedElements = 0; private long numTakenElements = 0; + private long numTakenElementsInCurrentTransaction = 0; + @SuppressWarnings("unchecked") public Context(final IStage owningStage, final List<IInputPort<S, ?>> allTargetPorts) { this.inputPortContainers = this.createInputPortLists(owningStage.getInputPorts()); @@ -92,10 +94,9 @@ public class Context<S extends IStage> { private final <T> void logTransaction(final IInputPort<S, T> inputPort, final T token) { // final InputPortContainer inputPortContainer = this.inputPortContainers[inputPort.getIndex()]; - // final List<Object> tokenList = this.pipesTakenFrom.get(inputPort); // inputPortContainer.takenElements.add(token); - // this.numTakenElements++; + this.numTakenElementsInCurrentTransaction++; } /** @@ -110,31 +111,43 @@ public class Context<S extends IStage> { return associatedPipe.read(); } - void clear() { + void commit() { // for (final List<Object> takenElements : this.pipesTakenFrom.values()) { for (final InputPortContainer inputPortContainer : this.inputPortContainers) { // inputPortContainer.takenElements.clear(); + IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) inputPortContainer.pipe; reservablePipe.commit(); } + + this.numTakenElements += this.numTakenElementsInCurrentTransaction; + this.numTakenElementsInCurrentTransaction = 0; + + for (final IOutputPort<S, ?> outputPort : this.outputPorts) { + if (outputPort != null) { + @SuppressWarnings("unchecked") + IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) outputPort.getAssociatedPipe(); + reservablePipe.commit(); + } + } } void rollback() { - // for (final Entry<IPipe<Object>, List<Object>> entry : this.pipesTakenFrom.entrySet()) { - // final IPipe<Object> associatedPipe = entry.getKey(); - // final List<Object> takenElements = entry.getValue(); - for (final InputPortContainer inputPortContainer : this.inputPortContainers) { - // for (int k = inputPortContainer.takenElements.size() - 1; k >= 0; k--) { // final Object element = inputPortContainer.takenElements.get(k); // inputPortContainer.pipe.put(element); // } - IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) inputPortContainer.pipe; reservablePipe.rollback(); + } - this.numTakenElements -= inputPortContainer.takenElements.size(); + for (final IOutputPort<S, ?> outputPort : this.outputPorts) { + if (outputPort != null) { + @SuppressWarnings("unchecked") + IReservablePipe<Object> reservablePipe = (IReservablePipe<Object>) outputPort.getAssociatedPipe(); + reservablePipe.rollback(); + } } } diff --git a/src/main/java/teetime/framework/scheduling/NextStageScheduler.java b/src/main/java/teetime/framework/scheduling/NextStageScheduler.java index 0455e76a053daa0883902d7c93e0a855a45be0d0..db71c1a5cc8f1a71e5d9097c0972f0d995aa857a 100644 --- a/src/main/java/teetime/framework/scheduling/NextStageScheduler.java +++ b/src/main/java/teetime/framework/scheduling/NextStageScheduler.java @@ -101,5 +101,6 @@ public final class NextStageScheduler implements IStageScheduler { if (this.workList.isEmpty()) { this.workList.pushAll(this.highestPrioritizedEnabledStages); } + } } diff --git a/src/main/java/teetime/framework/sequential/QueuePipe.java b/src/main/java/teetime/framework/sequential/QueuePipe.java index 17605b600e3199113787d00cf2c5f6ab57f9b6cb..b78284662eed868e3aedb136d616bfd771427531 100644 --- a/src/main/java/teetime/framework/sequential/QueuePipe.java +++ b/src/main/java/teetime/framework/sequential/QueuePipe.java @@ -15,12 +15,14 @@ ***************************************************************************/ package teetime.framework.sequential; -import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import teetime.framework.core.AbstractPipe; import teetime.framework.core.IInputPort; import teetime.framework.core.IOutputPort; +import teetime.framework.core.IReservablePipe; import teetime.framework.core.ISink; import teetime.framework.core.ISource; import teetime.util.concurrent.workstealing.CircularWorkStealingDeque; @@ -30,9 +32,10 @@ import teetime.util.concurrent.workstealing.CircularWorkStealingDeque; * * @since 1.10 */ -public class QueuePipe<T> extends AbstractPipe<T> { +public class QueuePipe<T> extends AbstractPipe<T> implements IReservablePipe<T> { - private final List<T> queue = new ArrayList<T>(10); + // private final List<T> queue = new ArrayList<T>(10); + private final Queue<T> queue = new LinkedList<T>(); // private final List<T> queue = new ReservableArrayList<T>(10); @@ -58,8 +61,8 @@ public class QueuePipe<T> extends AbstractPipe<T> { @Override public T tryTakeInternal() { - // return this.queue.poll(); - return this.queue.remove(this.queue.size() - 1); + return this.queue.poll(); + // return this.queue.remove(this.queue.size() - 1); // return this.queue.pop(); } @@ -74,8 +77,8 @@ public class QueuePipe<T> extends AbstractPipe<T> { @Override public T read() { - // return this.queue.peek(); - return this.queue.get(this.queue.size() - 1); + return this.queue.peek(); + // return this.queue.get(this.queue.size() - 1); // return this.queue.read(); } @@ -90,4 +93,16 @@ public class QueuePipe<T> extends AbstractPipe<T> { // return this.queue.size() == 0; } + @Override + public void commit() { + // TODO Auto-generated method stub + + } + + @Override + public void rollback() { + // TODO Auto-generated method stub + + } + } diff --git a/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java b/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java index 8165cd350e100d4644a17217ef4d6487c7ce5f48..15afb470620d7e74a24d8475a1a4f9d3443004b6 100644 --- a/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java +++ b/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java @@ -1,5 +1,7 @@ package teetime.framework.sequential; +import java.util.List; + import teetime.framework.core.IInputPort; import teetime.framework.core.IOutputPort; import teetime.framework.core.IReservablePipe; @@ -47,4 +49,14 @@ public class ReservableQueuePipe<T> extends QueuePipe<T> implements IReservableP return this.reservableQueue.isEmpty(); } + @Override + public void putMultiple(final List<T> elements) { + throw new IllegalStateException(); + } + + @Override + public List<?> tryTakeMultiple(final int numElementsToTake) { + throw new IllegalStateException(); + } + } diff --git a/src/main/java/teetime/stage/basic/ObjectProducer.java b/src/main/java/teetime/stage/basic/ObjectProducer.java index c94c639789ef0d84d0254a702083333a64000d0a..0c39313de2f73d3194d8ff05b20cf44eeb141eff 100644 --- a/src/main/java/teetime/stage/basic/ObjectProducer.java +++ b/src/main/java/teetime/stage/basic/ObjectProducer.java @@ -58,7 +58,7 @@ public class ObjectProducer<T> extends AbstractFilter<ObjectProducer<T>> { final T newObject = this.objectCreator.call(); context.put(this.outputPort, newObject); } catch (final Exception e) { - throw new IllegalStateException(); + throw new IllegalStateException(e); } this.numObjectsToCreate--; diff --git a/src/main/java/teetime/util/list/ReservableArrayList.java b/src/main/java/teetime/util/list/ReservableArrayList.java index 5c014ea9b984afef95ef83b533de486416a31b10..67e8966076e93a60b03ced8ec08ca28e3a8a3d8a 100644 --- a/src/main/java/teetime/util/list/ReservableArrayList.java +++ b/src/main/java/teetime/util/list/ReservableArrayList.java @@ -17,6 +17,9 @@ public class ReservableArrayList<T> implements List<T> { } public void reservedAdd(final T element) { + if (this.lastFreeReservedIndex == this.elements.length) { + throw new IllegalStateException("not enough space"); + } this.elements[this.lastFreeReservedIndex++] = element; } @@ -64,9 +67,8 @@ public class ReservableArrayList<T> implements List<T> { @Override public boolean add(final T e) { - this.elements[this.lastFreeIndex++] = e; - this.lastFreeReservedIndex = this.lastFreeIndex; - return true; + // TODO Auto-generated method stub + return false; } @Override @@ -111,7 +113,10 @@ public class ReservableArrayList<T> implements List<T> { } @Override - public final T get(final int index) { + public T get(final int index) { + if (index < 0) { + return null; + } T element = this.elements[index]; return element; } @@ -170,7 +175,7 @@ public class ReservableArrayList<T> implements List<T> { } public T reservedRemoveLast() { - T element = this.get(this.lastFreeReservedIndex--); + T element = this.get(--this.lastFreeReservedIndex); return element; } } diff --git a/src/test/java/teetime/util/list/ReservableArrayListTest.java b/src/test/java/teetime/util/list/ReservableArrayListTest.java index 0c33dedfc6b244dbf5a9b33017118182996f755e..680b8c390bac926aaabc5d6564ef131ffff08e20 100644 --- a/src/test/java/teetime/util/list/ReservableArrayListTest.java +++ b/src/test/java/teetime/util/list/ReservableArrayListTest.java @@ -32,4 +32,19 @@ public class ReservableArrayListTest { Assert.assertTrue(reservableArrayList.isEmpty()); // Assert.assertEquals(element, reservableArrayList.getLast()); } + + @Test + public void testRemove() throws Exception { + ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10); + Object element = new Object(); + reservableArrayList.reservedAdd(element); + reservableArrayList.commit(); + + Assert.assertEquals(element, reservableArrayList.reservedRemoveLast()); + Assert.assertFalse(reservableArrayList.isEmpty()); + + reservableArrayList.commit(); + + Assert.assertTrue(reservableArrayList.isEmpty()); + } }