diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 59b8a5ef242a301beb788f1ba142c7fde144d365..d09ed0bb56bd9031c9b89e17899a76d76da6d293 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -18,12 +18,12 @@ package teetime.framework; /** * Represents a minimal stage that composes several other stages. * - * @since 1.2 - * @author Christian Wulf, Nelson Tavares de Sousa + * @since 2.0 * + * @author Christian Wulf, Nelson Tavares de Sousa * */ -public abstract class AbstractCompositeStage extends Configuration { +public abstract class AbstractCompositeStage { private final ConfigurationContext context; @@ -34,23 +34,20 @@ public abstract class AbstractCompositeStage extends Configuration { this.context = context; } - @Override - protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - context.connectPorts(sourcePort, targetPort, capacity); + protected ConfigurationContext getContext() { + return context; } - @Override - protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - connectPorts(sourcePort, targetPort, 4); + protected final void addThreadableStage(final Stage stage) { + context.addThreadableStage(stage); } - @Override - protected void addThreadableStage(final Stage stage) { - context.addThreadableStage(stage); + protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + context.connectPorts(sourcePort, targetPort); } - protected ConfigurationContext getContext() { - return context; + protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + context.connectPorts(sourcePort, targetPort, capacity); } } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 9615e6b08857db181bb973fd4b22855e1d413a27..e01908f7399cfe10f70688ad0e4b4ad5118bb88c 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -1,10 +1,17 @@ package teetime.framework; -public abstract class Configuration { +/** + * + * + * @author Christian Wulf, Nelson Tavares de Sousa + * + * @since 2.0 + * + */ +public abstract class Configuration extends AbstractCompositeStage { - protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity); + public Configuration() { + super(new ConfigurationContext()); + } - protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort); - - protected abstract void addThreadableStage(final Stage stage); } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 875b13d0afa2bc5a40474dceabe673da6c3ce166..4c6e011b890c15cd375ccff1a5be1490aeae8804 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -24,10 +24,12 @@ import org.slf4j.LoggerFactory; import teetime.framework.pipe.InstantiationPipe; /** - * Represents a configuration of connected stages, which is needed to run a analysis. + * Represents a context that is used by a configuration and composite stages to connect ports, for example. * Stages can be added by executing {@link #addThreadableStage(Stage)}. + * + * @since 2.0 */ -public abstract class ConfigurationContext extends Configuration { +public final class ConfigurationContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); @@ -43,7 +45,6 @@ public abstract class ConfigurationContext extends Configuration { * @param stage * A arbitrary stage, which will be added to the configuration and executed in a thread. */ - @Override protected final void addThreadableStage(final Stage stage) { if (!this.threadableStages.add(stage)) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); @@ -60,7 +61,6 @@ public abstract class ConfigurationContext extends Configuration { * @param <T> * the type of elements to be sent */ - @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { connectPorts(sourcePort, targetPort, 4); } @@ -77,7 +77,6 @@ public abstract class ConfigurationContext extends Configuration { * @param <T> * the type of elements to be sent */ - @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { addThreadableStage(sourcePort.getOwningStage()); diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 710ec8b6ac48e5d5a5dbd0477d66ca9030a01c07..6219bd924f0943444dc5bd4873c177bd9d482b91 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -35,7 +35,7 @@ import teetime.util.ThreadThrowableContainer; /** * Represents an Execution to which stages can be added and executed later. - * This needs a {@link ConfigurationContext}, + * This needs a {@link Configuration}, * in which the adding and configuring of stages takes place. * To start the analysis {@link #executeBlocking()} needs to be executed. * This class will automatically create threads and join them without any further commitment. @@ -43,9 +43,11 @@ import teetime.util.ThreadThrowableContainer; * @author Christian Wulf, Nelson Tavares de Sousa * * @param <T> - * the type of the {@link ConfigurationContext} + * the type of the {@link Configuration} + * + * @since 2.0 */ -public final class Execution<T extends ConfigurationContext> implements UncaughtExceptionHandler { +public final class Execution<T extends Configuration> implements UncaughtExceptionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); @@ -100,7 +102,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); + final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); for (Stage stage : threadableStageJobs) { // // portConnectionValidator.validate(stage); // } @@ -118,10 +120,10 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught * */ private final void init() { - ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration); + ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); executionInstantiation.instantiatePipes(); - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); + final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } @@ -298,7 +300,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught if (!executionInterrupted) { executionInterrupted = true; LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getThreadableStages()) { + for (Stage stage : configuration.getContext().getThreadableStages()) { if (stage.getOwningThread() != thread) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { stage.terminate(); diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index b9c55537c3b45ea6f42f5064b7aa1fd2120f5bf5..2c430e969e4236e85d28638030a99d9c7435d7c1 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.ExecutionException; import teetime.framework.Stage; @@ -78,14 +78,14 @@ public final class StageTester { * */ public void start() { - final ConfigurationContext configuration = new Configuration(inputHolders, stage, outputHolders); - final Execution<ConfigurationContext> analysis = new Execution<ConfigurationContext>(configuration); + final Configuration configuration = new TestConfiguration(inputHolders, stage, outputHolders); + final Execution<Configuration> analysis = new Execution<Configuration>(configuration); analysis.executeBlocking(); } - private final class Configuration extends ConfigurationContext { + private final class TestConfiguration extends Configuration { - public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { + public TestConfiguration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); connectPorts(producer.getOutputPort(), inputHolder.getPort()); diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index 7f46f37b9b21359fc9995d38d17b766d259cb3ce..ce4fe662cf66f6e5d9e148527c2334c8fe6741ea 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -17,7 +17,7 @@ package teetime.examples.cipher; import java.io.File; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.stage.CipherStage; import teetime.stage.CipherStage.CipherMode; import teetime.stage.InitialElementProducer; @@ -26,7 +26,7 @@ import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.ByteArrayFileWriter; import teetime.stage.io.File2ByteArray; -public class CipherConfiguration extends ConfigurationContext { +public class CipherConfiguration extends Configuration { public CipherConfiguration(final String inputFile, final String outputFile, final String password) { final File input = new File(inputFile); diff --git a/src/test/java/teetime/examples/cipher/CipherTest.java b/src/test/java/teetime/examples/cipher/CipherTest.java index ca53aa4a0585c47be782d9a68c2ffeb6bdd5a4a2..de2a62ad2f81c31d6467998cebc4e92d344a877f 100644 --- a/src/test/java/teetime/examples/cipher/CipherTest.java +++ b/src/test/java/teetime/examples/cipher/CipherTest.java @@ -22,7 +22,6 @@ import org.junit.Assert; import org.junit.Test; import teetime.framework.Execution; -import teetime.framework.ConfigurationContext; import com.google.common.io.Files; @@ -43,8 +42,8 @@ public class CipherTest { final String outputFile = "src/test/resources/data/output.txt"; final String password = "Password"; - final ConfigurationContext configuration = new CipherConfiguration(inputFile, outputFile, password); - final Execution execution = new Execution(configuration); + final CipherConfiguration configuration = new CipherConfiguration(inputFile, outputFile, password); + final Execution<CipherConfiguration> execution = new Execution<CipherConfiguration>(configuration); execution.executeBlocking(); Assert.assertTrue(Files.equal(new File(inputFile), new File(outputFile))); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index abafb20ba5a1f71cdb522769d18077a4fb81e695..8be24112c33d8baf6e64dd07e5f3531d74674f3c 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -17,7 +17,7 @@ package teetime.examples.tokenizer; import java.io.File; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.stage.ByteArray2String; import teetime.stage.CipherStage; import teetime.stage.CipherStage.CipherMode; @@ -28,7 +28,7 @@ import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.File2ByteArray; import teetime.stage.string.Tokenizer; -public class TokenizerConfiguration extends ConfigurationContext { +public class TokenizerConfiguration extends Configuration { private final Counter<String> counter; diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index 758dcfb1221b12ca368821057712af080c844d77..028513cf2a29be48d60d00445efb186fade57d70 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -70,7 +70,7 @@ public class ExecutionTest { assertThat(watch.getDurationInMs() + ABSOLUTE_MAX_ERROR_IN_MS, is(greaterThanOrEqualTo(DELAY_IN_MS))); } - private static class TestConfig extends ConfigurationContext { + private static class TestConfig extends Configuration { public final DelayAndTerminate delay; public TestConfig() { @@ -111,7 +111,7 @@ public class ExecutionTest { assertThat(intraAnalysis.getConfiguration().init.getOwningThread(), is(intraAnalysis.getConfiguration().sink.getOwningThread())); } - private class AnalysisTestConfig extends ConfigurationContext { + private class AnalysisTestConfig extends Configuration { public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public Sink<Object> sink = new Sink<Object>(); @@ -134,7 +134,7 @@ public class ExecutionTest { new Execution<InvalidTestConfig>(configuration); } - private class InvalidTestConfig extends ConfigurationContext { + private class InvalidTestConfig extends Configuration { public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class); public Sink<Object> sink = new Sink<Object>(); @@ -150,11 +150,11 @@ public class ExecutionTest { @Test public void automaticallyAddHeadStages() { AutomaticallyConfig context = new AutomaticallyConfig(); - new Execution<ConfigurationContext>(context).executeBlocking(); + new Execution<Configuration>(context).executeBlocking(); assertTrue(context.executed); } - private class AutomaticallyConfig extends ConfigurationContext { + private class AutomaticallyConfig extends Configuration { public boolean executed; diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index ed9eabec19e7b7da6258cff1bd9b401e220844d8..52575215317fb1f537e7e65e8b0d977cb65654d7 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -22,7 +22,7 @@ import teetime.framework.pipe.SpScPipeFactory; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; -public class RunnableConsumerStageTestConfiguration extends ConfigurationContext { +public class RunnableConsumerStageTestConfiguration extends Configuration { private final List<Integer> collectedElements = new ArrayList<Integer>(); private final CollectorSink<Integer> collectorSink; diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index c8cf92c2d4a5543de9b6c3c31d00f81f991c6b5b..dccd7007671e1c20c2aa395d6ca3b83eb6dfd9d7 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -53,7 +53,7 @@ public class StageTest { assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler); } - private static class TestConfig extends ConfigurationContext { + private static class TestConfig extends Configuration { public final DelayAndTerminate delay; public InitialElementProducer<String> init; diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 782fc8cfafb85c3ff2538ec54f9dcf71d739bf8e..0b312dcdadb5ad77776754d5d4d8f370075123dd 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -53,7 +53,7 @@ public class TraversorTest { } // WordCounterConfiguration - private class TestConfiguration extends ConfigurationContext { + private class TestConfiguration extends Configuration { public final CountingMapMerger<String> result = new CountingMapMerger<String>(); public final InitialElementProducer<File> init; @@ -77,7 +77,7 @@ public class TraversorTest { // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); - final WordCounter wc = new WordCounter(this); + final WordCounter wc = new WordCounter(this.getContext()); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 5ca67a386f4ed103dfe9370ed92b5142a5515ce4..4b63b95b354440d26e620b914319d456ca688497 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -21,7 +21,7 @@ import teetime.stage.InitialElementProducer; import teetime.stage.Relay; import teetime.stage.basic.Delay; -class WaitStrategyConfiguration extends ConfigurationContext { +class WaitStrategyConfiguration extends Configuration { private Delay<Object> delay; private CollectorSink<Object> collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index b19df100b246d87c72d1e76ae67c267ef99d2f4e..1adb09c9835e1c762dbdb2d08d8d8a85e59b188c 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -19,7 +19,7 @@ import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.Relay; -class YieldStrategyConfiguration extends ConfigurationContext { +class YieldStrategyConfiguration extends Configuration { private CollectorSink<Object> collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index f7d1e4503c52766e6ddfe9b1f42b012a3fe4d726..5586a3e1890e3ee3d2bb57e8de1aaede7873ef1a 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -15,9 +15,9 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; -public class ExceptionTestConfiguration extends ConfigurationContext { +public class ExceptionTestConfiguration extends Configuration { ExceptionTestProducerStage first; ExceptionTestConsumerStage second; diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index e582a9b256eb21d0faa76800acc8246c08ccfe43..a9e8d1acf9fe80ae432a560d13f46eb6747e9d3f 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -29,7 +29,7 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.ExecutionException; import teetime.util.ThreadThrowableContainer; @@ -113,7 +113,7 @@ public class InstanceOfFilterTest { @Test public void filterShouldSendToBothOutputPorts() throws Exception { InstanceOfFilterTestConfig config = new InstanceOfFilterTestConfig(); - Execution execution = new Execution(config); + Execution<InstanceOfFilterTestConfig> execution = new Execution<InstanceOfFilterTestConfig>(config); try { execution.executeBlocking(); } catch (ExecutionException e) { @@ -122,7 +122,7 @@ public class InstanceOfFilterTest { } } - private static class InstanceOfFilterTestConfig extends ConfigurationContext { + private static class InstanceOfFilterTestConfig extends Configuration { public InstanceOfFilterTestConfig() { InitialElementProducer<Object> elementProducer = new InitialElementProducer<Object>();