From 6b5b9b13f4560d8a5ccd996fb5014453d5646138 Mon Sep 17 00:00:00 2001
From: Nelson Tavares de Sousa <ntd@informatik.uni-kiel.de>
Date: Thu, 12 Feb 2015 18:45:07 +0100
Subject: [PATCH] added a new class

---
 .../teetime/stage/DistributedMapCounter.java  | 44 +++++++++++++++++++
 1 file changed, 44 insertions(+)
 create mode 100644 src/main/java/teetime/stage/DistributedMapCounter.java

diff --git a/src/main/java/teetime/stage/DistributedMapCounter.java b/src/main/java/teetime/stage/DistributedMapCounter.java
new file mode 100644
index 00000000..2b5dd2a3
--- /dev/null
+++ b/src/main/java/teetime/stage/DistributedMapCounter.java
@@ -0,0 +1,44 @@
+package teetime.stage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import teetime.framework.AbstractConsumerStage;
+import teetime.framework.OutputPort;
+
+/**
+ * This counts how many of different elements are sent to this stage. Nothing is forwarded.
+ * On termination a Map of T's and counter value is sent to its outputport.
+ *
+ * @author Nelson Tavares de Sousa
+ *
+ * @param <T>
+ */
+public class DistributedMapCounter<T> extends AbstractConsumerStage<T> {
+
+	private final Map<T, Integer> counter = new HashMap<T, Integer>();
+	private final OutputPort<Map<T, Integer>> port = createOutputPort();
+
+	public DistributedMapCounter() {
+
+	}
+
+	@Override
+	protected void execute(final T element) {
+		if (counter.containsKey(element)) {
+			Integer i = counter.get(element);
+			i++;
+			counter.put(element, i);
+		} else {
+			counter.put(element, 0);
+		}
+
+	}
+
+	@Override
+	public void onTerminating() throws Exception {
+		port.send(counter);
+		super.onTerminating();
+	}
+
+}
-- 
GitLab