From 5cae91408dd04a569c3d6af71de21a001a8107a7 Mon Sep 17 00:00:00 2001
From: Christian Wulf <chw@informatik.uni-kiel.de>
Date: Fri, 13 Jun 2014 22:07:49 +0100
Subject: [PATCH] added CommittableQueue

---
 .../teetime/framework/core/AbstractPipe.java  |  12 +-
 .../framework/sequential/MethodCallPipe.java  |  20 +-
 .../sequential/ReservableQueuePipe.java       |  11 +-
 .../workstealing/CircularArray.java           |   2 +-
 .../java/teetime/util/list/ArrayPool.java     |  24 +++
 .../teetime/util/list/CommittableQueue.java   |  27 +++
 .../list/CommittableResizableArrayQueue.java  | 105 ++++++++++
 .../teetime/util/list/ListContainerPool.java  |   2 +-
 .../java/teetime/util/list/ObjectPool.java    |   4 +-
 .../util/list/ObjectPooledLinkedList.java     |   2 +-
 .../util/list/ReservableArrayList.java        | 181 ------------------
 ...> CommittableResizableArrayQueueTest.java} |  18 +-
 12 files changed, 202 insertions(+), 206 deletions(-)
 create mode 100644 src/main/java/teetime/util/list/ArrayPool.java
 create mode 100644 src/main/java/teetime/util/list/CommittableQueue.java
 create mode 100644 src/main/java/teetime/util/list/CommittableResizableArrayQueue.java
 delete mode 100644 src/main/java/teetime/util/list/ReservableArrayList.java
 rename src/test/java/teetime/util/list/{ReservableArrayListTest.java => CommittableResizableArrayQueueTest.java} (55%)

