From 4591ba39d5ac32ce473e688c0bf27439aaa6bf12 Mon Sep 17 00:00:00 2001
From: Nils Christian Ehmke <nie@informatik.uni-kiel.de>
Date: Thu, 11 Dec 2014 15:01:44 +0100
Subject: [PATCH] Implementation of the MultipleInstanceOfFilter.

---
 .../stage/MultipleInstanceOfFilter.java       | 34 +++++++++
 .../stage/MultipleInstanceOfFilterTest.java   | 72 +++++++++++++++++++
 2 files changed, 106 insertions(+)
 create mode 100644 src/main/java/teetime/stage/MultipleInstanceOfFilter.java
 create mode 100644 src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java

diff --git a/src/main/java/teetime/stage/MultipleInstanceOfFilter.java b/src/main/java/teetime/stage/MultipleInstanceOfFilter.java
new file mode 100644
index 00000000..f2d34990
--- /dev/null
+++ b/src/main/java/teetime/stage/MultipleInstanceOfFilter.java
@@ -0,0 +1,34 @@
+package teetime.stage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import teetime.framework.AbstractConsumerStage;
+import teetime.framework.OutputPort;
+
+/**
+ * @author Nils Christian Ehmke
+ */
+public final class MultipleInstanceOfFilter<I> extends AbstractConsumerStage<I> {
+
+	private final Map<Class<? extends I>, OutputPort<? super I>> outputPortsMap = new HashMap<Class<? extends I>, OutputPort<? super I>>();
+
+	@SuppressWarnings("unchecked")
+	public <T extends I> OutputPort<T> getOutputPortForType(final Class<T> clazz) {
+		if (!this.outputPortsMap.containsKey(clazz)) {
+			this.outputPortsMap.put(clazz, super.createOutputPort());
+		}
+		return (OutputPort<T>) this.outputPortsMap.get(clazz);
+	}
+
+	@Override
+	protected void execute(final I element) {
+		for (Entry<Class<? extends I>, OutputPort<? super I>> outputPortMapEntry : outputPortsMap.entrySet()) {
+			if (outputPortMapEntry.getKey().isInstance(element)) {
+				outputPortMapEntry.getValue().send(element);
+			}
+		}
+	}
+
+}
diff --git a/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java
new file mode 100644
index 00000000..4740dea2
--- /dev/null
+++ b/src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java
@@ -0,0 +1,72 @@
+package teetime.stage;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+
+import teetime.framework.Analysis;
+import teetime.framework.AnalysisConfiguration;
+import teetime.framework.OutputPort;
+import teetime.framework.pipe.IPipeFactory;
+import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
+import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
+import teetime.util.Pair;
+
+public class MultipleInstanceOfFilterTest {
+
+	private static class TestConfiguration extends AnalysisConfiguration {
+
+		public TestConfiguration(final List<Number> initialInput, final List<Integer> integerList, final List<Float> floatList) {
+			// Create the stages
+			final InitialElementProducer<Number> producer = new InitialElementProducer<Number>(initialInput.toArray(new Number[0]));
+			final MultipleInstanceOfFilter<Number> filter = new MultipleInstanceOfFilter<Number>();
+			final CollectorSink<Integer> integerSink = new CollectorSink<Integer>(integerList);
+			final CollectorSink<Float> floatSink = new CollectorSink<Float>(floatList);
+
+			// Connect the stages
+			final IPipeFactory factory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
+			factory.create(producer.getOutputPort(), filter.getInputPort());
+			factory.create(filter.getOutputPortForType(Integer.class), integerSink.getInputPort());
+			factory.create(filter.getOutputPortForType(Float.class), floatSink.getInputPort());
+
+			super.addThreadableStage(producer);
+		}
+
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void filteringShouldWork() {
+		final List<Number> initialInput = new ArrayList<Number>(Arrays.asList(1, 1.5f, 2, 2.5f, 3, 3.5f));
+		final List<Integer> integerList = new ArrayList<Integer>();
+		final List<Float> floatList = new ArrayList<Float>();
+
+		final Analysis analysis = new Analysis(new TestConfiguration(initialInput, integerList, floatList));
+		analysis.init();
+		final Collection<Pair<Thread, Throwable>> errors = analysis.start();
+
+		assertThat(errors, is(empty()));
+
+		assertThat(integerList, contains(1, 2, 3));
+		assertThat(floatList, contains(1.5f, 2.5f, 3.5f));
+	}
+
+	@Test
+	public void outputPortForSameTypeShouldBeCached() {
+		final MultipleInstanceOfFilter<Number> filter = new MultipleInstanceOfFilter<Number>();
+
+		final OutputPort<Float> firstPort = filter.getOutputPortForType(Float.class);
+		final OutputPort<Float> secondPort = filter.getOutputPortForType(Float.class);
+
+		assertThat(firstPort, is(secondPort));
+	}
+
+}
-- 
GitLab