Skip to content
Snippets Groups Projects
Commit d4b35b76 authored by Christian Wulf's avatar Christian Wulf
Browse files

switched to the generic distributor

parent 121e1e29
No related branches found
No related tags found
No related merge requests found
Showing
with 55 additions and 141 deletions
......@@ -8,6 +8,6 @@ java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %4$s: %5$s (%2$s)%n
#teetime.level = ALL
#teetime.variant.methodcallWithPorts.framework.level = ALL
#teetime.variant.methodcallWithPorts.framework.core.level = ALL
#teetime.variant.methodcallWithPorts.stage.level = FINE
teetime.variant.methodcallWithPorts.framework.core.level = FINE
teetime.variant.methodcallWithPorts.stage.level = INFO
#teetime.variant.methodcallWithPorts.examples.traceReconstructionWithThreads.level = FINE
......@@ -107,8 +107,7 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> {
*/
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
this.logger.info("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
......
package teetime.variant.methodcallWithPorts.stage;
import java.util.ArrayList;
import java.util.List;
import teetime.util.concurrent.spsc.Pow2;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
public final class Distributor<T> extends AbstractStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default output port that is unnecessary for the distributor
private final List<OutputPort<T>> outputPortList = new ArrayList<OutputPort<T>>();
private OutputPort<T>[] outputPorts;
private int nextOutputPortIndex;
private int size;
// private int mask;
@Override
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final T element) {
OutputPort<T> outputPort = this.outputPorts[this.nextOutputPortIndex % this.size];
this.nextOutputPortIndex++;
outputPort.send(element);
}
@Override
public void onIsPipelineHead() {
// for (OutputPort<?> op : this.outputPorts) {
// op.getPipe().close();
// if (this.logger.isDebugEnabled()) {
// this.logger.debug("End signal sent, size: " + op.getPipe().size());
// }
// }
// for (OutputPort<?> op : this.outputPorts) {
// op.pipe = null;
// }
// this.outputPorts = null;
// this.outputPortList.clear();
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.debug("Got signal: " + signal + " from input port: " + inputPort);
// System.out.println("Got signal: " + signal + " from input port: " + this.getClass().getSimpleName() + "." + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<?> op : this.outputPorts) {
op.sendSignal(signal);
}
}
@SuppressWarnings("unchecked")
@Override
public void onStart() {
this.size = this.outputPortList.size();
// this.mask = this.size - 1;
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.outputPorts = this.outputPortList.toArray(new OutputPort[sizeInPow2]);
// System.out.println("outputPorts: " + this.outputPorts);
}
@Override
public OutputPort<T> getOutputPort() {
return this.getNewOutputPort();
}
private OutputPort<T> getNewOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>();
this.outputPortList.add(outputPort);
return outputPort;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element);
}
}
......@@ -37,23 +37,23 @@ public class ElementThroughputMeasuringStage<T> extends ConsumerStage<T, T> {
long diffInNs = timestampInNs - this.lastTimestampInNs;
// BETTER use the TimeUnit of the clock
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
if (diffInMs > 0) {
long throughputPerSec = this.numPassedElements / diffInMs;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
long throughputPerTimeUnit = -1;
this.resetTimestamp(timestampInNs);
} else {
long diffInSec = TimeUnit.NANOSECONDS.toSeconds(diffInNs);
if (diffInSec > 0) {
long throughputPerSec = this.numPassedElements / diffInSec;
this.throughputs.add(throughputPerSec);
this.logger.info("Throughput: " + throughputPerSec + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
throughputPerTimeUnit = this.numPassedElements / diffInSec;
this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/s" + " -> numPassedElements=" + this.numPassedElements);
} else {
long diffInMs = TimeUnit.NANOSECONDS.toMillis(diffInNs);
if (diffInMs > 0) {
throughputPerTimeUnit = this.numPassedElements / diffInMs;
this.logger.info("Throughput: " + throughputPerTimeUnit + " elements/ms" + " -> numPassedElements=" + this.numPassedElements);
this.resetTimestamp(timestampInNs);
}
}
this.throughputs.add(throughputPerTimeUnit);
this.resetTimestamp(timestampInNs);
}
private void resetTimestamp(final Long timestampInNs) {
......
......@@ -20,7 +20,9 @@ import java.util.ArrayList;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
/**
* @author Christian Wulf
......@@ -39,12 +41,13 @@ public class Distributor<T> extends AbstractStage<T, T> {
private IDistributorStrategy<T> strategy = new RoundRobinStrategy<T>();
public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
this.execute5(element);
}
@Override
......@@ -60,6 +63,24 @@ public class Distributor<T> extends AbstractStage<T, T> {
// }
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.logger.info("Got signal: " + signal + " from input port: " + inputPort);
switch (signal) {
case FINISHED:
this.onFinished();
break;
default:
this.logger.warn("Aborted sending signal " + signal + ". Reason: Unknown signal.");
break;
}
for (OutputPort<T> op : this.outputPortList) {
op.sendSignal(signal);
}
}
@Override
public OutputPort<T> getOutputPort() {
return this.getNewOutputPort();
......@@ -75,13 +96,12 @@ public class Distributor<T> extends AbstractStage<T, T> {
return this.outputPortList;
}
@Override
public void executeWithPorts() {
T element = this.getInputPort().receive();
this.setReschedulable(this.getInputPort().getPipe().size() > 0);
public IDistributorStrategy<T> getStrategy() {
return this.strategy;
}
this.execute5(element);
public void setStrategy(final IDistributorStrategy<T> strategy) {
this.strategy = strategy;
}
}
......@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -30,7 +30,6 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
......@@ -38,6 +37,7 @@ import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.Sink;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.UnorderedGrowablePipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -28,12 +28,12 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.OrderedGrowableArrayPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.NoopFilter;
import teetime.variant.methodcallWithPorts.stage.ObjectProducer;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.StartTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.StopTimestampFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
/**
* @author Christian Wulf
......
......@@ -12,10 +12,10 @@ import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......
......@@ -14,10 +14,10 @@ import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
......
......@@ -14,10 +14,10 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......
......@@ -17,12 +17,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......
......@@ -18,12 +18,12 @@ import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe
import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe;
import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.CountingFilter;
import teetime.variant.methodcallWithPorts.stage.Distributor;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReduction.TraceAggregationBuffer;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment