Skip to content
Snippets Groups Projects
Commit 0b650cee authored by Christian Wulf's avatar Christian Wulf
Browse files

Merge remote-tracking branch 'origin/merged-prod-cons'

Conflicts:
	src/main/java/teetime/framework/AbstractStage.java
	src/main/java/teetime/framework/RunnableStage.java
parents c02cdd09 088233e7
No related branches found
No related tags found
No related merge requests found
Showing
with 145 additions and 37 deletions
#FindBugs User Preferences #FindBugs User Preferences
#Tue Nov 04 15:49:25 CET 2014 #Tue Nov 18 10:57:28 CET 2014
detector_threshold=3 detector_threshold=3
effort=max effort=max
excludefilter0=.fbExcludeFilterFile|true excludefilter0=.fbExcludeFilterFile|true
......
...@@ -33,6 +33,7 @@ public abstract class AbstractStage implements Stage { ...@@ -33,6 +33,7 @@ public abstract class AbstractStage implements Stage {
protected OutputPort<?>[] cachedOutputPorts; protected OutputPort<?>[] cachedOutputPorts;
private final Map<ISignal, Void> visited = new HashMap<ISignal, Void>(); private final Map<ISignal, Void> visited = new HashMap<ISignal, Void>();
private boolean shouldTerminate;
public AbstractStage() { public AbstractStage() {
this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name
...@@ -124,8 +125,8 @@ public abstract class AbstractStage implements Stage { ...@@ -124,8 +125,8 @@ public abstract class AbstractStage implements Stage {
this.connectUnconnectedOutputPorts(); this.connectUnconnectedOutputPorts();
} }
public void onTerminating() throws Exception { // NOPMD public void onTerminating() throws Exception {
// empty default implementation terminate();
} }
protected <T> InputPort<T> createInputPort() { protected <T> InputPort<T> createInputPort() {
...@@ -162,4 +163,19 @@ public abstract class AbstractStage implements Stage { ...@@ -162,4 +163,19 @@ public abstract class AbstractStage implements Stage {
return this.getClass().getName() + ": " + this.id; return this.getClass().getName() + ": " + this.id;
} }
@Override
public void terminate() {
this.shouldTerminate = true;
}
@Override
public boolean shouldBeTerminated() {
return this.shouldTerminate;
}
@Override
public TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_SIGNAL;
}
} }
...@@ -28,17 +28,17 @@ public class Analysis implements UncaughtExceptionHandler { ...@@ -28,17 +28,17 @@ public class Analysis implements UncaughtExceptionHandler {
} }
public void init() { public void init() {
for (HeadStage stage : this.configuration.getConsumerStages()) { for (Stage stage : this.configuration.getConsumerStages()) {
Thread thread = new Thread(new RunnableStage(stage)); Thread thread = new Thread(new RunnableStage(stage));
this.consumerThreads.add(thread); this.consumerThreads.add(thread);
} }
for (HeadStage stage : this.configuration.getFiniteProducerStages()) { for (Stage stage : this.configuration.getFiniteProducerStages()) {
Thread thread = new Thread(new RunnableStage(stage)); Thread thread = new Thread(new RunnableStage(stage));
this.finiteProducerThreads.add(thread); this.finiteProducerThreads.add(thread);
} }
for (HeadStage stage : this.configuration.getInfiniteProducerStages()) { for (Stage stage : this.configuration.getInfiniteProducerStages()) {
Thread thread = new Thread(new RunnableStage(stage)); Thread thread = new Thread(new RunnableStage(stage));
this.infiniteProducerThreads.add(thread); this.infiniteProducerThreads.add(thread);
} }
......
package teetime.framework; package teetime.framework;
import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Set;
import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry;
...@@ -9,20 +11,51 @@ public class AnalysisConfiguration { ...@@ -9,20 +11,51 @@ public class AnalysisConfiguration {
protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE; protected static final PipeFactoryRegistry PIPE_FACTORY_REGISTRY = PipeFactoryRegistry.INSTANCE;
private final List<HeadStage> consumerStages = new LinkedList<HeadStage>(); private final List<Runnable> threadableStageJobs = new LinkedList<Runnable>();
private final List<HeadStage> finiteProducerStages = new LinkedList<HeadStage>();
private final List<HeadStage> infiniteProducerStages = new LinkedList<HeadStage>();
public List<HeadStage> getConsumerStages() { private final Set<Stage> consumerStages = new HashSet<Stage>();
private final Set<Stage> finiteProducerStages = new HashSet<Stage>();
private final Set<Stage> infiniteProducerStages = new HashSet<Stage>();
public Set<Stage> getConsumerStages() {
return this.consumerStages; return this.consumerStages;
} }
public List<HeadStage> getFiniteProducerStages() { public Set<Stage> getFiniteProducerStages() {
return this.finiteProducerStages; return this.finiteProducerStages;
} }
public List<HeadStage> getInfiniteProducerStages() { public Set<Stage> getInfiniteProducerStages() {
return this.infiniteProducerStages; return this.infiniteProducerStages;
} }
public void addThreadableStage(final Stage stage) {
// wrap the stage categorization in a runnable
// because the termination strategy could depend on port configuration that is set later
final Runnable addThreadableStageJob = new Runnable() {
@Override
public void run() {
switch (stage.getTerminationStrategy()) {
case BY_SIGNAL:
consumerStages.add(stage);
break;
case BY_SELF_DECISION:
finiteProducerStages.add(stage);
break;
case BY_INTERRUPT:
infiniteProducerStages.add(stage);
break;
}
}
};
threadableStageJobs.add(addThreadableStageJob);
}
void init() {
for (Runnable job : threadableStageJobs) {
job.run();
}
}
} }
...@@ -9,10 +9,9 @@ package teetime.framework; ...@@ -9,10 +9,9 @@ package teetime.framework;
* the type of the default output port * the type of the default output port
* *
*/ */
public abstract class ProducerStage<O> extends AbstractStage implements HeadStage { public abstract class ProducerStage<O> extends AbstractStage implements Stage {
protected final OutputPort<O> outputPort = this.createOutputPort(); protected final OutputPort<O> outputPort = this.createOutputPort();
private boolean shouldTerminate;
public final OutputPort<O> getOutputPort() { public final OutputPort<O> getOutputPort() {
return this.outputPort; return this.outputPort;
...@@ -24,13 +23,8 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag ...@@ -24,13 +23,8 @@ public abstract class ProducerStage<O> extends AbstractStage implements HeadStag
} }
@Override @Override
public void terminate() { public TerminationStrategy getTerminationStrategy() {
this.shouldTerminate = true; return TerminationStrategy.BY_SELF_DECISION;
}
@Override
public boolean shouldBeTerminated() {
return this.shouldTerminate;
} }
protected abstract void execute(); protected abstract void execute();
......
...@@ -11,11 +11,11 @@ import teetime.framework.validation.AnalysisNotValidException; ...@@ -11,11 +11,11 @@ import teetime.framework.validation.AnalysisNotValidException;
@SuppressWarnings("PMD.BeanMembersShouldSerialize") @SuppressWarnings("PMD.BeanMembersShouldSerialize")
public class RunnableStage implements Runnable { public class RunnableStage implements Runnable {
private final HeadStage stage; private final Stage stage;
private final Logger logger; // NOPMD private final Logger logger; // NOPMD
private boolean validationEnabled; private boolean validationEnabled;
public RunnableStage(final HeadStage stage) { public RunnableStage(final Stage stage) {
this.stage = stage; this.stage = stage;
this.logger = LoggerFactory.getLogger(stage.getClass()); this.logger = LoggerFactory.getLogger(stage.getClass());
} }
......
...@@ -5,7 +5,7 @@ import java.util.List; ...@@ -5,7 +5,7 @@ import java.util.List;
import teetime.framework.signal.ISignal; import teetime.framework.signal.ISignal;
import teetime.framework.validation.InvalidPortConnection; import teetime.framework.validation.InvalidPortConnection;
public interface Stage { public interface Stage extends Terminable {
String getId(); String getId();
......
package teetime.framework; package teetime.framework;
public interface HeadStage extends Stage { interface Terminable {
boolean shouldBeTerminated(); TerminationStrategy getTerminationStrategy();
void terminate(); void terminate();
boolean shouldBeTerminated();
} }
package teetime.framework;
public enum TerminationStrategy {
BY_SIGNAL, BY_SELF_DECISION, BY_INTERRUPT
}
package teetime.stage; package teetime.stage;
import teetime.framework.ProducerStage; import teetime.framework.ProducerStage;
import teetime.framework.TerminationStrategy;
public class Clock extends ProducerStage<Long> { public class Clock extends ProducerStage<Long> {
...@@ -9,6 +10,11 @@ public class Clock extends ProducerStage<Long> { ...@@ -9,6 +10,11 @@ public class Clock extends ProducerStage<Long> {
private long initialDelayInMs; private long initialDelayInMs;
private long intervalDelayInMs; private long intervalDelayInMs;
@Override
public TerminationStrategy getTerminationStrategy() {
return TerminationStrategy.BY_INTERRUPT;
}
@Override @Override
protected void execute() { protected void execute() {
if (!this.initialDelayExceeded) { if (!this.initialDelayExceeded) {
......
...@@ -4,12 +4,12 @@ import java.io.File; ...@@ -4,12 +4,12 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import teetime.framework.ConsumerStage; import teetime.framework.ConsumerStage;
import teetime.framework.HeadStage;
import teetime.framework.OutputPort; import teetime.framework.OutputPort;
import teetime.framework.Stage;
import com.google.common.io.Files; import com.google.common.io.Files;
public class File2ByteArray extends ConsumerStage<File> implements HeadStage { public class File2ByteArray extends ConsumerStage<File> implements Stage {
private final OutputPort<byte[]> outputPort = this.createOutputPort(); private final OutputPort<byte[]> outputPort = this.createOutputPort();
......
...@@ -18,8 +18,8 @@ package teetime.examples.experiment09; ...@@ -18,8 +18,8 @@ package teetime.examples.experiment09;
import java.util.List; import java.util.List;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.CommittablePipe; import teetime.framework.pipe.CommittablePipe;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter; import teetime.stage.NoopFilter;
...@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis9 { ...@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis9 {
private Runnable runnable; private Runnable runnable;
public void init() { public void init() {
HeadStage pipeline = this.buildPipeline(); Stage pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
......
...@@ -18,8 +18,8 @@ package teetime.examples.experiment11; ...@@ -18,8 +18,8 @@ package teetime.examples.experiment11;
import java.util.List; import java.util.List;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.UnorderedGrowablePipe; import teetime.framework.pipe.UnorderedGrowablePipe;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
import teetime.stage.NoopFilter; import teetime.stage.NoopFilter;
...@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis11 { ...@@ -43,7 +43,7 @@ public class MethodCallThroughputAnalysis11 {
private Runnable runnable; private Runnable runnable;
public void init() { public void init() {
HeadStage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator); Stage pipeline = this.buildPipeline(this.numInputObjects, this.inputObjectCreator);
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
......
...@@ -18,8 +18,8 @@ package teetime.examples.experiment14; ...@@ -18,8 +18,8 @@ package teetime.examples.experiment14;
import java.util.List; import java.util.List;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.IPipeFactory; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry; import teetime.framework.pipe.PipeFactoryRegistry;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
...@@ -47,7 +47,7 @@ public class MethodCallThroughputAnalysis14 { ...@@ -47,7 +47,7 @@ public class MethodCallThroughputAnalysis14 {
private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE; private final PipeFactoryRegistry pipeFactory = PipeFactoryRegistry.INSTANCE;
public void init() { public void init() {
HeadStage pipeline = this.buildPipeline(); Stage pipeline = this.buildPipeline();
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
......
...@@ -18,8 +18,8 @@ package teetime.examples.experiment15; ...@@ -18,8 +18,8 @@ package teetime.examples.experiment15;
import java.util.List; import java.util.List;
import teetime.framework.OldHeadPipeline; import teetime.framework.OldHeadPipeline;
import teetime.framework.HeadStage;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.Stage;
import teetime.framework.pipe.OrderedGrowableArrayPipe; import teetime.framework.pipe.OrderedGrowableArrayPipe;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.pipe.SingleElementPipe;
import teetime.framework.pipe.SpScPipe; import teetime.framework.pipe.SpScPipe;
...@@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis15 { ...@@ -58,7 +58,7 @@ public class MethodCallThroughputAnalysis15 {
OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline(); OldHeadPipeline<Clock, Sink<Long>> clockPipeline = this.buildClockPipeline();
this.clockRunnable = new RunnableStage(clockPipeline); this.clockRunnable = new RunnableStage(clockPipeline);
HeadStage pipeline = this.buildPipeline(this.clock); Stage pipeline = this.buildPipeline(this.clock);
this.runnable = new RunnableStage(pipeline); this.runnable = new RunnableStage(pipeline);
} }
......
package teetime.framework; package teetime.framework;
@Deprecated @Deprecated
public class OldHeadPipeline<FirstStage extends HeadStage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements HeadStage { public class OldHeadPipeline<FirstStage extends Stage, LastStage extends Stage> extends OldPipeline<FirstStage, LastStage> implements Stage {
public OldHeadPipeline() {} public OldHeadPipeline() {}
......
...@@ -57,4 +57,22 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> impl ...@@ -57,4 +57,22 @@ public class OldPipeline<FirstStage extends Stage, LastStage extends Stage> impl
this.lastStage.validateOutputPorts(invalidPortConnections); this.lastStage.validateOutputPorts(invalidPortConnections);
} }
@Override
public TerminationStrategy getTerminationStrategy() {
// TODO Auto-generated method stub
return null;
}
@Override
public void terminate() {
// TODO Auto-generated method stub
}
@Override
public boolean shouldBeTerminated() {
// TODO Auto-generated method stub
return false;
}
} }
package teetime.framework;
import org.junit.Assert;
import org.junit.Test;
import teetime.stage.Clock;
import teetime.stage.Counter;
import teetime.stage.InitialElementProducer;
public class AnalysisConfigurationTest {
@Test
public void checkIfCorrectAdded() {
AnalysisConfiguration config = new AnalysisConfiguration();
// Consumer -> BY_SIGNAL
Counter<String> counter = new Counter<String>();
config.addThreadableStage(counter);
// Infinite producer -> BY_INTERRUPT
Clock clock = new Clock();
config.addThreadableStage(clock);
// Finite Producer -> BY_SELF_DECISION
InitialElementProducer<Integer> producer = new InitialElementProducer<Integer>(1, 2, 3, 4);
config.addThreadableStage(producer);
config.init();
Assert.assertTrue(config.getConsumerStages().remove(counter));
Assert.assertTrue(config.getInfiniteProducerStages().remove(clock));
Assert.assertTrue(config.getFiniteProducerStages().remove(producer));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment