Skip to content
Snippets Groups Projects
Commit d8f1a84d authored by Nils Christian Ehmke's avatar Nils Christian Ehmke
Browse files

Improved the API regarding the distributor and the merger. Added also a test for the distributor.

parent ffea3d45
No related branches found
No related tags found
No related merge requests found
Showing
with 161 additions and 45 deletions
......@@ -19,13 +19,13 @@ import teetime.framework.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*
* @since 1.0
*/
public final class CloneStrategy<T> implements IDistributorStrategy<T> {
public final class CloneStrategy implements IDistributorStrategy {
@Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
throw new UnsupportedOperationException();
}
......
......@@ -19,13 +19,13 @@ import teetime.framework.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*
* @since 1.0
*/
public final class CopyByReferenceStrategy<T> implements IDistributorStrategy<T> {
public final class CopyByReferenceStrategy implements IDistributorStrategy {
@Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
for (final OutputPort<T> outputPort : outputPorts) {
outputPort.send(element);
}
......
......@@ -29,7 +29,15 @@ import teetime.framework.OutputPort;
*/
public class Distributor<T> extends AbstractConsumerStage<T> {
private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>();
private IDistributorStrategy strategy;
public Distributor() {
this(new RoundRobinStrategy());
}
public Distributor(final IDistributorStrategy strategy) {
this.strategy = strategy;
}
@SuppressWarnings("unchecked")
@Override
......@@ -41,11 +49,11 @@ public class Distributor<T> extends AbstractConsumerStage<T> {
return this.createOutputPort();
}
public IDistributorStrategy<T> getStrategy() {
public IDistributorStrategy getStrategy() {
return this.strategy;
}
public void setStrategy(final IDistributorStrategy<T> strategy) {
public void setStrategy(final IDistributorStrategy strategy) {
this.strategy = strategy;
}
......
......@@ -19,11 +19,11 @@ import teetime.framework.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*
* @since 1.0
*/
public interface IDistributorStrategy<T> {
public interface IDistributorStrategy {
public boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
public <T> boolean distribute(final OutputPort<T>[] allOutputPorts, final T element);
}
......@@ -19,15 +19,15 @@ import teetime.framework.OutputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*
* @since 1.0
*/
public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
public final class RoundRobinStrategy implements IDistributorStrategy {
private int index = 0;
@Override
public boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
public <T> boolean distribute(final OutputPort<T>[] outputPorts, final T element) {
final OutputPort<T> outputPort = this.getNextPortInRoundRobinOrder(outputPorts);
outputPort.send(element);
......@@ -35,7 +35,7 @@ public final class RoundRobinStrategy<T> implements IDistributorStrategy<T> {
return true;
}
private OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
private <T> OutputPort<T> getNextPortInRoundRobinOrder(final OutputPort<T>[] outputPorts) {
final OutputPort<T> outputPort = outputPorts[this.index];
this.index = (this.index + 1) % outputPorts.length;
......
......@@ -17,11 +17,11 @@ package teetime.stage.basic.merger;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*
* @since 1.0
*/
public interface IMergerStrategy<T> {
public interface IMergerStrategy {
public T getNextInput(Merger<T> merger);
public <T> T getNextInput(Merger<T> merger);
}
......@@ -42,10 +42,18 @@ public final class Merger<T> extends AbstractStage {
private final OutputPort<T> outputPort = this.createOutputPort();
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
private IMergerStrategy strategy;
private final Map<Class<?>, Set<InputPort<?>>> signalMap = new HashMap<Class<?>, Set<InputPort<?>>>();
public Merger() {
this(new RoundRobinStrategy());
}
public Merger(final IMergerStrategy strategy) {
this.strategy = strategy;
}
@Override
public void executeWithPorts() {
final T token = this.strategy.getNextInput(this);
......@@ -90,11 +98,11 @@ public final class Merger<T> extends AbstractStage {
}
public IMergerStrategy<T> getMergerStrategy() {
public IMergerStrategy getMergerStrategy() {
return this.strategy;
}
public void setStrategy(final IMergerStrategy<T> strategy) {
public void setStrategy(final IMergerStrategy strategy) {
this.strategy = strategy;
}
......
......@@ -19,15 +19,15 @@ import teetime.framework.InputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*
* @since 1.0
*/
public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
public final class RoundRobinStrategy implements IMergerStrategy {
private int index = 0;
@Override
public T getNextInput(final Merger<T> merger) {
public <T> T getNextInput(final Merger<T> merger) {
@SuppressWarnings("unchecked")
InputPort<T>[] inputPorts = (InputPort<T>[]) merger.getInputPorts();
int size = inputPorts.length;
......@@ -42,7 +42,7 @@ public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
return null;
}
private InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) {
private <T> InputPort<T> getNextPortInRoundRobinOrder(final InputPort<T>[] inputPorts) {
InputPort<T> inputPort = inputPorts[this.index];
this.index = (this.index + 1) % inputPorts.length;
......
......@@ -29,7 +29,7 @@ public final class EveryXthPrinter<T> extends Stage {
pipeFactory.create(distributor.getNewOutputPort(), everyXthStage.getInputPort());
pipeFactory.create(everyXthStage.getOutputPort(), printer.getInputPort());
distributor.setStrategy(new CopyByReferenceStrategy<T>());
distributor.setStrategy(new CopyByReferenceStrategy());
}
@Override
......
<?xml version="1.0" encoding="utf-8"?>
<browserconfig>
<msapplication>
<tile>
<square70x70logo src="/mstile-70x70.png"/>
<square150x150logo src="/mstile-150x150.png"/>
<square310x310logo src="/mstile-310x310.png"/>
<wide310x150logo src="/mstile-310x150.png"/>
<TileColor>#2b5797</TileColor>
</tile>
</msapplication>
</browserconfig>
<?xml version="1.0" encoding="utf-8"?>
<browserconfig>
<msapplication>
<tile>
<square70x70logo src="/mstile-70x70.png"/>
<square150x150logo src="/mstile-150x150.png"/>
<square310x310logo src="/mstile-310x310.png"/>
<wide310x150logo src="/mstile-310x150.png"/>
<TileColor>#2b5797</TileColor>
</tile>
</msapplication>
</browserconfig>
package teetime.stage.basic.distributor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.Assert.assertThat;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.SingleElementPipeFactory;
import teetime.stage.CollectorSink;
/**
* @author Nils Christian Ehmke
*
* @since 1.0
*/
public class DistributorTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();
private Distributor<Integer> distributorUnderTest;
private CollectorSink<Integer> fstCollector;
private CollectorSink<Integer> sndCollector;
@Before
public void initializeRecordSimplificator() throws Exception {
this.distributorUnderTest = new Distributor<Integer>();
this.fstCollector = new CollectorSink<Integer>();
this.sndCollector = new CollectorSink<Integer>();
final IPipeFactory pipeFactory = new SingleElementPipeFactory();
pipeFactory.create(this.distributorUnderTest.getNewOutputPort(), this.fstCollector.getInputPort());
pipeFactory.create(this.distributorUnderTest.getNewOutputPort(), this.sndCollector.getInputPort());
distributorUnderTest.onStarting();
}
@Test
public void roundRobinShouldWork() {
distributorUnderTest.setStrategy(new RoundRobinStrategy());
this.distributorUnderTest.execute(1);
this.distributorUnderTest.execute(2);
this.distributorUnderTest.execute(3);
this.distributorUnderTest.execute(4);
this.distributorUnderTest.execute(5);
assertThat(this.fstCollector.getElements(), contains(1, 3, 5));
assertThat(this.sndCollector.getElements(), contains(2, 4));
}
@Test
public void singleElementRoundRobinShouldWork() {
distributorUnderTest.setStrategy(new RoundRobinStrategy());
this.distributorUnderTest.execute(1);
assertThat(this.fstCollector.getElements(), contains(1));
assertThat(this.sndCollector.getElements(), is(empty()));
}
@Test
public void copyByReferenceShouldWork() {
distributorUnderTest.setStrategy(new CopyByReferenceStrategy());
this.distributorUnderTest.execute(1);
this.distributorUnderTest.execute(2);
this.distributorUnderTest.execute(3);
this.distributorUnderTest.execute(4);
this.distributorUnderTest.execute(5);
assertThat(this.fstCollector.getElements(), contains(1, 2, 3, 4, 5));
assertThat(this.sndCollector.getElements(), contains(1, 2, 3, 4, 5));
}
@Test
public void singleElementCopyByReferenceShouldWork() {
distributorUnderTest.setStrategy(new CopyByReferenceStrategy());
this.distributorUnderTest.execute(1);
assertThat(this.fstCollector.getElements(), contains(1));
assertThat(this.sndCollector.getElements(), contains(1));
}
@Test
public void cloneShouldNotWork() {
distributorUnderTest.setStrategy(new CloneStrategy());
expectedException.expect(UnsupportedOperationException.class);
this.distributorUnderTest.execute(1);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment