From 39dd66de02230015916ab8cd506f195c25809b39 Mon Sep 17 00:00:00 2001
From: Marc Adolf <mad@informatik.uni-kiel.de>
Date: Thu, 10 Mar 2016 19:02:21 +0100
Subject: [PATCH] final version for the thesis
---
conf/quality-config/pmd-ruleset.xml | 4 +
.../java/teetime/framework/AbstractStage.java | 77 ++++++++++++++----
.../java/teetime/framework/Configuration.java | 27 +++++++
.../framework/ThreadAssignmentService.java | 3 +
.../pipe/ChangesOngoingException.java | 30 -------
.../framework/pipe/DeActivatingPipe.java | 23 ++++++
.../AlternatingAssignment.java | 5 +-
.../AssignmentAdaptationThread.java | 23 ++++++
.../DefaultThreadAssignment.java | 2 +-
.../threadAssignment/FDPStageAssignment.java | 4 +
.../threadAssignment/FDPStageAssignment2.java | 5 ++
.../OneThreadPerStageAssignment.java | 47 -----------
.../threadAssignment/SimpleAssignment.java | 5 +-
.../StableBottomUpAssignment.java | 6 +-
.../StableTopDownAssignment.java | 6 +-
.../threadAssignment/TestAssignment.java | 80 -------------------
.../threadAssignment/TreeMapComparator.java | 3 +-
.../analysis/AbstractAnalysisAlgorithm.java | 16 ++++
.../analysis/AnalysisService.java | 63 +++++++++++++--
.../threadAssignment/analysis/History.java | 36 +++++++--
.../{HistroyEntry.java => HistoryEntry.java} | 8 +-
.../analysis/MeanAlgorithm.java | 4 +-
.../analysis/RegressionAlgorithm.java | 4 +-
.../analysis/WeightedAlgorithm.java | 4 +-
.../metrics/PullThroughputMetric.java | 2 +-
25 files changed, 286 insertions(+), 201 deletions(-)
delete mode 100644 src/main/java/teetime/framework/pipe/ChangesOngoingException.java
delete mode 100644 src/main/java/teetime/framework/threadAssignment/OneThreadPerStageAssignment.java
delete mode 100644 src/main/java/teetime/framework/threadAssignment/TestAssignment.java
rename src/main/java/teetime/framework/threadAssignment/analysis/{HistroyEntry.java => HistoryEntry.java} (84%)
diff --git a/conf/quality-config/pmd-ruleset.xml b/conf/quality-config/pmd-ruleset.xml
index 420343b1b..4259c8ac8 100644
--- a/conf/quality-config/pmd-ruleset.xml
+++ b/conf/quality-config/pmd-ruleset.xml
@@ -99,6 +99,10 @@
<exclude name="LocalVariableCouldBeFinal" />
<exclude name="SimplifyStartsWith" />
</rule>
+
+ <!--rule ref="rulesets/chw/basic.xml">
+ <exclude name="NonHeaderCommentSize"/>
+ <!--/rule>
<!-- <rule ref="rulesets/chw/basic.xml" />
<rule ref="rulesets/chw/basic.xml/NonHeaderCommentSize">
diff --git a/src/main/java/teetime/framework/AbstractStage.java b/src/main/java/teetime/framework/AbstractStage.java
index 0488913f6..304e8525d 100644
--- a/src/main/java/teetime/framework/AbstractStage.java
+++ b/src/main/java/teetime/framework/AbstractStage.java
@@ -129,7 +129,7 @@ public abstract class AbstractStage {
// this stages activates, we have to sync first
synchronized (this) {
try {
- // TODO something to reduce the overhead in the normal case, normal exception handling
+ // TODO something to reduce the overhead in the case of a normal execution, normal exception handling
// while activating this stage
// if we enter this synchronized the pipe-replacement should be finished and the element should be available again
this.execute();
@@ -196,12 +196,28 @@ public abstract class AbstractStage {
this.setActive(true);
}
+ /**
+ *
+ * Calls {@link #declareActiveRuntime(int)} with no restriction to cascading behavior.
+ *
+ * @return true, if the stage could be activated
+ */
public synchronized Boolean declareActiveRuntime() {
// default depths is not restricted (some one want to build a configuration with
// max integer value stages that would have to be activated at once?)
return declareActiveRuntime(Integer.MAX_VALUE);
}
+ /**
+ * Execute this method at during the execution, to set this stage as active.
+ * The incoming pipes are replaced with {@link ActivatingPipe ActivatingPipes}, which handle
+ * the second part of the activation.
+ *
+ * @param depth
+ * determines how many other stages are allow to be activated in order to activate this stage
+ * @return true, if the stage could be activated
+ */
+
@SuppressWarnings("unchecked")
public Boolean declareActiveRuntime(final int depth) {
if (this.isActive) {
@@ -282,6 +298,11 @@ public abstract class AbstractStage {
}
}
+ /**
+ * Computes the number of threads that are needed to activate this stage
+ *
+ * @return the number of threads need for activation
+ */
public int requiredThreadsToActivate() {
int neededThreads = 0;
if (!this.isActive) {
@@ -290,6 +311,9 @@ public abstract class AbstractStage {
return neededThreads;
}
+ /*
+ * Recursive test to compute all needed threads for activation.
+ */
private int requiredThreadsToActivateFollowing() {
int neededThreads = 0;
@@ -314,6 +338,9 @@ public abstract class AbstractStage {
return neededThreads;
}
+ /*
+ * Recursive activation of all following stages, that need to be activated
+ */
private Boolean activateNecessaryFollowing() {
Boolean success = true;
for (OutputPort<?> outPort : this.getOutputPorts()) {
@@ -333,12 +360,22 @@ public abstract class AbstractStage {
return success;
}
+ /**
+ * Updates the owning thread of a single partition
+ *
+ * @param oldThread
+ * the thread to be replaced
+ * @param newThread
+ * the threads whichs replaces the old one
+ */
public void updateThreads(final Thread oldThread, final Thread newThread) {
new ThreadToStageSetter().replaceThreads(this, oldThread, newThread);
}
+ /*
+ * Helper method to change a pipe to its activation variant.
+ */
private <T> void changeToActivatingPipe(final IPipe<T> oldPipe) {
- // TODO cache signals
ActivatingPipe<T> newPipe = new ActivatingPipe<T>(oldPipe, this.getInputPorts(), 1);
if (oldPipe.isClosed()) {
newPipe.close();
@@ -346,8 +383,11 @@ public abstract class AbstractStage {
}
/**
- * Set an active stage as passive and waits for its thread to finish
- * Always sets the predecessor thread as the new owning thread
+ * Sets this stage as passive during the runtime.
+ * Replaces the incoming pipes with {@link DeActivatingPipe DeActivatingPipes}
+ * which handle the next part of the deactivation.
+ *
+ * @return true, if the stage could be set as passive
*/
public Boolean declarePassiveRuntime() {
@@ -390,6 +430,12 @@ public abstract class AbstractStage {
return true;
}
+ /**
+ * Declares this stage as passive and set the new owning thread.
+ *
+ * @param newContainingThread
+ * the thread which executes this stage and the other ones owned by the previous thread in the future.
+ */
public void declarePassive(final Thread newContainingThread) {
// wait for termination
try {
@@ -407,6 +453,9 @@ public abstract class AbstractStage {
this.isActive = false;
}
+ /*
+ * Changes a pipe to the DeActivatingPipe
+ */
private <T> void changeToDeActivatingPipe(final IPipe<T> oldPipe) {
IPipe<T> newPipe = new DeActivatingPipe<T>(((AbstractSynchedPipe<T>) oldPipe), getInputPorts());
@@ -425,21 +474,14 @@ public abstract class AbstractStage {
private boolean calledOnStarting = false;
private volatile StageState currentState = StageState.CREATED;
+ /**
+ * If the start sequence should be skipped, e.g. if the stage is activated at runtime
+ */
private boolean skipStart = false;
// state of the activation, 0 represents the normal behavior, -1 signals the thread to be deactivated
// and 1 let the thread skip the starting signal to avoid signal races
- private volatile int activation = 0;
-
- public int getActivation() {
- return activation;
- }
-
- public void setActivation(final int activation) {
- this.activation = activation;
- }
-
public List<InputPort<?>> getInputPorts() {
return inputPorts.getOpenedPorts(); // TODO consider to publish a read-only version
}
@@ -750,7 +792,7 @@ public abstract class AbstractStage {
/**
* Checks if this stage can be deactivated. E.g. if it only has one predecessor thread or is no producer
*
- * @return True, if the stage can be deactivated
+ * @return true, if the stage can be deactivated
*/
public Boolean canBePassive() {
Boolean passive = true;
@@ -775,6 +817,11 @@ public abstract class AbstractStage {
}
+ /**
+ * Checkks if the stage is in the process of being changed from active to passive or vice versa.
+ *
+ * @return true, if the stage is still being changed
+ */
public Boolean isChanging() {
return activating || deactivating || changing;
}
diff --git a/src/main/java/teetime/framework/Configuration.java b/src/main/java/teetime/framework/Configuration.java
index 27b383e34..56c5502df 100644
--- a/src/main/java/teetime/framework/Configuration.java
+++ b/src/main/java/teetime/framework/Configuration.java
@@ -47,18 +47,45 @@ public abstract class Configuration extends AbstractCompositeStage {
this.context = new ConfigurationContext(this);
}
+ /**
+ * Defines which assignment algorithm, to automatically distribute the resources, should be used during the execution.
+ * There exist several different strategies to be chosen of.
+ *
+ * @param assignment
+ * The assignment algorithm to be used
+ */
public void setThreadAssignment(final AbstractThreadAssignment assignment) {
context.getAssignmentService().setThreadAssignment(assignment);
}
+ /**
+ * Sets the applied metric, which is used by dynamic assignment or resource distribution algorithms.
+ *
+ * @param metric
+ * The used metric
+ */
public void setMetric(final AbstractMetric metric) {
context.getAssignmentService().setMetric(metric);
}
+ /**
+ * Defines how many threads are allowed to be used if we activate a single stage.
+ * In some cases the activation of one stage may need further activated threads.
+ * If this limit is exceeded the stage can not be activated at runtime
+ *
+ * @param activationDepth
+ * How many threads are allowed to be activated if one stage is set as active
+ */
public void setActivationDepth(final int activationDepth) {
context.getAssignmentService().setAllowedActivationDepth(activationDepth);
}
+ /**
+ * Defines which algorithm is used to predict the behavior of the system
+ *
+ * @param algorithm
+ * One of the algorithms defined in {@link AnalysisAlgorithm}.
+ */
public void setAnalysisAlgorithm(final AnalysisAlgorithm algorithm) {
context.getAssignmentService().setAnalysisAlgorithm(algorithm);
}
diff --git a/src/main/java/teetime/framework/ThreadAssignmentService.java b/src/main/java/teetime/framework/ThreadAssignmentService.java
index 744ccae83..cb4e42def 100644
--- a/src/main/java/teetime/framework/ThreadAssignmentService.java
+++ b/src/main/java/teetime/framework/ThreadAssignmentService.java
@@ -26,6 +26,9 @@ import teetime.framework.threadAssignment.metrics.AbstractMetric;
import teetime.framework.threadAssignment.metrics.PullThroughputMetric;
/**
+ * This service manages all needed features for the runtime adaptation of the configuration.
+ * Several options can be chosen.
+ *
* @author Marc Adolf
*
*/
diff --git a/src/main/java/teetime/framework/pipe/ChangesOngoingException.java b/src/main/java/teetime/framework/pipe/ChangesOngoingException.java
deleted file mode 100644
index f1d130a63..000000000
--- a/src/main/java/teetime/framework/pipe/ChangesOngoingException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package teetime.framework.pipe;
-
-/**
- * @author Marc Adolf
- *
- */
-public class ChangesOngoingException extends Exception {
-
- /**
- *
- */
- private static final long serialVersionUID = -7522751730339676371L;
-
-}
diff --git a/src/main/java/teetime/framework/pipe/DeActivatingPipe.java b/src/main/java/teetime/framework/pipe/DeActivatingPipe.java
index 2c6e419d5..65fb1a16f 100644
--- a/src/main/java/teetime/framework/pipe/DeActivatingPipe.java
+++ b/src/main/java/teetime/framework/pipe/DeActivatingPipe.java
@@ -17,10 +17,19 @@ package teetime.framework.pipe;
import java.util.List;
+import teetime.framework.AbstractPipe;
import teetime.framework.AbstractSynchedPipe;
import teetime.framework.InputPort;
/**
+ * This pipe is used for deactivating a stage at runtime.
+ * All pipes in the input ports are replaced with this pipe and as soon as the (only) predecessor thread wants to add an element
+ * to one of the pipes, as much of the remaining elements are processed as possible.
+ * If no elements remains all pipes are replaced by {@link teetime.framework.pipe.UnsynchedPipe UnsynchedPipes}.
+ * The owning thread of the target stage is only stopped if we made sure, that the new responsible thread is still alive.
+ * Then the target stage is finally passive the owning thread is replaced.
+ * This is necessary to ensure, that only one Thread is active in a partition at the same time and no element is lost.
+ *
* @author Marc Adolf
*
*/
@@ -30,6 +39,14 @@ public class DeActivatingPipe<T> extends AbstractWaitingPipe<T> {
private final List<InputPort<?>> pipesToBeReplaced;
IPipe<T> newPipe;
+ /**
+ * * Creates a {@link DeActivatingPipe} and replaces the pipes in the input and output port according to the behavior defined in the {@link AbstractPipe}.
+ *
+ * @param oldPipe
+ * The old pipe is used to keep informations and some of its computation behavior
+ * @param pipesWithContent
+ * A list of all input ports, that can contain remaining elements
+ */
public DeActivatingPipe(final AbstractSynchedPipe<T> oldPipe,
final List<InputPort<?>> pipesWithContent) {
super(oldPipe.getSourcePort(), oldPipe.getTargetPort(), 4);
@@ -80,6 +97,9 @@ public class DeActivatingPipe<T> extends AbstractWaitingPipe<T> {
}
}
+ /*
+ * Replaces all pipes in known input ports with a standard pipe.
+ */
private void replaceAll() {
for (InputPort<?> inPort : pipesToBeReplaced) {
IPipe<?> pipe = inPort.getPipe();
@@ -91,6 +111,9 @@ public class DeActivatingPipe<T> extends AbstractWaitingPipe<T> {
}
+ /*
+ * Computes the number of remaining elements of all known pipes.
+ */
private int getTotalNumberRemainingElements() {
int totalElements = 0;
totalElements += getSize();
diff --git a/src/main/java/teetime/framework/threadAssignment/AlternatingAssignment.java b/src/main/java/teetime/framework/threadAssignment/AlternatingAssignment.java
index 088c015ee..92ceeba74 100644
--- a/src/main/java/teetime/framework/threadAssignment/AlternatingAssignment.java
+++ b/src/main/java/teetime/framework/threadAssignment/AlternatingAssignment.java
@@ -25,7 +25,10 @@ import java.util.Set;
import teetime.framework.AbstractStage;
/**
- * @author marc
+ * This algorithm tries to toggle the passive/active state of every {@link AbstractStage Stage}.
+ * This was build for evaluation purposes
+ *
+ * @author Marc Adolf
*
*/
public class AlternatingAssignment extends AbstractThreadAssignment {
diff --git a/src/main/java/teetime/framework/threadAssignment/AssignmentAdaptationThread.java b/src/main/java/teetime/framework/threadAssignment/AssignmentAdaptationThread.java
index 52bec2dac..844bbbfa2 100644
--- a/src/main/java/teetime/framework/threadAssignment/AssignmentAdaptationThread.java
+++ b/src/main/java/teetime/framework/threadAssignment/AssignmentAdaptationThread.java
@@ -24,6 +24,12 @@ import org.slf4j.LoggerFactory;
import teetime.framework.AbstractStage;
/**
+ * This threads executes the control loop of the dynamic adaptation.
+ * It is only started if an dynamic assignment was chosen.
+ * Its execution is split into three phases.
+ * First it updates the monitored data.
+ * Than it uses the assignment algorithm to plan the next iteration.
+ * At the end it executes this changes.
*
* @author Marc Adolf
*
@@ -35,11 +41,25 @@ public final class AssignmentAdaptationThread extends Thread {
private boolean running = true;
final static Logger logger = LoggerFactory.getLogger(AssignmentAdaptationThread.class);
+ /**
+ * Creates the thread responsible for the execution of the dynamic adaptation of the configuration.
+ *
+ * @param assignment
+ * The used assignment algorithm.
+ */
public AssignmentAdaptationThread(final AbstractThreadAssignment assignment) {
this(assignment, 200);
}
+ /**
+ * Creates the thread responsible for the execution of the dynamic adaptation of the configuration.
+ *
+ * @param assignment
+ * The used assignment algorithm.
+ * @param timeToWait
+ * The time between every iteration of the adaptation loop.
+ */
public AssignmentAdaptationThread(final AbstractThreadAssignment assignment, final long timeToWait) {
this.assignment = assignment;
this.setDaemon(true);
@@ -69,6 +89,9 @@ public final class AssignmentAdaptationThread extends Thread {
}
}
+ /**
+ * Terminates the adaptation thread
+ */
public void finish() {
this.running = false;
}
diff --git a/src/main/java/teetime/framework/threadAssignment/DefaultThreadAssignment.java b/src/main/java/teetime/framework/threadAssignment/DefaultThreadAssignment.java
index 74840e66e..ab4e54740 100644
--- a/src/main/java/teetime/framework/threadAssignment/DefaultThreadAssignment.java
+++ b/src/main/java/teetime/framework/threadAssignment/DefaultThreadAssignment.java
@@ -20,7 +20,7 @@ import java.util.Map;
import teetime.framework.AbstractStage;
/**
- * This class represents the default behaviour of TeeTime and may be used to apply legacy configurations.
+ * This class represents the default behavior of TeeTime and may be used to apply legacy configurations.
*
* @author Marc Adolf
*
diff --git a/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment.java b/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment.java
index 030800291..9f0372938 100644
--- a/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment.java
+++ b/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment.java
@@ -31,6 +31,10 @@ import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe;
/**
+ * This algorithm represents the procedure presented by the Feedback-directed Pipeline Parallelism by Suleman et al.
+ * Here we don't clearly divide the optimization and the saving phases.
+ * Here we focus on slow stages, an alternative would be to balance the partitions directly.
+ *
* @author Marc Adolf
*
*/
diff --git a/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment2.java b/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment2.java
index 49639fbc2..47bf06386 100644
--- a/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment2.java
+++ b/src/main/java/teetime/framework/threadAssignment/FDPStageAssignment2.java
@@ -31,6 +31,11 @@ import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe;
/**
+ * This algorithm represents the procedure presented by the Feedback-directed Pipeline Parallelism by Suleman et al.
+ * In this algorithm we distinguish more clearly between the optimization and the saving phases.
+ * Here we focus on slow stages, an alternative would be to balance the partitions directly.
+ *
+ *
* @author Marc Adolf
*
*/
diff --git a/src/main/java/teetime/framework/threadAssignment/OneThreadPerStageAssignment.java b/src/main/java/teetime/framework/threadAssignment/OneThreadPerStageAssignment.java
deleted file mode 100644
index e6b1219ec..000000000
--- a/src/main/java/teetime/framework/threadAssignment/OneThreadPerStageAssignment.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package teetime.framework.threadAssignment;
-
-import java.util.Map;
-
-import teetime.framework.AbstractStage;
-
-public class OneThreadPerStageAssignment extends AbstractThreadAssignment {
-
- public OneThreadPerStageAssignment() {
- super(false);
- }
-
- @Override
- public void setFirstAssignment() {
- int stageNumber = 1;
- for (AbstractStage stage : this.stages) {
- stage.declareActive();
- // System.out.println(stageNumber);
- stageNumber++;
-
- }
- // System.out.println("done");
- }
-
- @Override
- public Map<AbstractStage, Integer> changeAssingmentAtRuntime() {
- // do nothing since its static
- // if something could be done and its static, no thread will execute this method
- return null;
- }
-
-}
diff --git a/src/main/java/teetime/framework/threadAssignment/SimpleAssignment.java b/src/main/java/teetime/framework/threadAssignment/SimpleAssignment.java
index a55ca1e3f..d5c11eec0 100644
--- a/src/main/java/teetime/framework/threadAssignment/SimpleAssignment.java
+++ b/src/main/java/teetime/framework/threadAssignment/SimpleAssignment.java
@@ -27,7 +27,10 @@ import java.util.TreeMap;
import teetime.framework.AbstractStage;
/**
- * @author marc
+ * In this assignment algorithm in every iteration the slowest stage is activated.
+ * If not enough resources are available, the fastest stage is set as passive.
+ *
+ * @author Marc Adolf
*
*/
public class SimpleAssignment extends AbstractThreadAssignment {
diff --git a/src/main/java/teetime/framework/threadAssignment/StableBottomUpAssignment.java b/src/main/java/teetime/framework/threadAssignment/StableBottomUpAssignment.java
index e0ab29853..0b8e9a9a9 100644
--- a/src/main/java/teetime/framework/threadAssignment/StableBottomUpAssignment.java
+++ b/src/main/java/teetime/framework/threadAssignment/StableBottomUpAssignment.java
@@ -28,7 +28,11 @@ import teetime.framework.OutputPort;
import teetime.framework.pipe.IPipe;
/**
- * @author marc
+ * In this algorithm we don't activate stages at the beginning.
+ * Instead we activate the slowest stage or its successors.
+ * If the performance of the changed stage drops through this, the change is reverted.
+ *
+ * @author Marc Adolf
*
*/
public class StableBottomUpAssignment extends AbstractThreadAssignment {
diff --git a/src/main/java/teetime/framework/threadAssignment/StableTopDownAssignment.java b/src/main/java/teetime/framework/threadAssignment/StableTopDownAssignment.java
index 5855c14dc..80441f31d 100644
--- a/src/main/java/teetime/framework/threadAssignment/StableTopDownAssignment.java
+++ b/src/main/java/teetime/framework/threadAssignment/StableTopDownAssignment.java
@@ -27,7 +27,11 @@ import java.util.TreeMap;
import teetime.framework.AbstractStage;
/**
- * @author marc
+ * In this assignment we start with every stage set as active.
+ * In every iteration we set the fastest stage as passive.
+ * If the performance drops, we stop this behavior don't apply further changes.
+ *
+ * @author Marc Adolf
*
*/
public class StableTopDownAssignment extends AbstractThreadAssignment {
diff --git a/src/main/java/teetime/framework/threadAssignment/TestAssignment.java b/src/main/java/teetime/framework/threadAssignment/TestAssignment.java
deleted file mode 100644
index 0abb82560..000000000
--- a/src/main/java/teetime/framework/threadAssignment/TestAssignment.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package teetime.framework.threadAssignment;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import teetime.framework.AbstractStage;
-
-/**
- * @author Marc Adolf
- *
- */
-public class TestAssignment extends AbstractThreadAssignment {
- int count = 0;
-
- public TestAssignment() {
- super(true);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see teetime.framework.threadAssignment.AbstractThreadAssignment#onInitialize()
- */
- @Override
- public void setFirstAssignment() {
- for (AbstractStage stage : stages) {
- stage.declareActive();
- }
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see teetime.framework.threadAssignment.AbstractThreadAssignment#changeAssingmentAtRuntime()
- */
- @Override
- public Map<AbstractStage, Integer> changeAssingmentAtRuntime() {
- Set<AbstractStage> newActiveOnes = new HashSet<AbstractStage>();
- for (AbstractStage stage : stages) {
- if (!stage.isActive()) {
- newActiveOnes.add(stage);
- }
- }
- // newActiveOnes.addAll(currentThreadedStages);
- // if (!newActiveOnes.isEmpty()) {
- // System.out.println("Assignm:" + newActiveOnes);
- // System.out.println("Curr:" + currentThreadedStages);
- // }
- // if (count == 0) {
- // newActiveOnes = stages;
- // }
- // if (count == 2) {
- // newActiveOnes = stages;
- // }
- // count++;
- return transformAssingmentToMap(newActiveOnes);
-
- }
-
-}
diff --git a/src/main/java/teetime/framework/threadAssignment/TreeMapComparator.java b/src/main/java/teetime/framework/threadAssignment/TreeMapComparator.java
index 3d7b0656a..f954108a6 100644
--- a/src/main/java/teetime/framework/threadAssignment/TreeMapComparator.java
+++ b/src/main/java/teetime/framework/threadAssignment/TreeMapComparator.java
@@ -24,7 +24,8 @@ import java.util.Map;
import teetime.framework.AbstractStage;
/**
- * @author marc
+ *
+ * @author Marc Adolf
*
*/
class TreeMapComparator implements Comparator<AbstractStage> {
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/AbstractAnalysisAlgorithm.java b/src/main/java/teetime/framework/threadAssignment/analysis/AbstractAnalysisAlgorithm.java
index 02392a229..b321e66b2 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/AbstractAnalysisAlgorithm.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/AbstractAnalysisAlgorithm.java
@@ -23,16 +23,32 @@ import java.util.Map;
import teetime.framework.AbstractStage;
/**
+ * The abstract class used for every algorithm that is applied to analyze the saved data.
+ *
* @author Marc Adolf
*
*/
public abstract class AbstractAnalysisAlgorithm {
private final int window;
+ /**
+ *
+ * @param window
+ * the range of the saved data that should be analyzed
+ */
public AbstractAnalysisAlgorithm(final int window) {
this.window = window;
}
+ /**
+ * The data saved in the {@link History} is analyzed.
+ * Thereby, every entry of each stages in the given range is compressed to a single value
+ * to predict the future behavior.
+ *
+ * @param history
+ * The history class where every used data is saved.
+ * @return A map with the considered stages as keys and their predicted performance as values.
+ */
public abstract Map<AbstractStage, Double> doAnalysis(final History history);
public int getWindow() {
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/AnalysisService.java b/src/main/java/teetime/framework/threadAssignment/analysis/AnalysisService.java
index 38dfe714a..4b334a965 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/AnalysisService.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/AnalysisService.java
@@ -22,23 +22,49 @@ import java.util.Set;
import teetime.framework.AbstractStage;
import teetime.framework.threadAssignment.metrics.AbstractMetric;
+/**
+ * This service class coordinates the analysis package, used by the dynamic adaptation approach.
+ * The metric and the used analysis algorithm can be chosen.
+ *
+ * @author Marc Adolf
+ *
+ */
public final class AnalysisService {
private AbstractMetric metric;
private final History history;
private Set<AbstractStage> stages;
+ /**
+ * Here the three different analysis algorithms are registered and can be chosen through.
+ *
+ * @author Marc Adolf
+ *
+ */
public enum AnalysisAlgorithm {
WEIGHTED, MEAN, REGRESSION
}
- // TODO other algorithms
protected AbstractAnalysisAlgorithm analysisAlgorithm;
private final int numberOfHistoryItems;
+ /**
+ * Here the monitoring and assignment classes are initialized.
+ *
+ * @param metric
+ * The specific metric which defines the observed properties
+ */
public AnalysisService(final AbstractMetric metric) {
this(5, metric);
}
+ /**
+ * Here the monitoring and assignment classes are initialized.
+ *
+ * @param numberOfHistoryItems
+ * Defines how many measurements are saved per observed stage.
+ * @param metric
+ * The specific metric which defines the observed properties
+ */
public AnalysisService(final int numberOfHistoryItems, final AbstractMetric metric) {
this.metric = metric;
this.numberOfHistoryItems = numberOfHistoryItems;
@@ -67,16 +93,29 @@ public final class AnalysisService {
}
+ /**
+ * This method uses the specified {@link AnalysisAlgorithm} and analyzes the
+ * data saved in the {@link History}.
+ *
+ * @return A map with the considered stages as keys and their predicted performance as values.
+ */
public synchronized Map<AbstractStage, Double> getSystemState() {
return analysisAlgorithm.doAnalysis(history);
}
- // only positive numbers -> normalized by max
+ /**
+ * This method uses the specified {@link AnalysisAlgorithm} and analyzes the
+ * data saved in the {@link History}.
+ * Thereby, the result of each stage is normalized by the maximum performance.
+ * In the end the values range between 0 and 1.
+ *
+ * @return A map with the considered stages as keys and their predicted and normalized performance as values.
+ *
+ */
public synchronized Map<AbstractStage, Double> getNormalizedSystemState() {
- // TODO exception handling if a pipe has the monitorable interface not implemented
// get data from history
- // using algorithm like wiechman for weighting the measurements in time
+ // using algorithm like in the task farm for weighting the measurements in time
Map<AbstractStage, Double> weightedMeasurements = new HashMap<AbstractStage, Double>();
weightedMeasurements = analysisAlgorithm.doAnalysis(history);
Map<AbstractStage, Double> normalizesMeasurements = new HashMap<AbstractStage, Double>();
@@ -110,16 +149,28 @@ public final class AnalysisService {
return metric;
}
+ /**
+ * The {@link AbstractMetric Metric} is used to gather the currently monitored data and
+ * it is saved in the {@link History}.
+ * Only a certain amount of values is saved for each stage.
+ */
public synchronized void updateData() {
Map<AbstractStage, Double> measurements = metric.getAllStageData(stages);
- Map<AbstractStage, HistroyEntry> historyEntries = new HashMap<AbstractStage, HistroyEntry>();
+ Map<AbstractStage, HistoryEntry> historyEntries = new HashMap<AbstractStage, HistoryEntry>();
for (AbstractStage stage : measurements.keySet()) {
- historyEntries.put(stage, new HistroyEntry(stage, measurements.get(stage)));
+ historyEntries.put(stage, new HistoryEntry(stage, measurements.get(stage)));
}
history.addEntry(historyEntries);
}
+ /**
+ * Removes the given stage from the list of considered stages.
+ * Also removes all data of this stage in the {@link History}.
+ *
+ * @param stage
+ * The {@link AbstractStage Stage} to be removed.
+ */
public void removeStage(final AbstractStage stage) {
stages.remove(stage);
history.removeStage(stage);
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/History.java b/src/main/java/teetime/framework/threadAssignment/analysis/History.java
index e0979dfc9..f41a15124 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/History.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/History.java
@@ -21,6 +21,14 @@ import java.util.Map;
import teetime.framework.AbstractStage;
+/**
+ * This class saves all measured {@link HistoryEntry HistoryEntries} for each {@link AbstractStage Stage}.
+ * The number of entries per stage is limited according to {@link #maxEntries}.
+ * If this limit is exceeded, old values are deleted
+ *
+ * @author Marc Adolf
+ *
+ */
public class History {
// similar to ccw's ThroughputHistory
@@ -31,22 +39,36 @@ public class History {
}
// BETTER maybe a circular buffer would be faster
- private final Map<AbstractStage, LinkedList<HistroyEntry>> entryList;
+ private final Map<AbstractStage, LinkedList<HistoryEntry>> entryList;
+ /**
+ * Creates an empty History with a limit to the number of saved entries per stage.
+ *
+ * @param maxEntries
+ * The limiting size of the History.
+ * Old values will be discarded if exceeded
+ */
public History(final int maxEntries) {
this.maxEntries = maxEntries;
- entryList = new HashMap<AbstractStage, LinkedList<HistroyEntry>>();
+ entryList = new HashMap<AbstractStage, LinkedList<HistoryEntry>>();
}
- public void addEntry(final Map<AbstractStage, HistroyEntry> newEntries) {
+ /**
+ * Adds a new {@link HistoryEntry} for every stage contained in the input.
+ * Old values will be discarded if {@link #maxEntries} is exceeded
+ *
+ * @param newEntries
+ * A map with stages and their new values.
+ */
+ public void addEntry(final Map<AbstractStage, HistoryEntry> newEntries) {
for (AbstractStage stage : newEntries.keySet()) {
if (!entryList.containsKey(stage)) {
- LinkedList<HistroyEntry> newList = new LinkedList<HistroyEntry>();
+ LinkedList<HistoryEntry> newList = new LinkedList<HistoryEntry>();
entryList.put(stage, newList);
}
- LinkedList<HistroyEntry> stageList = entryList.get(stage);
+ LinkedList<HistoryEntry> stageList = entryList.get(stage);
stageList.add(newEntries.get(stage));
if (stageList.size() > maxEntries) {
stageList.removeFirst();
@@ -54,11 +76,11 @@ public class History {
}
}
- public Map<AbstractStage, LinkedList<HistroyEntry>> getEntryList() {
+ public Map<AbstractStage, LinkedList<HistoryEntry>> getEntryList() {
return entryList;
}
- // needed for duplication to replace stages.
+ // needed for replication of stages.
public void removeStage(final AbstractStage stage) {
entryList.remove(stage);
}
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/HistroyEntry.java b/src/main/java/teetime/framework/threadAssignment/analysis/HistoryEntry.java
similarity index 84%
rename from src/main/java/teetime/framework/threadAssignment/analysis/HistroyEntry.java
rename to src/main/java/teetime/framework/threadAssignment/analysis/HistoryEntry.java
index 5be68332b..0dea8ce54 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/HistroyEntry.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/HistoryEntry.java
@@ -21,19 +21,21 @@ package teetime.framework.threadAssignment.analysis;
import teetime.framework.AbstractStage;
/**
+ * Represents one measured value of a single stage at a given time.
+ *
* @author Marc Adolf
*
*/
-class HistroyEntry {
+class HistoryEntry {
private final AbstractStage owningStage;
private final long timeStamp;
private final double value;
- HistroyEntry(final AbstractStage stage, final double value) {
+ HistoryEntry(final AbstractStage stage, final double value) {
this(stage, System.currentTimeMillis(), value);
}
- HistroyEntry(final AbstractStage stage, final long timeStamp, final double value) {
+ HistoryEntry(final AbstractStage stage, final long timeStamp, final double value) {
this.owningStage = stage;
this.timeStamp = timeStamp;
this.value = value;
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/MeanAlgorithm.java b/src/main/java/teetime/framework/threadAssignment/analysis/MeanAlgorithm.java
index 2ddd20b79..e4c88a99c 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/MeanAlgorithm.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/MeanAlgorithm.java
@@ -51,9 +51,9 @@ public class MeanAlgorithm extends AbstractAnalysisAlgorithm {
window = history.getMaxEntries();
}
- Map<AbstractStage, LinkedList<HistroyEntry>> entryMap = history.getEntryList();
+ Map<AbstractStage, LinkedList<HistoryEntry>> entryMap = history.getEntryList();
Map<AbstractStage, Double> resultMap = new HashMap<AbstractStage, Double>();
- List<HistroyEntry> singleEntry;
+ List<HistoryEntry> singleEntry;
for (AbstractStage stage : entryMap.keySet()) {
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/RegressionAlgorithm.java b/src/main/java/teetime/framework/threadAssignment/analysis/RegressionAlgorithm.java
index 85b8d3d45..4559588ea 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/RegressionAlgorithm.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/RegressionAlgorithm.java
@@ -70,9 +70,9 @@ public class RegressionAlgorithm extends AbstractAnalysisAlgorithm {
window = history.getMaxEntries();
}
- Map<AbstractStage, LinkedList<HistroyEntry>> entryMap = history.getEntryList();
+ Map<AbstractStage, LinkedList<HistoryEntry>> entryMap = history.getEntryList();
Map<AbstractStage, Double> resultMap = new HashMap<AbstractStage, Double>();
- List<HistroyEntry> singleEntry;
+ List<HistoryEntry> singleEntry;
double prediction;
for (AbstractStage stage : entryMap.keySet()) {
diff --git a/src/main/java/teetime/framework/threadAssignment/analysis/WeightedAlgorithm.java b/src/main/java/teetime/framework/threadAssignment/analysis/WeightedAlgorithm.java
index 3a7fe42bf..cdd8a3506 100644
--- a/src/main/java/teetime/framework/threadAssignment/analysis/WeightedAlgorithm.java
+++ b/src/main/java/teetime/framework/threadAssignment/analysis/WeightedAlgorithm.java
@@ -63,9 +63,9 @@ public class WeightedAlgorithm extends AbstractAnalysisAlgorithm {
window = history.getMaxEntries();
}
- Map<AbstractStage, LinkedList<HistroyEntry>> entryMap = history.getEntryList();
+ Map<AbstractStage, LinkedList<HistoryEntry>> entryMap = history.getEntryList();
Map<AbstractStage, Double> resultList = new HashMap<AbstractStage, Double>();
- List<HistroyEntry> singleEntry;
+ List<HistoryEntry> singleEntry;
for (AbstractStage stage : entryMap.keySet()) {
diff --git a/src/main/java/teetime/framework/threadAssignment/metrics/PullThroughputMetric.java b/src/main/java/teetime/framework/threadAssignment/metrics/PullThroughputMetric.java
index 5ec26465c..06e3a7b06 100644
--- a/src/main/java/teetime/framework/threadAssignment/metrics/PullThroughputMetric.java
+++ b/src/main/java/teetime/framework/threadAssignment/metrics/PullThroughputMetric.java
@@ -23,7 +23,7 @@ import teetime.framework.InputPort;
import teetime.framework.pipe.IMonitorablePipe;
/**
- * Measures the pull throughput of the stages.
+ * Measures the pull throughput of incoming pipes of the stages.
*
* @author Marc Adolf
*
--
GitLab