Skip to content
Snippets Groups Projects
Commit 5cae9140 authored by Christian Wulf's avatar Christian Wulf
Browse files

added CommittableQueue

parent 19d73980
No related branches found
No related tags found
No related merge requests found
Showing
with 202 additions and 206 deletions
......@@ -18,9 +18,9 @@ package teetime.framework.core;
/**
* @author Christian Wulf
*
*
* @since 1.10
*
*
* @param <T>
* The type of the pipe
* @param <P>
......@@ -28,9 +28,11 @@ package teetime.framework.core;
*/
public abstract class AbstractPipe<T> implements IPipe<T> {
public static final Object EMPTY_OBJECT = null;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public enum PipeState {
......@@ -91,7 +93,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
/**
* This method is called exactly once iff the pipeline is started.
*
*
* @since 1.10
*/
public void onPipelineStarts() {
......@@ -109,7 +111,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
/**
* This method is called exactly once iff the pipeline is stopped.
*
*
* @since 1.10
*/
public void onPipelineStops() {
......
......@@ -21,6 +21,7 @@ import java.util.List;
import teetime.framework.core.AbstractPipe;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IReservablePipe;
import teetime.framework.core.ISink;
import teetime.framework.core.ISource;
......@@ -29,7 +30,7 @@ import teetime.framework.core.ISource;
*
* @since 1.10
*/
public class MethodCallPipe<T> extends AbstractPipe<T> {
public class MethodCallPipe<T> extends AbstractPipe<T> implements IReservablePipe<T> {
private T storedToken;
......@@ -60,18 +61,22 @@ public class MethodCallPipe<T> extends AbstractPipe<T> {
return temp;
}
@Override
public T take() {
return this.tryTake();
}
@Override
public T read() {
return this.storedToken;
}
@Override
public void putMultiple(final List<T> items) {
throw new IllegalStateException("Putting more than one element is not possible. You tried to put " + items.size() + " items.");
}
@Override
public List<?> tryTakeMultiple(final int numElementsToTake) {
throw new IllegalStateException("Taking more than one element is not possible. You tried to take " + numElementsToTake + " items.");
}
......@@ -80,8 +85,21 @@ public class MethodCallPipe<T> extends AbstractPipe<T> {
// is not needed in a synchronous execution
}
@Override
public boolean isEmpty() {
return this.storedToken == null;
}
@Override
public void commit() {
// TODO Auto-generated method stub
}
@Override
public void rollback() {
// TODO Auto-generated method stub
}
}
......@@ -7,11 +7,12 @@ import teetime.framework.core.IOutputPort;
import teetime.framework.core.IReservablePipe;
import teetime.framework.core.ISink;
import teetime.framework.core.ISource;
import teetime.util.list.ReservableArrayList;
import teetime.util.list.CommittableQueue;
import teetime.util.list.CommittableResizableArrayQueue;
public class ReservableQueuePipe<T> extends QueuePipe<T> implements IReservablePipe<T> {
private final ReservableArrayList<T> reservableQueue = new ReservableArrayList<T>(10);
private final CommittableQueue<T> reservableQueue = new CommittableResizableArrayQueue<T>(EMPTY_OBJECT, 10);
static public <S0 extends ISource, S1 extends ISink<S1>, T> void connect(final IOutputPort<S0, ? extends T> sourcePort, final IInputPort<S1, T> targetPort) {
final QueuePipe<T> pipe = new ReservableQueuePipe<T>();
......@@ -31,17 +32,17 @@ public class ReservableQueuePipe<T> extends QueuePipe<T> implements IReservableP
@Override
public void putInternal(final T element) {
this.reservableQueue.reservedAdd(element);
this.reservableQueue.addToTailUncommitted(element);
}
@Override
public T tryTakeInternal() {
return this.reservableQueue.reservedRemoveLast();
return this.reservableQueue.removeFromHeadUncommitted();
}
@Override
public T read() {
return this.reservableQueue.getLast();
return this.reservableQueue.getTail();
}
@Override
......
......@@ -45,7 +45,7 @@ public class CircularArray<T> {
}
public long getCapacity() {
return 1 << this.logSize;
return this.segment.length;
}
public T get(final long i) {
......
package teetime.util.list;
import java.util.HashMap;
import java.util.Map;
public class ArrayPool<T> {
// BETTER use a map with int as key due to performance
private final Map<Integer, T[]> cache = new HashMap<Integer, T[]>();
@SuppressWarnings("unchecked")
public T[] acquire(final int capacity) {
T[] array = this.cache.get(capacity);
if (array == null) {
array = (T[]) new Object[capacity];
}
return array;
}
public void release(final T[] array) {
this.cache.put(array.length, array);
}
}
package teetime.util.list;
public interface CommittableQueue<T> {
// basic methods
T get(int index);
void addToTailUncommitted(T element);
T removeFromHeadUncommitted();
void commit();
void rollback();
int size();
boolean isEmpty();
void clear();
// convenient methods
// T removeFromHeadUncommitted(int count);
T getTail();
}
package teetime.util.list;
public class CommittableResizableArrayQueue<T> implements CommittableQueue<T> {
private final ArrayPool<T> arrayPool;
private T[] elements;
private int lastFreeIndex, lastFreeIndexUncommitted;
@SuppressWarnings("unchecked")
public CommittableResizableArrayQueue(final Object emptyObject, final int initialCapacity) {
super();
this.arrayPool = new ArrayPool<T>();
this.elements = this.arrayPool.acquire(initialCapacity);
this.elements[0] = (T) emptyObject; // optimization: avoids the use of an index out-of-bounds check
this.clear();
}
@Override
public T get(final int index) {
T element = this.elements[index + 1];
return element;
}
@Override
public void addToTailUncommitted(final T element) {
if (this.lastFreeIndexUncommitted == this.capacity()) {
this.grow();
}
this.put(this.lastFreeIndexUncommitted++, element);
}
@Override
public T removeFromHeadUncommitted() {
if (this.lastFreeIndexUncommitted < this.capacity() / 2) {
this.shrink();
}
T element = this.get(--this.lastFreeIndexUncommitted);
return element;
}
@Override
public void commit() {
// TODO set elements to null to help the gc
this.lastFreeIndex = this.lastFreeIndexUncommitted;
}
@Override
public void rollback() {
this.lastFreeIndexUncommitted = this.lastFreeIndex;
}
@Override
public int size() {
return this.lastFreeIndex;
}
@Override
public boolean isEmpty() {
return this.size() == 0;
}
@Override
public void clear() {
this.lastFreeIndex = this.lastFreeIndexUncommitted = 0;
}
@Override
public T getTail() {
T element = this.get(this.lastFreeIndex - 1);
return element;
}
private void grow() {
T[] newElements = this.arrayPool.acquire(this.capacity() * 2);
this.replaceCurrentArrayBy(newElements);
}
private void shrink() {
T[] newElements = this.arrayPool.acquire(this.capacity() / 2);
this.replaceCurrentArrayBy(newElements);
}
private final void replaceCurrentArrayBy(final T[] newElements) {
this.copyArray(this.elements, newElements);
this.arrayPool.release(this.elements);
this.elements = newElements;
}
private final void copyArray(final T[] elements, final T[] newElements) {
// for (int i = 0; i < this.lastFreeIndexUncommitted; i++) {
// newElements[i] = elements[i];
// }
System.arraycopy(elements, 0, newElements, 0, this.lastFreeIndexUncommitted + 1);
}
private final void put(final int index, final T element) {
this.elements[index + 1] = element;
}
private final int capacity() {
return this.elements.length - 1;
}
}
......@@ -14,7 +14,7 @@ public class ListContainerPool<T> implements ObjectPool<ListContainer<T>> {
}
@Override
public ListContainer<T> get() {
public ListContainer<T> acquire() {
ListContainer<T> obj;
if (this.pool.size() > 0) {
obj = this.pool.remove(this.pool.size() - 1);
......
......@@ -2,8 +2,8 @@ package teetime.util.list;
public interface ObjectPool<T> {
T get();
T acquire();
void release(T obj);
void release(T element);
}
......@@ -30,7 +30,7 @@ public class ObjectPooledLinkedList<T> {
}
public void push(final T element) {
ListContainer<T> listContainer = this.objectPool.get();
ListContainer<T> listContainer = this.objectPool.acquire();
listContainer.previous = this.top;
listContainer.value = element;
this.top = listContainer;
......
package teetime.util.list;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
public class ReservableArrayList<T> implements List<T> {
private final T[] elements;
private int lastFreeIndex, lastFreeReservedIndex;
@SuppressWarnings("unchecked")
public ReservableArrayList(final int initialSize) {
this.elements = (T[]) new Object[initialSize];
}
public void reservedAdd(final T element) {
if (this.lastFreeReservedIndex == this.elements.length) {
throw new IllegalStateException("not enough space");
}
this.elements[this.lastFreeReservedIndex++] = element;
}
public void commit() {
// TODO set elements to null
this.lastFreeIndex = this.lastFreeReservedIndex;
}
public void rollback() {
this.lastFreeReservedIndex = this.lastFreeIndex;
}
@Override
public int size() {
return this.lastFreeIndex;
}
@Override
public boolean isEmpty() {
return this.size() == 0;
}
@Override
public boolean contains(final Object o) {
// TODO Auto-generated method stub
return false;
}
@Override
public Iterator<T> iterator() {
// TODO Auto-generated method stub
return null;
}
@Override
public Object[] toArray() {
return this.elements;
}
@Override
public <T> T[] toArray(final T[] a) {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean add(final T e) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean remove(final Object o) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean containsAll(final Collection<?> c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean addAll(final Collection<? extends T> c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean addAll(final int index, final Collection<? extends T> c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean removeAll(final Collection<?> c) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean retainAll(final Collection<?> c) {
// TODO Auto-generated method stub
return false;
}
@Override
public void clear() {
this.lastFreeIndex = this.lastFreeReservedIndex = 0;
}
@Override
public T get(final int index) {
if (index < 0) {
return null;
}
T element = this.elements[index];
return element;
}
@Override
public T set(final int index, final T element) {
// TODO Auto-generated method stub
return null;
}
@Override
public void add(final int index, final T element) {
// TODO Auto-generated method stub
}
@Override
public T remove(final int index) {
// TODO Auto-generated method stub
return null;
}
@Override
public int indexOf(final Object o) {
// TODO Auto-generated method stub
return 0;
}
@Override
public int lastIndexOf(final Object o) {
// TODO Auto-generated method stub
return 0;
}
@Override
public ListIterator<T> listIterator() {
// TODO Auto-generated method stub
return null;
}
@Override
public ListIterator<T> listIterator(final int index) {
// TODO Auto-generated method stub
return null;
}
@Override
public List<T> subList(final int fromIndex, final int toIndex) {
// TODO Auto-generated method stub
return null;
}
public T getLast() {
T element = this.get(this.lastFreeIndex - 1);
return element;
}
public T reservedRemoveLast() {
T element = this.get(--this.lastFreeReservedIndex);
return element;
}
}
......@@ -3,27 +3,27 @@ package teetime.util.list;
import org.junit.Assert;
import org.junit.Test;
public class ReservableArrayListTest {
public class CommittableResizableArrayQueueTest {
@Test
public void testCommit() throws Exception {
ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
CommittableResizableArrayQueue<Object> reservableArrayList = new CommittableResizableArrayQueue<Object>(null, 10);
Object element = new Object();
reservableArrayList.reservedAdd(element);
reservableArrayList.addToTailUncommitted(element);
Assert.assertTrue(reservableArrayList.isEmpty());
reservableArrayList.commit();
Assert.assertFalse(reservableArrayList.isEmpty());
Assert.assertEquals(element, reservableArrayList.getLast());
Assert.assertEquals(element, reservableArrayList.getTail());
}
@Test
public void testRollback() throws Exception {
ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
CommittableResizableArrayQueue<Object> reservableArrayList = new CommittableResizableArrayQueue<Object>(null, 10);
Object element = new Object();
reservableArrayList.reservedAdd(element);
reservableArrayList.addToTailUncommitted(element);
Assert.assertTrue(reservableArrayList.isEmpty());
......@@ -35,12 +35,12 @@ public class ReservableArrayListTest {
@Test
public void testRemove() throws Exception {
ReservableArrayList<Object> reservableArrayList = new ReservableArrayList<Object>(10);
CommittableResizableArrayQueue<Object> reservableArrayList = new CommittableResizableArrayQueue<Object>(null, 10);
Object element = new Object();
reservableArrayList.reservedAdd(element);
reservableArrayList.addToTailUncommitted(element);
reservableArrayList.commit();
Assert.assertEquals(element, reservableArrayList.reservedRemoveLast());
Assert.assertEquals(element, reservableArrayList.removeFromHeadUncommitted());
Assert.assertFalse(reservableArrayList.isEmpty());
reservableArrayList.commit();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment