From d8f1a84de619f8148acae54293dc4123fb85e4c4 Mon Sep 17 00:00:00 2001
From: Nils Christian Ehmke <nie@informatik.uni-kiel.de>
Date: Thu, 18 Dec 2014 10:43:38 +0100
Subject: [PATCH] Improved the API regarding the distributor and the merger.
 Added also a test for the distributor.

---
 .../basic/distributor/CloneStrategy.java      |   8 +-
 .../distributor/CopyByReferenceStrategy.java  |   8 +-
 .../stage/basic/distributor/Distributor.java  |  14 ++-
 .../distributor/IDistributorStrategy.java     |   8 +-
 .../basic/distributor/RoundRobinStrategy.java |  10 +-
 .../stage/basic/merger/IMergerStrategy.java   |   8 +-
 .../teetime/stage/basic/merger/Merger.java    |  14 ++-
 .../basic/merger/RoundRobinStrategy.java      |  10 +-
 .../teetime/stage/io/EveryXthPrinter.java     |   2 +-
 src/site/resources/browserconfig.xml          |  24 ++---
 .../basic/distributor/DistributorTest.java    | 100 ++++++++++++++++++
 11 files changed, 161 insertions(+), 45 deletions(-)
 create mode 100644 src/test/java/teetime/stage/basic/distributor/DistributorTest.java

diff --git a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
index 9512aad0..29df8f56 100644
--- a/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/CloneStrategy.java
@@ -19,13 +19,13 @@ import teetime.framework.OutputPort;
 
 /**
  * @author Nils Christian Ehmke
- * 
- * @since 1.10
+ *
+ * @since 1.0
  */
-public final class CloneStrategy<T> implements IDistributorStrategy<T> {
+public final class CloneStrategy implements IDistributorStrategy {
 
 	@Override
-	public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
+	public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
 		throw new UnsupportedOperationException();
 	}
 
diff --git a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
index 3a31976e..ac6ad95c 100644
--- a/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/CopyByReferenceStrategy.java
@@ -19,13 +19,13 @@ import teetime.framework.OutputPort;
 
 /**
  * @author Nils Christian Ehmke
- * 
- * @since 1.10
+ *
+ * @since 1.0
  */
-public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> {
+public final class CopyByReferenceStrategy implements IDistributorStrategy {
 
 	@Override
-	public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
+	public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
 		for (final OutputPort<T> outputPort : outputPorts) {
 			outputPort.send(element);
 		}
diff --git a/src/main/java/teetime/stage/basic/distributor/Distributor.java b/src/main/java/teetime/stage/basic/distributor/Distributor.java
index 5fecfe44..aab742cf 100644
--- a/src/main/java/teetime/stage/basic/distributor/Distributor.java
+++ b/src/main/java/teetime/stage/basic/distributor/Distributor.java
@@ -29,7 +29,15 @@ import teetime.framework.OutputPort;
  */
 public class Distributor<T> extends AbstractConsumerStage<T> {
 
-	private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>();
+	private IDistributorStrategy strategy;
+
+	public Distributor() {
+		this(new RoundRobinStrategy());
+	}
+
+	public Distributor(final IDistributorStrategy strategy) {
+		this.strategy = strategy;
+	}
 
 	@SuppressWarnings("unchecked")
 	@Override
@@ -41,11 +49,11 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
 		return this.createOutputPort();
 	}
 
-	public IDistributorStrategy<T> getStrategy() {
+	public IDistributorStrategy getStrategy() {
 		return this.strategy;
 	}
 
-	public void setStrategy(final IDistributorStrategy<T> strategy) {
+	public void setStrategy(final IDistributorStrategy strategy) {
 		this.strategy = strategy;
 	}
 
diff --git a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
index 69e53787..9f67ccb6 100644
--- a/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/IDistributorStrategy.java
@@ -19,11 +19,11 @@ import teetime.framework.OutputPort;
 
 /**
  * @author Nils Christian Ehmke
- * 
- * @since 1.10
+ *
+ * @since 1.0
  */
-public interface IDistributorStrategy<T> {
+public interface IDistributorStrategy {
 
-	public boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
+	public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
 
 }
diff --git a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
index 64bf90f0..e43ce784 100644
--- a/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
+++ b/src/main/java/teetime/stage/basic/distributor/RoundRobinStrategy.java
@@ -19,15 +19,15 @@ import teetime.framework.OutputPort;
 
 /**
  * @author Nils Christian Ehmke
- * 
- * @since 1.10
+ *
+ * @since 1.0
  */
-public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
+public final class RoundRobinStrategy implements IDistributorStrategy {
 
 	private int index = 0;
 
 	@Override
-	public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
+	public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
 		final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
 
 		outputPort.send(element);
@@ -35,7 +35,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
 		return true;
 	}
 
-	private OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
+	private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
 		final OutputPort<T> outputPort = outputPorts[this.index];
 
 		this.index = (this.index + 1) % outputPorts.length;
diff --git a/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java b/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java
index 0114cf9b..d001f86e 100644
--- a/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java
+++ b/src/main/java/teetime/stage/basic/merger/IMergerStrategy.java
@@ -17,11 +17,11 @@ package teetime.stage.basic.merger;
 
 /**
  * @author Nils Christian Ehmke
- * 
- * @since 1.10
+ *
+ * @since 1.0
  */
-public interface IMergerStrategy<T> {
+public interface IMergerStrategy {
 
-	public T getNextInput(Merger<T> merger);
+	public <T> T getNextInput(Merger<T> merger);
 
 }
diff --git a/src/main/java/teetime/stage/basic/merger/Merger.java b/src/main/java/teetime/stage/basic/merger/Merger.java
index 69b778a4..aa84f0e3 100644
--- a/src/main/java/teetime/stage/basic/merger/Merger.java
+++ b/src/main/java/teetime/stage/basic/merger/Merger.java
@@ -42,10 +42,18 @@ public final class Merger<T> extends AbstractStage {
 
 	private final OutputPort<T> outputPort = this.createOutputPort();
 
-	private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
+	private IMergerStrategy strategy;
 
 	private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>();
 
+	public Merger() {
+		this(new RoundRobinStrategy());
+	}
+
+	public Merger(final IMergerStrategy strategy) {
+		this.strategy = strategy;
+	}
+
 	@Override
 	public void executeWithPorts() {
 		final T token = this.strategy.getNextInput(this);
@@ -90,11 +98,11 @@ public final class Merger<T> extends AbstractStage {
 
 	}
 
-	public IMergerStrategy<T> getMergerStrategy() {
+	public IMergerStrategy getMergerStrategy() {
 		return this.strategy;
 	}
 
-	public void setStrategy(final IMergerStrategy<T> strategy) {
+	public void setStrategy(final IMergerStrategy strategy) {
 		this.strategy = strategy;
 	}
 
diff --git a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java
index 3edb0478..0ed2af2f 100644
--- a/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java
+++ b/src/main/java/teetime/stage/basic/merger/RoundRobinStrategy.java
@@ -19,15 +19,15 @@ import teetime.framework.InputPort;
 
 /**
  * @author Nils Christian Ehmke
- * 
- * @since 1.10
+ *
+ * @since 1.0
  */
-public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
+public final class RoundRobinStrategy implements IMergerStrategy {
 
 	private int index = 0;
 
 	@Override
-	public T getNextInput(final Merger<T> merger) {
+	public <T> T getNextInput(final Merger<T> merger) {
 		@SuppressWarnings("unchecked")
 		InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts();
 		int size = inputPorts.length;
@@ -42,7 +42,7 @@ public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
 		return null;
 	}
 
-	private InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) {
+	private <T> InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) {
 		InputPort<T> inputPort = inputPorts[this.index];
 
 		this.index = (this.index + 1) % inputPorts.length;
diff --git a/src/main/java/teetime/stage/io/EveryXthPrinter.java b/src/main/java/teetime/stage/io/EveryXthPrinter.java
index 1af922a1..aacc5936 100644
--- a/src/main/java/teetime/stage/io/EveryXthPrinter.java
+++ b/src/main/java/teetime/stage/io/EveryXthPrinter.java
@@ -29,7 +29,7 @@ public final class EveryXthPrinter<T> extends Stage {
 		pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
 		pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
 
-		distributor.setStrategy(new CopyByReferenceStrategy<T>());
+		distributor.setStrategy(new CopyByReferenceStrategy());
 	}
 
 	@Override
diff --git a/src/site/resources/browserconfig.xml b/src/site/resources/browserconfig.xml
index bcbd6c9f..587f54d5 100644
--- a/src/site/resources/browserconfig.xml
+++ b/src/site/resources/browserconfig.xml
@@ -1,12 +1,12 @@
-<?xml version="1.0" encoding="utf-8"?>
-<browserconfig>
-  <msapplication>
-    <tile>
-      <square70x70logo src="/mstile-70x70.png"/>
-      <square150x150logo src="/mstile-150x150.png"/>
-      <square310x310logo src="/mstile-310x310.png"/>
-      <wide310x150logo src="/mstile-310x150.png"/>
-      <TileColor>#2b5797</TileColor>
-    </tile>
-  </msapplication>
-</browserconfig>
+<?xml version="1.0" encoding="utf-8"?>
+<browserconfig>
+  <msapplication>
+    <tile>
+      <square70x70logo src="/mstile-70x70.png"/>
+      <square150x150logo src="/mstile-150x150.png"/>
+      <square310x310logo src="/mstile-310x310.png"/>
+      <wide310x150logo src="/mstile-310x150.png"/>
+      <TileColor>#2b5797</TileColor>
+    </tile>
+  </msapplication>
+</browserconfig>
diff --git a/src/test/java/teetime/stage/basic/distributor/DistributorTest.java b/src/test/java/teetime/stage/basic/distributor/DistributorTest.java
new file mode 100644
index 00000000..3ea966db
--- /dev/null
+++ b/src/test/java/teetime/stage/basic/distributor/DistributorTest.java
@@ -0,0 +1,100 @@
+package teetime.stage.basic.distributor;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.SingleElementPipeFactory;
+import teetime.stage.CollectorSink;
+
+/**
+ * @author Nils Christian Ehmke
+ *
+ * @since 1.0
+ */
+public class DistributorTest {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	private Distributor<Integer> distributorUnderTest;
+	private CollectorSink<Integer> fstCollector;
+	private CollectorSink<Integer> sndCollector;
+
+	@Before
+	public void initializeRecordSimplificator() throws Exception {
+		this.distributorUnderTest = new Distributor<Integer>();
+		this.fstCollector = new CollectorSink<Integer>();
+		this.sndCollector = new CollectorSink<Integer>();
+
+		final IPipeFactory pipeFactory = new SingleElementPipeFactory();
+		pipeFactory.create(this.distributorUnderTest.getNewOutputPort(), this.fstCollector.getInputPort());
+		pipeFactory.create(this.distributorUnderTest.getNewOutputPort(), this.sndCollector.getInputPort());
+
+		distributorUnderTest.onStarting();
+	}
+
+	@Test
+	public void roundRobinShouldWork() {
+		distributorUnderTest.setStrategy(new RoundRobinStrategy());
+
+		this.distributorUnderTest.execute(1);
+		this.distributorUnderTest.execute(2);
+		this.distributorUnderTest.execute(3);
+		this.distributorUnderTest.execute(4);
+		this.distributorUnderTest.execute(5);
+
+		assertThat(this.fstCollector.getElements(), contains(1, 3, 5));
+		assertThat(this.sndCollector.getElements(), contains(2, 4));
+	}
+
+	@Test
+	public void singleElementRoundRobinShouldWork() {
+		distributorUnderTest.setStrategy(new RoundRobinStrategy());
+
+		this.distributorUnderTest.execute(1);
+
+		assertThat(this.fstCollector.getElements(), contains(1));
+		assertThat(this.sndCollector.getElements(), is(empty()));
+	}
+
+	@Test
+	public void copyByReferenceShouldWork() {
+		distributorUnderTest.setStrategy(new CopyByReferenceStrategy());
+
+		this.distributorUnderTest.execute(1);
+		this.distributorUnderTest.execute(2);
+		this.distributorUnderTest.execute(3);
+		this.distributorUnderTest.execute(4);
+		this.distributorUnderTest.execute(5);
+
+		assertThat(this.fstCollector.getElements(), contains(1, 2, 3, 4, 5));
+		assertThat(this.sndCollector.getElements(), contains(1, 2, 3, 4, 5));
+	}
+
+	@Test
+	public void singleElementCopyByReferenceShouldWork() {
+		distributorUnderTest.setStrategy(new CopyByReferenceStrategy());
+
+		this.distributorUnderTest.execute(1);
+
+		assertThat(this.fstCollector.getElements(), contains(1));
+		assertThat(this.sndCollector.getElements(), contains(1));
+	}
+
+	@Test
+	public void cloneShouldNotWork() {
+		distributorUnderTest.setStrategy(new CloneStrategy());
+
+		expectedException.expect(UnsupportedOperationException.class);
+		this.distributorUnderTest.execute(1);
+	}
+
+}
-- 
GitLab