From 2d1da8730705b179fef5218fe5674932f2ab8c22 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Mon, 22 Jun 2015 17:33:35 +0200 Subject: [PATCH] Configuration extends AbstractCompositeStage; both uses the context now --- .../framework/AbstractCompositeStage.java | 28 +++++++++++-------- .../java/teetime/framework/Configuration.java | 9 +++--- .../framework/ConfigurationContext.java | 7 ++--- .../java/teetime/framework/Execution.java | 14 +++++----- .../teetime/framework/test/StageTester.java | 10 +++---- .../examples/cipher/CipherConfiguration.java | 4 +-- .../teetime/examples/cipher/CipherTest.java | 5 ++-- .../tokenizer/TokenizerConfiguration.java | 4 +-- .../java/teetime/framework/ExecutionTest.java | 10 +++---- ...unnableConsumerStageTestConfiguration.java | 2 +- .../java/teetime/framework/StageTest.java | 2 +- .../java/teetime/framework/TraversorTest.java | 4 +-- .../framework/WaitStrategyConfiguration.java | 2 +- .../framework/YieldStrategyConfiguration.java | 2 +- .../ExceptionTestConfiguration.java | 4 +-- .../teetime/stage/InstanceOfFilterTest.java | 6 ++-- 16 files changed, 56 insertions(+), 57 deletions(-) diff --git a/src/main/java/teetime/framework/AbstractCompositeStage.java b/src/main/java/teetime/framework/AbstractCompositeStage.java index 59b8a5ef..1a7fd869 100644 --- a/src/main/java/teetime/framework/AbstractCompositeStage.java +++ b/src/main/java/teetime/framework/AbstractCompositeStage.java @@ -23,7 +23,7 @@ package teetime.framework; * * */ -public abstract class AbstractCompositeStage extends Configuration { +public abstract class AbstractCompositeStage { private final ConfigurationContext context; @@ -34,23 +34,27 @@ public abstract class AbstractCompositeStage extends Configuration { this.context = context; } - @Override - protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { - context.connectPorts(sourcePort, targetPort, capacity); + protected ConfigurationContext getContext() { + return context; } - @Override - protected <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { - connectPorts(sourcePort, targetPort, 4); + protected final void addThreadableStage(final Stage stage) { + context.addThreadableStage(stage); } - @Override - protected void addThreadableStage(final Stage stage) { - context.addThreadableStage(stage); + protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + context.connectPorts(sourcePort, targetPort); } - protected ConfigurationContext getContext() { - return context; + protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + context.connectPorts(sourcePort, targetPort, capacity); } + protected final <T> void connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { + ConfigurationContext.connectBoundedInterThreads(sourcePort, targetPort); + } + + protected final <T> void connectBoundedInterThreads(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { + ConfigurationContext.connectBoundedInterThreads(sourcePort, targetPort, capacity); + } } diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java index 9615e6b0..35f225ea 100644 --- a/src/main/java/teetime/framework/Configuration.java +++ b/src/main/java/teetime/framework/Configuration.java @@ -1,10 +1,9 @@ package teetime.framework; -public abstract class Configuration { +public abstract class Configuration extends AbstractCompositeStage { - protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity); + public Configuration() { + super(new ConfigurationContext()); + } - protected abstract <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort); - - protected abstract void addThreadableStage(final Stage stage); } diff --git a/src/main/java/teetime/framework/ConfigurationContext.java b/src/main/java/teetime/framework/ConfigurationContext.java index 4e2f5f6a..a89df567 100644 --- a/src/main/java/teetime/framework/ConfigurationContext.java +++ b/src/main/java/teetime/framework/ConfigurationContext.java @@ -29,10 +29,10 @@ import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; /** - * Represents a configuration of connected stages, which is needed to run a analysis. + * Represents a context that is used by a configuration and composite stages to connect ports, for example. * Stages can be added by executing {@link #addThreadableStage(Stage)}. */ -public abstract class ConfigurationContext extends Configuration { +public final class ConfigurationContext { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigurationContext.class); @@ -64,7 +64,6 @@ public abstract class ConfigurationContext extends Configuration { * @param stage * A arbitrary stage, which will be added to the configuration and executed in a thread. */ - @Override protected final void addThreadableStage(final Stage stage) { if (!this.threadableStages.add(stage)) { LOGGER.warn("Stage " + stage.getId() + " was already marked as threadable stage."); @@ -180,7 +179,6 @@ public abstract class ConfigurationContext extends Configuration { * @param <T> * the type of elements to be sent */ - @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) { connectPorts(sourcePort, targetPort, 4); } @@ -197,7 +195,6 @@ public abstract class ConfigurationContext extends Configuration { * @param <T> * the type of elements to be sent */ - @Override protected final <T> void connectPorts(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) { if (sourcePort.getOwningStage().getInputPorts().length == 0 && !threadableStages.contains(sourcePort.getOwningStage())) { addThreadableStage(sourcePort.getOwningStage()); diff --git a/src/main/java/teetime/framework/Execution.java b/src/main/java/teetime/framework/Execution.java index 042584b4..7c14597c 100644 --- a/src/main/java/teetime/framework/Execution.java +++ b/src/main/java/teetime/framework/Execution.java @@ -35,7 +35,7 @@ import teetime.util.Pair; /** * Represents an Execution to which stages can be added and executed later. - * This needs a {@link ConfigurationContext}, + * This needs a {@link Configuration}, * in which the adding and configuring of stages takes place. * To start the analysis {@link #executeBlocking()} needs to be executed. * This class will automatically create threads and join them without any further commitment. @@ -43,9 +43,9 @@ import teetime.util.Pair; * @author Christian Wulf, Nelson Tavares de Sousa * * @param <T> - * the type of the {@link ConfigurationContext} + * the type of the {@link Configuration} */ -public final class Execution<T extends ConfigurationContext> implements UncaughtExceptionHandler { +public final class Execution<T extends Configuration> implements UncaughtExceptionHandler { private static final Logger LOGGER = LoggerFactory.getLogger(Execution.class); @@ -100,7 +100,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught // BETTER validate concurrently private void validateStages() { - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); + final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); for (Stage stage : threadableStageJobs) { // // portConnectionValidator.validate(stage); // } @@ -118,10 +118,10 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught * */ private final void init() { - ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration); + ExecutionInstantiation executionInstantiation = new ExecutionInstantiation(configuration.getContext()); executionInstantiation.instantiatePipes(); - final Set<Stage> threadableStageJobs = this.configuration.getThreadableStages(); + final Set<Stage> threadableStageJobs = this.configuration.getContext().getThreadableStages(); if (threadableStageJobs.isEmpty()) { throw new IllegalStateException("No stage was added using the addThreadableStage(..) method. Add at least one stage."); } @@ -298,7 +298,7 @@ public final class Execution<T extends ConfigurationContext> implements Uncaught if (!executionInterrupted) { executionInterrupted = true; LOGGER.warn("Thread " + thread + " was interrupted. Terminating analysis now."); - for (Stage stage : configuration.getThreadableStages()) { + for (Stage stage : configuration.getContext().getThreadableStages()) { if (stage.getOwningThread() != thread) { if (stage.getTerminationStrategy() == TerminationStrategy.BY_SELF_DECISION) { stage.terminate(); diff --git a/src/main/java/teetime/framework/test/StageTester.java b/src/main/java/teetime/framework/test/StageTester.java index b9c55537..2c430e96 100644 --- a/src/main/java/teetime/framework/test/StageTester.java +++ b/src/main/java/teetime/framework/test/StageTester.java @@ -19,7 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.ExecutionException; import teetime.framework.Stage; @@ -78,14 +78,14 @@ public final class StageTester { * */ public void start() { - final ConfigurationContext configuration = new Configuration(inputHolders, stage, outputHolders); - final Execution<ConfigurationContext> analysis = new Execution<ConfigurationContext>(configuration); + final Configuration configuration = new TestConfiguration(inputHolders, stage, outputHolders); + final Execution<Configuration> analysis = new Execution<Configuration>(configuration); analysis.executeBlocking(); } - private final class Configuration extends ConfigurationContext { + private final class TestConfiguration extends Configuration { - public Configuration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { + public TestConfiguration(final List<InputHolder<?>> inputHolders, final Stage stage, final List<OutputHolder<?>> outputHolders) { for (InputHolder<?> inputHolder : inputHolders) { final InitialElementProducer<Object> producer = new InitialElementProducer<Object>(inputHolder.getInput()); connectPorts(producer.getOutputPort(), inputHolder.getPort()); diff --git a/src/test/java/teetime/examples/cipher/CipherConfiguration.java b/src/test/java/teetime/examples/cipher/CipherConfiguration.java index 7f46f37b..ce4fe662 100644 --- a/src/test/java/teetime/examples/cipher/CipherConfiguration.java +++ b/src/test/java/teetime/examples/cipher/CipherConfiguration.java @@ -17,7 +17,7 @@ package teetime.examples.cipher; import java.io.File; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.stage.CipherStage; import teetime.stage.CipherStage.CipherMode; import teetime.stage.InitialElementProducer; @@ -26,7 +26,7 @@ import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.ByteArrayFileWriter; import teetime.stage.io.File2ByteArray; -public class CipherConfiguration extends ConfigurationContext { +public class CipherConfiguration extends Configuration { public CipherConfiguration(final String inputFile, final String outputFile, final String password) { final File input = new File(inputFile); diff --git a/src/test/java/teetime/examples/cipher/CipherTest.java b/src/test/java/teetime/examples/cipher/CipherTest.java index ca53aa4a..de2a62ad 100644 --- a/src/test/java/teetime/examples/cipher/CipherTest.java +++ b/src/test/java/teetime/examples/cipher/CipherTest.java @@ -22,7 +22,6 @@ import org.junit.Assert; import org.junit.Test; import teetime.framework.Execution; -import teetime.framework.ConfigurationContext; import com.google.common.io.Files; @@ -43,8 +42,8 @@ public class CipherTest { final String outputFile = "src/test/resources/data/output.txt"; final String password = "Password"; - final ConfigurationContext configuration = new CipherConfiguration(inputFile, outputFile, password); - final Execution execution = new Execution(configuration); + final CipherConfiguration configuration = new CipherConfiguration(inputFile, outputFile, password); + final Execution<CipherConfiguration> execution = new Execution<CipherConfiguration>(configuration); execution.executeBlocking(); Assert.assertTrue(Files.equal(new File(inputFile), new File(outputFile))); diff --git a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java index abafb20b..8be24112 100644 --- a/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java +++ b/src/test/java/teetime/examples/tokenizer/TokenizerConfiguration.java @@ -17,7 +17,7 @@ package teetime.examples.tokenizer; import java.io.File; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.stage.ByteArray2String; import teetime.stage.CipherStage; import teetime.stage.CipherStage.CipherMode; @@ -28,7 +28,7 @@ import teetime.stage.ZipByteArray.ZipMode; import teetime.stage.io.File2ByteArray; import teetime.stage.string.Tokenizer; -public class TokenizerConfiguration extends ConfigurationContext { +public class TokenizerConfiguration extends Configuration { private final Counter<String> counter; diff --git a/src/test/java/teetime/framework/ExecutionTest.java b/src/test/java/teetime/framework/ExecutionTest.java index 758dcfb1..028513cf 100644 --- a/src/test/java/teetime/framework/ExecutionTest.java +++ b/src/test/java/teetime/framework/ExecutionTest.java @@ -70,7 +70,7 @@ public class ExecutionTest { assertThat(watch.getDurationInMs() + ABSOLUTE_MAX_ERROR_IN_MS, is(greaterThanOrEqualTo(DELAY_IN_MS))); } - private static class TestConfig extends ConfigurationContext { + private static class TestConfig extends Configuration { public final DelayAndTerminate delay; public TestConfig() { @@ -111,7 +111,7 @@ public class ExecutionTest { assertThat(intraAnalysis.getConfiguration().init.getOwningThread(), is(intraAnalysis.getConfiguration().sink.getOwningThread())); } - private class AnalysisTestConfig extends ConfigurationContext { + private class AnalysisTestConfig extends Configuration { public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public Sink<Object> sink = new Sink<Object>(); @@ -134,7 +134,7 @@ public class ExecutionTest { new Execution<InvalidTestConfig>(configuration); } - private class InvalidTestConfig extends ConfigurationContext { + private class InvalidTestConfig extends Configuration { public InitialElementProducer<Object> init = new InitialElementProducer<Object>(); public InstanceOfFilter<Object, Object> iof = new InstanceOfFilter<Object, Object>(Object.class); public Sink<Object> sink = new Sink<Object>(); @@ -150,11 +150,11 @@ public class ExecutionTest { @Test public void automaticallyAddHeadStages() { AutomaticallyConfig context = new AutomaticallyConfig(); - new Execution<ConfigurationContext>(context).executeBlocking(); + new Execution<Configuration>(context).executeBlocking(); assertTrue(context.executed); } - private class AutomaticallyConfig extends ConfigurationContext { + private class AutomaticallyConfig extends Configuration { public boolean executed; diff --git a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java index 904cae28..965fe3e6 100644 --- a/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java +++ b/src/test/java/teetime/framework/RunnableConsumerStageTestConfiguration.java @@ -21,7 +21,7 @@ import java.util.List; import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; -public class RunnableConsumerStageTestConfiguration extends ConfigurationContext { +public class RunnableConsumerStageTestConfiguration extends Configuration { private final List<Integer> collectedElements = new ArrayList<Integer>(); private final CollectorSink<Integer> collectorSink; diff --git a/src/test/java/teetime/framework/StageTest.java b/src/test/java/teetime/framework/StageTest.java index c8cf92c2..dccd7007 100644 --- a/src/test/java/teetime/framework/StageTest.java +++ b/src/test/java/teetime/framework/StageTest.java @@ -53,7 +53,7 @@ public class StageTest { assertEquals(tc.init.exceptionHandler, tc.delay.exceptionHandler); } - private static class TestConfig extends ConfigurationContext { + private static class TestConfig extends Configuration { public final DelayAndTerminate delay; public InitialElementProducer<String> init; diff --git a/src/test/java/teetime/framework/TraversorTest.java b/src/test/java/teetime/framework/TraversorTest.java index 782fc8cf..0b312dcd 100644 --- a/src/test/java/teetime/framework/TraversorTest.java +++ b/src/test/java/teetime/framework/TraversorTest.java @@ -53,7 +53,7 @@ public class TraversorTest { } // WordCounterConfiguration - private class TestConfiguration extends ConfigurationContext { + private class TestConfiguration extends Configuration { public final CountingMapMerger<String> result = new CountingMapMerger<String>(); public final InitialElementProducer<File> init; @@ -77,7 +77,7 @@ public class TraversorTest { // Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages for (int i = 0; i < threads; i++) { // final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>(); - final WordCounter wc = new WordCounter(this); + final WordCounter wc = new WordCounter(this.getContext()); // intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort()); connectPorts(distributor.getNewOutputPort(), wc.getInputPort()); diff --git a/src/test/java/teetime/framework/WaitStrategyConfiguration.java b/src/test/java/teetime/framework/WaitStrategyConfiguration.java index 5ca67a38..4b63b95b 100644 --- a/src/test/java/teetime/framework/WaitStrategyConfiguration.java +++ b/src/test/java/teetime/framework/WaitStrategyConfiguration.java @@ -21,7 +21,7 @@ import teetime.stage.InitialElementProducer; import teetime.stage.Relay; import teetime.stage.basic.Delay; -class WaitStrategyConfiguration extends ConfigurationContext { +class WaitStrategyConfiguration extends Configuration { private Delay<Object> delay; private CollectorSink<Object> collectorSink; diff --git a/src/test/java/teetime/framework/YieldStrategyConfiguration.java b/src/test/java/teetime/framework/YieldStrategyConfiguration.java index b19df100..1adb09c9 100644 --- a/src/test/java/teetime/framework/YieldStrategyConfiguration.java +++ b/src/test/java/teetime/framework/YieldStrategyConfiguration.java @@ -19,7 +19,7 @@ import teetime.stage.CollectorSink; import teetime.stage.InitialElementProducer; import teetime.stage.Relay; -class YieldStrategyConfiguration extends ConfigurationContext { +class YieldStrategyConfiguration extends Configuration { private CollectorSink<Object> collectorSink; diff --git a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java index f7d1e450..5586a3e1 100644 --- a/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java +++ b/src/test/java/teetime/framework/exceptionHandling/ExceptionTestConfiguration.java @@ -15,9 +15,9 @@ */ package teetime.framework.exceptionHandling; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; -public class ExceptionTestConfiguration extends ConfigurationContext { +public class ExceptionTestConfiguration extends Configuration { ExceptionTestProducerStage first; ExceptionTestConsumerStage second; diff --git a/src/test/java/teetime/stage/InstanceOfFilterTest.java b/src/test/java/teetime/stage/InstanceOfFilterTest.java index f4152c64..fbd2c561 100644 --- a/src/test/java/teetime/stage/InstanceOfFilterTest.java +++ b/src/test/java/teetime/stage/InstanceOfFilterTest.java @@ -29,7 +29,7 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import teetime.framework.ConfigurationContext; +import teetime.framework.Configuration; import teetime.framework.Execution; import teetime.framework.ExecutionException; import teetime.util.Pair; @@ -113,7 +113,7 @@ public class InstanceOfFilterTest { @Test public void filterShouldSendToBothOutputPorts() throws Exception { InstanceOfFilterTestConfig config = new InstanceOfFilterTestConfig(); - Execution execution = new Execution(config); + Execution<InstanceOfFilterTestConfig> execution = new Execution<InstanceOfFilterTestConfig>(config); try { execution.executeBlocking(); } catch (ExecutionException e) { @@ -122,7 +122,7 @@ public class InstanceOfFilterTest { } } - private static class InstanceOfFilterTestConfig extends ConfigurationContext { + private static class InstanceOfFilterTestConfig extends Configuration { public InstanceOfFilterTestConfig() { InitialElementProducer<Object> elementProducer = new InitialElementProducer<Object>(); -- GitLab