diff --git a/src/main/java/teetime/framework/core/AbstractPipe.java b/src/main/java/teetime/framework/core/AbstractPipe.java
index 14ed5f78..214e014a 100644
--- a/src/main/java/teetime/framework/core/AbstractPipe.java
+++ b/src/main/java/teetime/framework/core/AbstractPipe.java
@@ -18,9 +18,9 @@ package teetime.framework.core;
 
 /**
  * @author Christian Wulf
- *
+ * 
  * @since 1.10
- *
+ * 
  * @param <T>
  *            The type of the pipe
  * @param <P>
@@ -28,9 +28,11 @@ package teetime.framework.core;
  */
 public abstract class AbstractPipe<T> implements IPipe<T> {
 
+	public static final Object EMPTY_OBJECT = null;
+
 	/**
 	 * @author Christian Wulf
-	 *
+	 * 
 	 * @since 1.10
 	 */
 	public enum PipeState {
@@ -91,7 +93,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
 
 	/**
 	 * This method is called exactly once iff the pipeline is started.
-	 *
+	 * 
 	 * @since 1.10
 	 */
 	public void onPipelineStarts() {
@@ -109,7 +111,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
 
 	/**
 	 * This method is called exactly once iff the pipeline is stopped.
-	 *
+	 * 
 	 * @since 1.10
 	 */
 	public void onPipelineStops() {
diff --git a/src/main/java/teetime/framework/sequential/MethodCallPipe.java b/src/main/java/teetime/framework/sequential/MethodCallPipe.java
index cff55296..2a18cc0d 100644
--- a/src/main/java/teetime/framework/sequential/MethodCallPipe.java
+++ b/src/main/java/teetime/framework/sequential/MethodCallPipe.java
@@ -21,6 +21,7 @@ import java.util.List;
 import teetime.framework.core.AbstractPipe;
 import teetime.framework.core.IInputPort;
 import teetime.framework.core.IOutputPort;
+import teetime.framework.core.IReservablePipe;
 import teetime.framework.core.ISink;
 import teetime.framework.core.ISource;
 
@@ -29,7 +30,7 @@ import teetime.framework.core.ISource;
  * 
  * @since 1.10
  */
-public class MethodCallPipe<T> extends AbstractPipe<T> {
+public class MethodCallPipe<T> extends AbstractPipe<T> implements IReservablePipe<T> {
 
 	private T storedToken;
 
@@ -60,18 +61,22 @@ public class MethodCallPipe<T> extends AbstractPipe<T> {
 		return temp;
 	}
 
+	@Override
 	public T take() {
 		return this.tryTake();
 	}
 
+	@Override
 	public T read() {
 		return this.storedToken;
 	}
 
+	@Override
 	public void putMultiple(final List<T> items) {
 		throw new IllegalStateException("Putting more than one element is not possible. You tried to put " + items.size() + " items.");
 	}
 
+	@Override
 	public List<?> tryTakeMultiple(final int numElementsToTake) {
 		throw new IllegalStateException("Taking more than one element is not possible. You tried to take " + numElementsToTake + " items.");
 	}
@@ -80,8 +85,21 @@ public class MethodCallPipe<T> extends AbstractPipe<T> {
 		// is not needed in a synchronous execution
 	}
 
+	@Override
 	public boolean isEmpty() {
 		return this.storedToken == null;
 	}
 
+	@Override
+	public void commit() {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void rollback() {
+		// TODO Auto-generated method stub
+
+	}
+
 }
diff --git a/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java b/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java
index 15afb470..9cce059c 100644
--- a/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java
+++ b/src/main/java/teetime/framework/sequential/ReservableQueuePipe.java
@@ -7,11 +7,12 @@ import teetime.framework.core.IOutputPort;
 import teetime.framework.core.IReservablePipe;
 import teetime.framework.core.ISink;
 import teetime.framework.core.ISource;
-import teetime.util.list.ReservableArrayList;
+import teetime.util.list.CommittableQueue;
+import teetime.util.list.CommittableResizableArrayQueue;
 
 public class ReservableQueuePipe<T> extends QueuePipe<T> implements IReservablePipe<T> {
 
-	private final ReservableArrayList<T> reservableQueue = new ReservableArrayList<T>(10);
+	private final CommittableQueue<T> reservableQueue = new CommittableResizableArrayQueue<T>(EMPTY_OBJECT, 10);
 
 	static public <S0 extends ISource, S1 extends ISink<S1>, T> void connect(final IOutputPort<S0, ? extends T> sourcePort, final IInputPort<S1, T> targetPort) {
 		final QueuePipe<T> pipe = new ReservableQueuePipe<T>();
@@ -31,17 +32,17 @@ public class ReservableQueuePipe<T> extends QueuePipe<T> implements IReservableP
 
 	@Override
 	public void putInternal(final T element) {
-		this.reservableQueue.reservedAdd(element);
+		this.reservableQueue.addToTailUncommitted(element);
 	}
 
 	@Override
 	public T tryTakeInternal() {
-		return this.reservableQueue.reservedRemoveLast();
+		return this.reservableQueue.removeFromHeadUncommitted();
 	}
 
 	@Override
 	public T read() {
-		return this.reservableQueue.getLast();
+		return this.reservableQueue.getTail();
 	}
 
 	@Override
diff --git a/src/main/java/teetime/util/concurrent/workstealing/CircularArray.java b/src/main/java/teetime/util/concurrent/workstealing/CircularArray.java
index d1daf143..f48def9d 100644
--- a/src/main/java/teetime/util/concurrent/workstealing/CircularArray.java
+++ b/src/main/java/teetime/util/concurrent/workstealing/CircularArray.java
@@ -45,7 +45,7 @@ public class CircularArray<T> {
 	}
 
 	public long getCapacity() {
-		return 1 << this.logSize;
+		return this.segment.length;
 	}
 
 	public T get(final long i) {
diff --git a/src/main/java/teetime/util/list/ArrayPool.java b/src/main/java/teetime/util/list/ArrayPool.java
new file mode 100644
index 00000000..b21216ad
--- /dev/null
+++ b/src/main/java/teetime/util/list/ArrayPool.java
@@ -0,0 +1,24 @@
+package teetime.util.list;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ArrayPool<T> {
+
+	// BETTER use a map with int as key due to performance
+	private final Map<Integer, T[]> cache = new HashMap<Integer, T[]>();
+
+	@SuppressWarnings("unchecked")
+	public T[] acquire(final int capacity) {
+		T[] array = this.cache.get(capacity);
+		if (array == null) {
+			array = (T[]) new Object[capacity];
+		}
+		return array;
+	}
+
+	public void release(final T[] array) {
+		this.cache.put(array.length, array);
+	}
+
+}
diff --git a/src/main/java/teetime/util/list/CommittableQueue.java b/src/main/java/teetime/util/list/CommittableQueue.java
new file mode 100644
index 00000000..83f43f1b
--- /dev/null
+++ b/src/main/java/teetime/util/list/CommittableQueue.java
@@ -0,0 +1,27 @@
+package teetime.util.list;
+
+public interface CommittableQueue<T> {
+
+	// basic methods
+	T get(int index);
+
+	void addToTailUncommitted(T element);
+
+	T removeFromHeadUncommitted();
+
+	void commit();
+
+	void rollback();
+
+	int size();
+
+	boolean isEmpty();
+
+	void clear();
+
+	// convenient methods
+	// T removeFromHeadUncommitted(int count);
+
+	T getTail();
+
+}
diff --git a/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java
new file mode 100644
index 00000000..987552ad
--- /dev/null
+++ b/src/main/java/teetime/util/list/CommittableResizableArrayQueue.java
@@ -0,0 +1,105 @@
+package teetime.util.list;
+
+public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
+
+	private final ArrayPool<T> arrayPool;
+	private T[] elements;
+
+	private int lastFreeIndex, lastFreeIndexUncommitted;
+
+	@SuppressWarnings("unchecked")
+	public CommittableResizableArrayQueue(final Object emptyObject, final int initialCapacity) {
+		super();
+		this.arrayPool = new ArrayPool<T>();
+		this.elements = this.arrayPool.acquire(initialCapacity);
+
+		this.elements[0] = (T) emptyObject; // optimization: avoids the use of an index out-of-bounds check
+		this.clear();
+	}
+
+	@Override
+	public T get(final int index) {
+		T element = this.elements[index + 1];
+		return element;
+	}
+
+	@Override
+	public void addToTailUncommitted(final T element) {
+		if (this.lastFreeIndexUncommitted == this.capacity()) {
+			this.grow();
+		}
+		this.put(this.lastFreeIndexUncommitted++, element);
+	}
+
+	@Override
+	public T removeFromHeadUncommitted() {
+		if (this.lastFreeIndexUncommitted < this.capacity() / 2) {
+			this.shrink();
+		}
+		T element = this.get(--this.lastFreeIndexUncommitted);
+		return element;
+	}
+
+	@Override
+	public void commit() {
+		// TODO set elements to null to help the gc
+		this.lastFreeIndex = this.lastFreeIndexUncommitted;
+	}
+
+	@Override
+	public void rollback() {
+		this.lastFreeIndexUncommitted = this.lastFreeIndex;
+	}
+
+	@Override
+	public int size() {
+		return this.lastFreeIndex;
+	}
+
+	@Override
+	public boolean isEmpty() {
+		return this.size() == 0;
+	}
+
+	@Override
+	public void clear() {
+		this.lastFreeIndex = this.lastFreeIndexUncommitted = 0;
+	}
+
+	@Override
+	public T getTail() {
+		T element = this.get(this.lastFreeIndex - 1);
+		return element;
+	}
+
+	private void grow() {
+		T[] newElements = this.arrayPool.acquire(this.capacity() * 2);
+		this.replaceCurrentArrayBy(newElements);
+	}
+
+	private void shrink() {
+		T[] newElements = this.arrayPool.acquire(this.capacity() / 2);
+		this.replaceCurrentArrayBy(newElements);
+	}
+
+	private final void replaceCurrentArrayBy(final T[] newElements) {
+		this.copyArray(this.elements, newElements);
+		this.arrayPool.release(this.elements);
+		this.elements = newElements;
+	}
+
+	private final void copyArray(final T[] elements, final T[] newElements) {
+		// for (int i = 0; i < this.lastFreeIndexUncommitted; i++) {
+		// newElements[i] = elements[i];
+		// }
+		System.arraycopy(elements, 0, newElements, 0, this.lastFreeIndexUncommitted + 1);
+	}
+
+	private final void put(final int index, final T element) {
+		this.elements[index + 1] = element;
+	}
+
+	private final int capacity() {
+		return this.elements.length - 1;
+	}
+}
diff --git a/src/main/java/teetime/util/list/ListContainerPool.java b/src/main/java/teetime/util/list/ListContainerPool.java
index 2860f112..d2a27ecc 100644
--- a/src/main/java/teetime/util/list/ListContainerPool.java
+++ b/src/main/java/teetime/util/list/ListContainerPool.java
@@ -14,7 +14,7 @@ public class ListContainerPool<T> implements ObjectPool<ListContainer<T>> {
 	}
 
 	@Override
-	public ListContainer<T> get() {
+	public ListContainer<T> acquire() {
 		ListContainer<T> obj;
 		if (this.pool.size() > 0) {
 			obj = this.pool.remove(this.pool.size() - 1);
diff --git a/src/main/java/teetime/util/list/ObjectPool.java b/src/main/java/teetime/util/list/ObjectPool.java
index acf01119..e1820468 100644
--- a/src/main/java/teetime/util/list/ObjectPool.java
+++ b/src/main/java/teetime/util/list/ObjectPool.java
@@ -2,8 +2,8 @@ package teetime.util.list;
 
 public interface ObjectPool<T> {
 
-	T get();
+	T acquire();
 
-	void release(T obj);
+	void release(T element);
 
 }
diff --git a/src/main/java/teetime/util/list/ObjectPooledLinkedList.java b/src/main/java/teetime/util/list/ObjectPooledLinkedList.java
index 567d66f9..b0744508 100644
--- a/src/main/java/teetime/util/list/ObjectPooledLinkedList.java
+++ b/src/main/java/teetime/util/list/ObjectPooledLinkedList.java
@@ -30,7 +30,7 @@ public class ObjectPooledLinkedList<T> {
 	}
 
 	public void push(final T element) {
-		ListContainer<T> listContainer = this.objectPool.get();
+		ListContainer<T> listContainer = this.objectPool.acquire();
 		listContainer.previous = this.top;
 		listContainer.value = element;
 		this.top = listContainer;
diff --git a/src/main/java/teetime/util/list/ReservableArrayList.java b/src/main/java/teetime/util/list/ReservableArrayList.java
deleted file mode 100644
index 67e89660..00000000
--- a/src/main/java/teetime/util/list/ReservableArrayList.java
+++ /dev/null
@@ -1,181 +0,0 @@
-package teetime.util.list;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-
-public class ReservableArrayList<T> implements List<T> {
-
-	private final T[] elements;
-
-	private int lastFreeIndex, lastFreeReservedIndex;
-
-	@SuppressWarnings("unchecked")
-	public ReservableArrayList(final int initialSize) {
-		this.elements = (T[]) new Object[initialSize];
-	}
-
-	public void reservedAdd(final T element) {
-		if (this.lastFreeReservedIndex == this.elements.length) {
-			throw new IllegalStateException("not enough space");
-		}
-		this.elements[this.lastFreeReservedIndex++] = element;
-	}
-
-	public void commit() {
-		// TODO set elements to null
-		this.lastFreeIndex = this.lastFreeReservedIndex;
-	}
-
-	public void rollback() {
-		this.lastFreeReservedIndex = this.lastFreeIndex;
-	}
-
-	@Override
-	public int size() {
-		return this.lastFreeIndex;
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return this.size() == 0;
-	}
-
-	@Override
-	public boolean contains(final Object o) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public Iterator<T> iterator() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public Object[] toArray() {
-		return this.elements;
-	}
-
-	@Override
-	public <T> T[] toArray(final T[] a) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public boolean add(final T e) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public boolean remove(final Object o) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public boolean containsAll(final Collection<?> c) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public boolean addAll(final Collection<? extends T> c) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public boolean addAll(final int index, final Collection<? extends T> c) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public boolean removeAll(final Collection<?> c) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public boolean retainAll(final Collection<?> c) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public void clear() {
-		this.lastFreeIndex = this.lastFreeReservedIndex = 0;
-	}
-
-	@Override
-	public T get(final int index) {
-		if (index < 0) {
-			return null;
-		}
-		T element = this.elements[index];
-		return element;
-	}
-
-	@Override
-	public T set(final int index, final T element) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public void add(final int index, final T element) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public T remove(final int index) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public int indexOf(final Object o) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public int lastIndexOf(final Object o) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public ListIterator<T> listIterator() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ListIterator<T> listIterator(final int index) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public List<T> subList(final int fromIndex, final int toIndex) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	public T getLast() {
-		T element = this.get(this.lastFreeIndex - 1);
-		return element;
-	}
-
-	public T reservedRemoveLast() {
-		T element = this.get(--this.lastFreeReservedIndex);
-		return element;
-	}
-}
diff --git a/src/test/java/teetime/util/list/ReservableArrayListTest.java b/src/test/java/teetime/util/list/CommittableResizableArrayQueueTest.java
similarity index 55%
rename from src/test/java/teetime/util/list/ReservableArrayListTest.java
rename to src/test/java/teetime/util/list/CommittableResizableArrayQueueTest.java
index 680b8c39..d78fd182 100644
--- a/src/test/java/teetime/util/list/ReservableArrayListTest.java
+++ b/src/test/java/teetime/util/list/CommittableResizableArrayQueueTest.java
@@ -3,27 +3,27 @@ package teetime.util.list;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class ReservableArrayListTest {
+public class CommittableResizableArrayQueueTest {
 
 	@Test
 	public void testCommit() throws Exception {
-		ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
+		CommittableResizableArrayQueue<Object> reservableArrayList = new CommittableResizableArrayQueue<Object>(null, 10);
 		Object element = new Object();
-		reservableArrayList.reservedAdd(element);
+		reservableArrayList.addToTailUncommitted(element);
 
 		Assert.assertTrue(reservableArrayList.isEmpty());
 
 		reservableArrayList.commit();
 
 		Assert.assertFalse(reservableArrayList.isEmpty());
-		Assert.assertEquals(element, reservableArrayList.getLast());
+		Assert.assertEquals(element, reservableArrayList.getTail());
 	}
 
 	@Test
 	public void testRollback() throws Exception {
-		ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
+		CommittableResizableArrayQueue<Object> reservableArrayList = new CommittableResizableArrayQueue<Object>(null, 10);
 		Object element = new Object();
-		reservableArrayList.reservedAdd(element);
+		reservableArrayList.addToTailUncommitted(element);
 
 		Assert.assertTrue(reservableArrayList.isEmpty());
 
@@ -35,12 +35,12 @@ public class ReservableArrayListTest {
 
 	@Test
 	public void testRemove() throws Exception {
-		ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
+		CommittableResizableArrayQueue<Object> reservableArrayList = new CommittableResizableArrayQueue<Object>(null, 10);
 		Object element = new Object();
-		reservableArrayList.reservedAdd(element);
+		reservableArrayList.addToTailUncommitted(element);
 		reservableArrayList.commit();
 
-		Assert.assertEquals(element, reservableArrayList.reservedRemoveLast());
+		Assert.assertEquals(element, reservableArrayList.removeFromHeadUncommitted());
 		Assert.assertFalse(reservableArrayList.isEmpty());
 
 		reservableArrayList.commit();
-- 
GitLab