Skip to content
Snippets Groups Projects
Commit d83dea0e authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

Added functionality from prod-cons-abstraction branch

parent 9e4b0245
No related branches found
No related tags found
No related merge requests found
Showing
with 102 additions and 35 deletions
......@@ -33,6 +33,7 @@ public abstract class AbstractStage implements Stage {
protected OutputPort<?>[] cachedOutputPorts;
private final Map<ISignal, Void> visited = new HashMap<ISignal, Void>();
private boolean shouldTerminate;
public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
......@@ -125,7 +126,7 @@ public abstract class AbstractStage implements Stage {
}
public void onTerminating() throws Exception {
// empty default implementation
terminate();
}
protected <T> InputPort<T> createInputPort() {
......@@ -162,4 +163,19 @@ public abstract class AbstractStage implements Stage {
return this.getClass().getName() + ": " + this.id;
}
@Override
public void terminate() {
this.shouldTerminate = true;
}
@Override
public boolean shouldBeTerminated() {
return this.shouldTerminate;
}
@Override
public TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SIGNAL;
}
}
......@@ -28,17 +28,17 @@ public class Analysis implements UncaughtExceptionHandler {
}
public void init() {
for (HeadStage stage : this.configuration.getConsumerStages()) {
for (Stage stage : this.configuration.getConsumerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
this.consumerThreads.add(thread);
}
for (HeadStage stage : this.configuration.getFiniteProducerStages()) {
for (Stage stage : this.configuration.getFiniteProducerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
this.finiteProducerThreads.add(thread);
}
for (HeadStage stage : this.configuration.getInfiniteProducerStages()) {
for (Stage stage : this.configuration.getInfiniteProducerStages()) {
Thread thread = new Thread(new RunnableStage(stage));
this.infiniteProducerThreads.add(thread);
}
......
......
......@@ -9,20 +9,51 @@ public class AnalysisConfiguration {
protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
private final List<HeadStage> consumerStages = new LinkedList<HeadStage>();
private final List<HeadStage> finiteProducerStages = new LinkedList<HeadStage>();
private final List<HeadStage> infiniteProducerStages = new LinkedList<HeadStage>();
private final List<Runnable> threadableStageJobs = new LinkedList<Runnable>();
public List<HeadStage> getConsumerStages() {
private final List<Stage> consumerStages = new LinkedList<Stage>();
private final List<Stage> finiteProducerStages = new LinkedList<Stage>();
private final List<Stage> infiniteProducerStages = new LinkedList<Stage>();
public List<Stage> getConsumerStages() {
return this.consumerStages;
}
public List<HeadStage> getFiniteProducerStages() {
public List<Stage> getFiniteProducerStages() {
return this.finiteProducerStages;
}
public List<HeadStage> getInfiniteProducerStages() {
public List<Stage> getInfiniteProducerStages() {
return this.infiniteProducerStages;
}
public void addThreadableStage(final Stage stage) {
// wrap the stage categorization in a runnable
// because the termination strategy could depend on port configuration that is set later
final Runnable addThreadableStageJob = new Runnable() {
@Override
public void run() {
switch (stage.getTerminationStrategy()) {
case BY_SIGNAL:
consumerStages.add(stage);
break;
case BY_SELF_DECISION:
finiteProducerStages.add(stage);
break;
case BY_INTERRUPT:
infiniteProducerStages.add(stage);
break;
}
}
};
threadableStageJobs.add(addThreadableStageJob);
}
void init() {
for (Runnable job : threadableStageJobs) {
job.run();
}
}
}
......@@ -9,10 +9,9 @@ package teetime.framework;
* the type of the default output port
*
*/
public abstract class ProducerStage<O> extends AbstractStage implements HeadStage {
public abstract class ProducerStage<O> extends AbstractStage implements Stage {
protected final OutputPort<O> outputPort = this.createOutputPort();
private boolean shouldTerminate;
public final OutputPort<O> getOutputPort() {
return this.outputPort;
......@@ -24,13 +23,8 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag
}
@Override
public void terminate() {
this.shouldTerminate = true;
}
@Override
public boolean shouldBeTerminated() {
return this.shouldTerminate;
public TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SELF_DECISION;
}
protected abstract void execute();
......
......
......@@ -10,11 +10,11 @@ import teetime.framework.validation.AnalysisNotValidException;
public class RunnableStage implements Runnable {
private final HeadStage stage;
private final Stage stage;
private final Logger logger;
private boolean validationEnabled;
public RunnableStage(final HeadStage stage) {
public RunnableStage(final Stage stage) {
this.stage = stage;
this.logger = LoggerFactory.getLogger(stage.getClass());
}
......
......
......@@ -5,7 +5,7 @@ import java.util.List;
import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection;
public interface Stage {
public interface Stage extends Terminable {
String getId();
......
......
package teetime.framework;
public interface HeadStage extends Stage {
interface Terminable {
boolean shouldBeTerminated();
TerminationStrategy getTerminationStrategy();
void terminate();
boolean shouldBeTerminated();
}
package teetime.framework;
public enum TerminationStrategy {
BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT
}
......@@ -4,12 +4,12 @@ import java.io.File;
import java.io.IOException;
import teetime.framework.ConsumerStage;
import teetime.framework.HeadStage;
import teetime.framework.OutputPort;
import teetime.framework.Stage;
import com.google.common.io.Files;
public class File2ByteArray extends ConsumerStage<File> implements HeadStage {
public class File2ByteArray extends ConsumerStage<File> implements Stage {
private final OutputPort<byte[]> outputPort = this.createOutputPort();
......
......
......@@ -18,8 +18,8 @@ package teetime.examples.experiment09;
import java.util.List;
import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.CommittablePipe;
import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter;
......@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis9 {
private Runnable runnable;
public void init() {
HeadStage pipeline = this.buildPipeline();
Stage pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline);
}
......
......
......@@ -18,8 +18,8 @@ package teetime.examples.experiment11;
import java.util.List;
import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.UnorderedGrowablePipe;
import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter;
......@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis11 {
private Runnable runnable;
public void init() {
HeadStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage(pipeline);
}
......
......
......@@ -18,8 +18,8 @@ package teetime.examples.experiment14;
import java.util.List;
import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
......@@ -47,7 +47,7 @@ public class MethodCallThroughputAnalysis14 {
private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE;
public void init() {
HeadStage pipeline = this.buildPipeline();
Stage pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline);
}
......
......
......@@ -18,8 +18,8 @@ package teetime.examples.experiment15;
import java.util.List;
import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.OrderedGrowableArrayPipe;
import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.SpScPipe;
......@@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis15 {
OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableStage(clockPipeline);
HeadStage pipeline = this.buildPipeline(this.clock);
Stage pipeline = this.buildPipeline(this.clock);
this.runnable = new RunnableStage(pipeline);
}
......
......
package teetime.framework;
@Deprecated
public class OldHeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements HeadStage {
public class OldHeadPipeline<FirstStage extends Stage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements Stage {
public OldHeadPipeline() {}
......
......
......@@ -57,4 +57,22 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> impl
this.lastStage.validateOutputPorts(invalidPortConnections);
}
@Override
public TerminationStrategy getTerminationStrategy() {
// TODO Auto-generated method stub
return null;
}
@Override
public void terminate() {
// TODO Auto-generated method stub
}
@Override
public boolean shouldBeTerminated() {
// TODO Auto-generated method stub
return false;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment