Skip to content
Snippets Groups Projects
Commit 80d9e8be authored by Christian Wulf's avatar Christian Wulf
Browse files

added checkstyle config file;

refactored package structure;
added correct and improved stage state management
parent 6e17324d
No related branches found
No related tags found
No related merge requests found
Showing
with 1824 additions and 398 deletions
<?xml version="1.0" encoding="UTF-8"?>
<fileset-config file-format-version="1.2.0" simple-config="true" sync-formatter="false">
<local-check-config name="Kieker Checkstyle" location="lib/static-analysis/checkstyle-5.6/cs-conf.xml" type="project" description="">
<additional-data name="protect-config-file" value="true"/>
<local-check-config name="Kieker Checkstyle" location="conf/cs-conf.xml" type="project" description="">
<additional-data name="protect-config-file" value="false"/>
</local-check-config>
<fileset name="all" enabled="true" check-config-name="Kieker Checkstyle" local="true">
<fileset name="all" enabled="true" check-config-name="Sun Checks" local="false">
<file-match-pattern match-pattern="." include-pattern="true"/>
</fileset>
<filter name="FilesFromPackage" enabled="true">
......@@ -16,5 +16,4 @@
<filter-data value="examples/userguide/appendix-Sigar/src"/>
<filter-data value="src-gen/analysis"/>
</filter>
<filter name="NonSrcDirs" enabled="true"/>
</fileset-config>
......@@ -12,6 +12,7 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="conf"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<attributes>
<attribute name="maven.pomderived" value="true"/>
......
This diff is collapsed.
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
formatter_profile=_Kieker - Profile
formatter_settings_version=12
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=java;javax;junit;org;com;kieker;kieker.test;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
sp_cleanup.add_default_serial_version_id=true
sp_cleanup.add_generated_serial_version_id=false
sp_cleanup.add_missing_annotations=true
sp_cleanup.add_missing_deprecated_annotations=true
sp_cleanup.add_missing_methods=false
sp_cleanup.add_missing_nls_tags=false
sp_cleanup.add_missing_override_annotations=true
sp_cleanup.add_missing_override_annotations_interface_methods=true
sp_cleanup.add_serial_version_id=false
sp_cleanup.always_use_blocks=true
sp_cleanup.always_use_parentheses_in_expressions=false
sp_cleanup.always_use_this_for_non_static_field_access=true
sp_cleanup.always_use_this_for_non_static_method_access=true
sp_cleanup.convert_to_enhanced_for_loop=false
sp_cleanup.correct_indentation=false
sp_cleanup.format_source_code=false
sp_cleanup.format_source_code_changes_only=false
sp_cleanup.make_local_variable_final=false
sp_cleanup.make_parameters_final=true
sp_cleanup.make_private_fields_final=true
sp_cleanup.make_type_abstract_if_missing_method=false
sp_cleanup.make_variable_declarations_final=true
sp_cleanup.never_use_blocks=false
sp_cleanup.never_use_parentheses_in_expressions=true
sp_cleanup.on_save_use_additional_actions=true
sp_cleanup.organize_imports=true
sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
sp_cleanup.qualify_static_member_accesses_with_declaring_class=false
sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
sp_cleanup.remove_private_constructors=true
sp_cleanup.remove_trailing_whitespaces=true
sp_cleanup.remove_trailing_whitespaces_all=true
sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
sp_cleanup.remove_unnecessary_casts=true
sp_cleanup.remove_unnecessary_nls_tags=false
sp_cleanup.remove_unused_imports=true
sp_cleanup.remove_unused_local_variables=false
sp_cleanup.remove_unused_private_fields=true
sp_cleanup.remove_unused_private_members=false
sp_cleanup.remove_unused_private_methods=true
sp_cleanup.remove_unused_private_types=true
sp_cleanup.sort_members=false
sp_cleanup.sort_members_all=false
sp_cleanup.use_blocks=false
sp_cleanup.use_blocks_only_for_return_and_throw=false
sp_cleanup.use_parentheses_in_expressions=false
sp_cleanup.use_this_for_non_static_field_access=true
sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=false
sp_cleanup.use_this_for_non_static_method_access=true
sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=false
This diff is collapsed.
No preview for this file type
......@@ -17,7 +17,6 @@
package teetime.examples.countWords;
import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
......@@ -28,11 +27,13 @@ import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.Analysis;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IInputPort.PortState;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IPipeline;
import teetime.framework.core.ISink;
import teetime.framework.core.ISource;
import teetime.framework.core.IStage;
import teetime.framework.core.Pipeline;
import teetime.framework.sequential.MethodCallPipe;
import teetime.framework.sequential.QueuePipe;
import teetime.stage.basic.RepeaterSource;
......@@ -51,7 +52,7 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
private static final String START_DIRECTORY_NAME = ".";
private static final int SECONDS = 1000;
private static final int MAX_NUM_THREADS = 3;
private static final int MAX_NUM_THREADS = 2; // 1:2150, 2:1400, 3:~1400, 4:~1400
private WorkerThread[] ioThreads;
private WorkerThread[] nonIoThreads;
......@@ -82,6 +83,7 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
int numThreads = Runtime.getRuntime().availableProcessors();
numThreads = Math.min(MAX_NUM_THREADS, numThreads); // only for testing purposes
System.out.println("Using " + numThreads + " Threads.");
this.nonIoThreads = new WorkerThread[numThreads];
for (int i = 0; i < this.nonIoThreads.length; i++) {
......@@ -142,6 +144,9 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
distributor.setAccessesDeviceId(1);
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(repeaterSource);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(repeaterSource);
stages.add(directoryName2Files);
......@@ -152,34 +157,11 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
QueuePipe.connect(directoryName2Files.fileOutputPort, distributor.genericInputPort);
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
repeaterSource.START.setState(PortState.CLOSED);
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
}
}
};
final Pipeline pipeline = new Pipeline();
pipeline.setStartStages(startStages);
pipeline.setStages(stages);
return pipeline;
}
......@@ -191,6 +173,9 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
final Merger<Pair<File, Integer>> merger = new Merger<Pair<File, Integer>>();
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(distributor);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(distributor);
stages.add(countWordsStage0);
......@@ -207,33 +192,9 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
SingleProducerSingleConsumerPipe.connect(merger.outputPort, printingMerger.getNewInputPort());
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(distributor);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
}
}
};
final Pipeline pipeline = new Pipeline();
pipeline.setStartStages(startStages);
pipeline.setStages(stages);
return pipeline;
}
......@@ -245,6 +206,9 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
outputWordsCountStage.setAccessesDeviceId(2);
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(merger);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(merger);
stages.add(outputWordsCountStage);
......@@ -252,33 +216,9 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
// connect stages by pipes
QueuePipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort);
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(merger);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
}
}
};
final Pipeline pipeline = new Pipeline();
pipeline.setStartStages(startStages);
pipeline.setStages(stages);
return pipeline;
}
......@@ -327,8 +267,8 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
System.out.println(stage); // NOPMD (Just for example purposes)
}
// final long durationInNs = thread.getDurationInNs();
// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
// final long durationInNs = thread.getDurationInNs();
// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
}
for (final WorkerThread thread : this.nonIoThreads) {
......@@ -337,16 +277,16 @@ public class ConcurrentCountWordsAnalysis extends Analysis {
System.out.println(stage); // NOPMD (Just for example purposes)
}
// final long durationInNs = thread.getDurationInNs();
// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
// final long durationInNs = thread.getDurationInNs();
// System.out.println(thread + " takes " + TimeUnit.NANOSECONDS.toMillis(durationInNs) + " ms");
// if (durationInNs > maxDuration) {
// maxDuration = durationInNs;
// maxThread = thread;
// }
// if (durationInNs > maxDuration) {
// maxDuration = durationInNs;
// maxThread = thread;
// }
}
// System.out.println("maxThread: " + maxThread.toString() + " takes " + TimeUnit.NANOSECONDS.toMillis(maxDuration) + " ms"); // NOPMD (Just for example
// purposes)
// System.out.println("maxThread: " + maxThread.toString() + " takes " + TimeUnit.NANOSECONDS.toMillis(maxDuration) + " ms"); // NOPMD (Just for example
// purposes)
}
}
......@@ -17,14 +17,15 @@
package teetime.examples.countWords;
import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Analysis;
import teetime.framework.core.IInputPort.PortState;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.core.Pipeline;
import teetime.framework.sequential.MethodCallPipe;
import teetime.stage.basic.RepeaterSource;
import teetime.stage.basic.distributor.Distributor;
......@@ -79,6 +80,9 @@ public class CountWordsAnalysis extends Analysis {
final OutputWordsCountSink outputWordsCountStage = new OutputWordsCountSink();
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(repeaterSource);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(repeaterSource);
stages.add(findFilesStage);
......@@ -98,34 +102,11 @@ public class CountWordsAnalysis extends Analysis {
MethodCallPipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort);
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
repeaterSource.START.setState(PortState.CLOSED);
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends AbstractFilter<?>> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
}
}
};
final Pipeline pipeline = new Pipeline();
pipeline.setStartStages(startStages);
pipeline.setStages(stages);
return pipeline;
}
......
......@@ -17,7 +17,6 @@
package teetime.examples.countWords;
import java.io.File;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
......@@ -25,8 +24,10 @@ import teetime.framework.concurrent.StageTerminationPolicy;
import teetime.framework.concurrent.WorkerThread;
import teetime.framework.core.AbstractFilter;
import teetime.framework.core.Analysis;
import teetime.framework.core.IInputPort.PortState;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.core.Pipeline;
import teetime.framework.sequential.MethodCallPipe;
import teetime.framework.sequential.QueuePipe;
import teetime.stage.basic.RepeaterSource;
......@@ -81,6 +82,9 @@ public class QueuedCountWordsAnalysis extends Analysis {
final OutputWordsCountSink outputWordsCountStage = new OutputWordsCountSink();
// add each stage to a stage list
final List<IStage> startStages = new LinkedList<IStage>();
startStages.add(repeaterSource);
final List<IStage> stages = new LinkedList<IStage>();
stages.add(repeaterSource);
stages.add(findFilesStage);
......@@ -100,34 +104,11 @@ public class QueuedCountWordsAnalysis extends Analysis {
QueuePipe.connect(merger.outputPort, outputWordsCountStage.fileWordcountTupleInputPort);
repeaterSource.START.setAssociatedPipe(new MethodCallPipe<Boolean>(Boolean.TRUE));
repeaterSource.START.setState(PortState.CLOSED);
final IPipeline pipeline = new IPipeline() {
@Override
@SuppressWarnings("unchecked")
public List<? extends IStage> getStartStages() {
return Arrays.asList(repeaterSource);
}
@Override
public List<IStage> getStages() {
return stages;
}
@Override
public void fireStartNotification() throws Exception {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStarts();
}
}
@Override
public void fireStopNotification() {
for (final IStage stage : this.getStartStages()) {
stage.notifyPipelineStops();
}
}
};
final Pipeline pipeline = new Pipeline();
pipeline.setStartStages(startStages);
pipeline.setStages(stages);
return pipeline;
}
......
......@@ -15,21 +15,15 @@
***************************************************************************/
package teetime.framework.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IPipe;
import teetime.framework.core.IPipeCommand;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public class StageWorkArrayList implements IStageWorkList {
......@@ -41,7 +35,6 @@ public class StageWorkArrayList implements IStageWorkList {
public int numToBeExecuted;
}
private final IPipeline pipeline;
private final int accessesDeviceId;
/** sorted array where the last stage has highest priority */
......@@ -53,75 +46,25 @@ public class StageWorkArrayList implements IStageWorkList {
* @since 1.10
*/
public StageWorkArrayList(final IPipeline pipeline, final int accessesDeviceId) {
this.pipeline = pipeline;
this.accessesDeviceId = accessesDeviceId;
final List<IStage> localStages = this.init();
this.stages = new SchedulableStage[localStages.size()];
for (int i = 0; i < localStages.size(); i++) {
this.stages = new SchedulableStage[pipeline.getStages().size()];
for (IStage stage : pipeline.getStages()) {
final SchedulableStage schedulableStage = new SchedulableStage();
schedulableStage.stage = localStages.get(i);
schedulableStage.stage = stage;
schedulableStage.numToBeExecuted = 0;
this.stages[i] = schedulableStage;
}
}
private List<IStage> init() {
this.setDepthForEachStage();
final List<IStage> stageList = new ArrayList<IStage>(this.pipeline.getStages());
final Comparator<? super IStage> depthComparator = new Comparator<IStage>() {
public int compare(final IStage o1, final IStage o2) {
if (o1.getDepth() == o2.getDepth()) {
return 0;
} else if (o1.getDepth() < o2.getDepth()) {
return -1;
} else {
return 1;
}
}
};
Collections.sort(stageList, depthComparator);
for (int i = 0; i < stageList.size(); i++) {
stageList.get(i).setSchedulingIndex(i);
}
return stageList;
}
private void setDepthForEachStage() {
final IPipeCommand setDepthCommand = new IPipeCommand() {
public void execute(final IPipe<?> pipe) throws Exception {
final IStage sourceStage = pipe.getSourcePort().getOwningStage();
final IStage owningStage = pipe.getTargetPort().getOwningStage();
if (owningStage.getDepth() == IStage.DEPTH_NOT_SET) {
owningStage.setDepth(sourceStage.getDepth() + 1);
owningStage.notifyOutputPipes(this);
}
}
};
for (final IStage startStage : this.pipeline.getStartStages()) {
startStage.setDepth(0);
}
for (final IStage startStage : this.pipeline.getStartStages()) {
try {
startStage.notifyOutputPipes(setDepthCommand);
} catch (final Exception e) {
throw new IllegalStateException("may not happen", e);
}
this.stages[stage.getSchedulingIndex()] = schedulableStage;
}
}
@Override
public void pushAll(final Collection<? extends IStage> stages) {
for (final IStage stage : stages) {
this.push(stage);
}
}
@Override
public void pushAll(final IOutputPort<?, ?>[] outputPorts) {
for (final IOutputPort<?, ?> outputPort : outputPorts) {
if (outputPort != null) {
......@@ -148,12 +91,12 @@ public class StageWorkArrayList implements IStageWorkList {
return isValid;
}
@Override
public IStage pop() {
final SchedulableStage schedulableStage = this.stages[this.lastIndex];
// schedulableStage.numToBeExecuted--;
schedulableStage.numToBeExecuted = 0;
cond:
if (schedulableStage.numToBeExecuted == 0)
cond: if (schedulableStage.numToBeExecuted == 0)
{
for (int i = this.lastIndex - 1; i >= this.firstIndex; i--) {
if (this.stages[i].numToBeExecuted > 0) {
......@@ -167,11 +110,13 @@ public class StageWorkArrayList implements IStageWorkList {
return schedulableStage.stage;
}
@Override
public IStage read() {
final SchedulableStage schedulableStage = this.stages[this.lastIndex];
return schedulableStage.stage;
}
@Override
public boolean isEmpty() {
return this.lastIndex == -1;
}
......
......@@ -16,11 +16,20 @@
package teetime.framework.concurrent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IInputPort.PortState;
import teetime.framework.core.IPipe;
import teetime.framework.core.IPipeCommand;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
import teetime.framework.scheduling.NextStageScheduler;
import teetime.framework.scheduling.StageStateManager;
import teetime.util.StopWatch;
/**
......@@ -30,8 +39,11 @@ import teetime.util.StopWatch;
*/
public class WorkerThread extends Thread {
private static final int NUM_ITERATIONS_TO_MEASURE = 10000;
private final IPipeline pipeline;
private IStageScheduler stageScheduler;
private StageStateManager stageStateManager;
private volatile StageTerminationPolicy terminationPolicy;
private volatile boolean shouldTerminate = false;
......@@ -40,15 +52,72 @@ public class WorkerThread extends Thread {
// statistics
private final StopWatch stopWatch = new StopWatch();
private final List<Long> durationPer10000IterationsInNs = new LinkedList<Long>();
private final List<Long> durationPerXIterationsInNs = new LinkedList<Long>();
private int iterations;
public WorkerThread(final IPipeline pipeline, final int accessesDeviceId) {
this.pipeline = pipeline;
this.accessesDeviceId = accessesDeviceId;
}
private void initStages(final IPipeline pipeline) {
for (final IStage stage : pipeline.getStages()) {
stage.setOwningThread(this);
}
this.accessesDeviceId = accessesDeviceId;
this.setDepthForEachStage(pipeline);
this.setSchedulingIndexForEachhStage(pipeline);
}
private void setDepthForEachStage(final IPipeline pipeline) {
final IPipeCommand setDepthCommand = new IPipeCommand() {
@Override
public void execute(final IPipe<?> pipe) throws Exception {
final IStage sourceStage = pipe.getSourcePort().getOwningStage();
final IStage owningStage = pipe.getTargetPort().getOwningStage();
if (owningStage.getDepth() == IStage.DEPTH_NOT_SET) {
owningStage.setDepth(sourceStage.getDepth() + 1);
owningStage.notifyOutputPipes(this);
}
}
};
for (final IStage startStage : pipeline.getStartStages()) {
startStage.setDepth(0);
}
for (final IStage startStage : pipeline.getStartStages()) {
try {
startStage.notifyOutputPipes(setDepthCommand);
} catch (final Exception e) {
throw new IllegalStateException("may not happen", e);
}
}
}
private List<IStage> setSchedulingIndexForEachhStage(final IPipeline pipeline) {
final List<IStage> stageList = new ArrayList<IStage>(pipeline.getStages());
final Comparator<? super IStage> depthComparator = new Comparator<IStage>() {
@Override
public int compare(final IStage o1, final IStage o2) {
if (o1.getDepth() == o2.getDepth()) {
return 0;
} else if (o1.getDepth() < o2.getDepth()) {
return -1;
} else {
return 1;
}
}
};
Collections.sort(stageList, depthComparator);
for (int i = 0; i < stageList.size(); i++) {
stageList.get(i).setSchedulingIndex(i);
}
return stageList;
}
@Override
......@@ -94,38 +163,37 @@ public class WorkerThread extends Thread {
// stageExecutionStopWatch.getDurationInNs(); //4952
// 6268 -> 5350 (w/o after) -> 4450 (w/o before) -> 3800 (w/o stage)
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs();
// final long schedulingOverhead = this.iterationStopWatch.getDurationInNs();
// final long schedulingOverhead = beforeStageExecutionStopWatch.getDurationInNs(); //327
// final long schedulingOverhead = stageExecutionStopWatch.getDurationInNs(); //1416
// final long schedulingOverhead = afterStageExecutionStopWatch.getDurationInNs(); //2450
// rest: ~2000 (measurement overhead?)
if ((iterations % 10000) == 0) {
if ((this.iterations % NUM_ITERATIONS_TO_MEASURE) == 0) {
this.stopWatch.end();
this.durationPer10000IterationsInNs.add(stopWatch.getDurationInNs());
this.durationPerXIterationsInNs.add(this.stopWatch.getDurationInNs());
this.stopWatch.start();
}
}
this.stopWatch.end();
this.durationPer10000IterationsInNs.add(stopWatch.getDurationInNs());
this.durationPerXIterationsInNs.add(this.stopWatch.getDurationInNs());
this.cleanUpDatastructures();
}
private void executeTerminationPolicy(final IStage executedStage, final boolean executedSuccessfully) {
// System.out.println("WorkerThread.executeTerminationPolicy(): " + this.terminationPolicy +
// ", executedSuccessfully=" + executedSuccessfully
// + ", mayBeDisabled=" + executedStage.mayBeDisabled());
// System.out.println("executeTerminationPolicy executedStage=" + executedStage + ", executedSuccessfully=" + executedSuccessfully);
// System.out.println("executeTerminationPolicy areAllInputPortsClosed(executedStage)=" + this.stageStateManager.areAllInputPortsClosed(executedStage));
switch (this.terminationPolicy) {
case TERMINATE_STAGE_AFTER_NEXT_EXECUTION:
if (executedStage.mayBeDisabled()) {
if (this.stageStateManager.areAllInputPortsClosed(executedStage)) {
this.stageScheduler.disable(executedStage);
}
break;
case TERMINATE_STAGE_AFTER_UNSUCCESSFUL_EXECUTION:
if (!executedSuccessfully) {
if (executedStage.mayBeDisabled()) {
if (this.stageStateManager.areAllInputPortsClosed(executedStage)) {
this.stageScheduler.disable(executedStage);
}
}
......@@ -141,8 +209,20 @@ public class WorkerThread extends Thread {
}
private void initDatastructures() throws Exception {
// stages need to be initialized here, because in a concurrent context some stages (e.g., a merger) is executed after its pipeline has been created.
this.initStages(this.pipeline);
this.stageStateManager = new StageStateManager(this.pipeline);
this.stageScheduler = new NextStageScheduler(this.pipeline, this.accessesDeviceId, this.stageStateManager);
for (final IStage startStage : this.pipeline.getStartStages()) {
for (IInputPort<IStage, ?> inputPort : startStage.getInputPorts()) {
if (inputPort.getState() == PortState.CLOSED) {
inputPort.close();
}
}
}
this.pipeline.fireStartNotification();
this.stageScheduler = new NextStageScheduler(this.pipeline, this.accessesDeviceId);
}
private void startStageExecution(final IStage stage) {
......@@ -150,18 +230,13 @@ public class WorkerThread extends Thread {
}
private void finishStageExecution(final IStage stage, final boolean executedSuccessfully) {
// System.out.println("Executed stage " + stage + " successfully: " + executedSuccessfully);
if (!executedSuccessfully) { // statistics
this.executedUnsuccessfullyCount++;
}
}
private void cleanUpDatastructures() {
// System.out.println("Cleaning up datastructures...");
// System.out.println("Firing stop notification...");
this.pipeline.fireStopNotification();
// System.out.println("Thread terminated:" + this);
// System.out.println(this.getName() + ": executedUnsuccessfullyCount=" + this.executedUnsuccessfullyCount);
}
public IPipeline getPipeline() {
......@@ -171,9 +246,11 @@ public class WorkerThread extends Thread {
// BETTER remove this method since it is not intuitive; add a check to onStartPipeline so that a stage automatically
// disables itself if it has no input ports
public void terminate(final StageTerminationPolicy terminationPolicyToUse) {
for (final IStage startStage : this.pipeline.getStartStages()) {
startStage.fireSignalClosingToAllInputPorts();
}
// for (final IStage startStage : this.pipeline.getStartStages()) {
// if (this.stageStateManager.areAllInputPortsClosed(startStage)) {
// startStage.fireSignalClosingToAllInputPorts();
// }
// }
this.setTerminationPolicy(terminationPolicyToUse);
}
......@@ -193,14 +270,14 @@ public class WorkerThread extends Thread {
}
public List<Long> getDurationPer10000IterationsInNs() {
return durationPer10000IterationsInNs;
return this.durationPerXIterationsInNs;
}
/**
* @since 1.10
*/
public int getIterations() {
return iterations;
return this.iterations;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.framework.concurrent.steal;
import java.util.Collection;
import teetime.framework.concurrent.ConcurrentWorkStealingPipe;
import teetime.framework.core.IInputPort;
import teetime.framework.core.IStage;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class StealIfMayBeDisabledStrategy<T> implements IStealStrategy<T> {
public <S extends IStage> T steal(final IInputPort<S, T> inputPort, final Collection<ConcurrentWorkStealingPipe<T>> pipesToStealFrom) {
if (inputPort.getOwningStage().mayBeDisabled()) {
for (final ConcurrentWorkStealingPipe<T> pipe : pipesToStealFrom) {
final T stolenElement = pipe.steal();
if (stolenElement != null) {
return stolenElement;
}
}
}
// BETTER improve stealing efficiency by stealing multiple elements at once
return null; // do not expose internal impl details (here: CircularWorkStealingDeque); instead return null
}
}
......@@ -34,9 +34,7 @@ import teetime.util.concurrent.workstealing.exception.DequePopException;
* the extending stage
*
*/
public abstract class AbstractFilter<S extends IStage> extends AbstractStage implements ISink<S>, ISource, IPortListener<S> {
protected volatile boolean mayBeDisabled; // BETTER write only non-concurrent code in a stage
public abstract class AbstractFilter<S extends IStage> extends AbstractStage implements ISink<S>, ISource {
/**
* @author Christian Wulf
......@@ -78,7 +76,6 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
}
};
private int enabledInputPorts = 0;
/**
* 0=in-memory, x>0=disk0, disk1, display0, display1, socket0, socket1 etc.
*/
......@@ -172,48 +169,6 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
// default empty implementation
}
/**
* @since 1.10
*/
@Override
public void onPortIsClosed(final IInputPort<S, ?> inputPort) {
// inputPort.setState(IInputPort.State.CLOSING);
this.enabledInputPorts--; // FIXME not thread-safe
// this.logger.info("Closed " + "(" + this.enabledInputPorts + " remaining) " + inputPort + " of " + this);
if (this.enabledInputPorts < 0) {
this.logger.error("Closed port more than once: portIndex=" + inputPort.getIndex() + " for stage " + this);
}
this.checkWhetherThisStageMayBeDisabled();
}
/**
* @since 1.10
*/
private void checkWhetherThisStageMayBeDisabled() {
if (this.enabledInputPorts == 0) {
this.mayBeDisabled = true;
// this.logger.info(this.toString() + " can now be disabled by the pipeline scheduler.");
}
}
/**
* @since 1.10
*/
@Override
public void fireSignalClosingToAllInputPorts() {
// this.logger.info("Fire closing signal to all input ports of: " + this);
if (!this.inputPorts.isEmpty()) {
for (final IInputPort<S, ?> port : this.inputPorts) {
port.close();
}
} else {
this.checkWhetherThisStageMayBeDisabled();
}
}
/**
* @since 1.10
*/
......@@ -222,18 +177,10 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
try {
this.notifyOutputPipes(this.closeCommand);
} catch (final Exception e) {
throw new IllegalStateException("may not happen");
throw new IllegalStateException("may not happen", e);
}
}
/**
* @since 1.10
*/
@Override
public boolean mayBeDisabled() {
return this.mayBeDisabled;
}
@Override
public String toString() {
final String s = super.toString();
......@@ -250,8 +197,6 @@ public abstract class AbstractFilter<S extends IStage> extends AbstractStage imp
final InputPortImpl<S, T> inputPort = new InputPortImpl<S, T>((S) this);
inputPort.setIndex(this.inputPorts.size());
this.inputPorts.add(inputPort);
inputPort.setPortListener(this);
this.enabledInputPorts++;
return inputPort;
}
......
......@@ -18,9 +18,9 @@ package teetime.framework.core;
/**
* @author Christian Wulf
*
*
* @since 1.10
*
*
* @param <T>
* The type of the pipe
* @param <P>
......@@ -30,7 +30,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public enum PipeState {
......@@ -42,19 +42,23 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
private IOutputPort<?, ? extends T> sourcePort;
private IInputPort<?, T> targetPort;
@Override
public IOutputPort<?, ? extends T> getSourcePort() {
return this.sourcePort;
}
@Override
public IInputPort<?, T> getTargetPort() {
return this.targetPort;
}
@Override
public <S extends ISource, A extends T> void setSourcePort(final IOutputPort<S, A> sourcePort) {
sourcePort.setAssociatedPipe(this);
this.sourcePort = sourcePort;
}
@Override
public <S extends ISink<S>, A extends T> void setTargetPort(final IInputPort<S, T> targetPort) {
targetPort.setAssociatedPipe(this);
this.targetPort = targetPort;
......@@ -63,6 +67,7 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
// BETTER remove if it does not add any new functionality
protected abstract void putInternal(T token);
@Override
public void put(final T token) {
this.putInternal(token);
}
......@@ -70,10 +75,12 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
// BETTER remove if it does not add any new functionality
protected abstract T tryTakeInternal();
@Override
public final T tryTake() {
return this.tryTakeInternal();
}
@Override
public final void notifyPipelineStarts() throws Exception {
if (this.state == PipeState.UNINITIALIZED) {
this.state = PipeState.PIPELINE_STARTED;
......@@ -84,13 +91,14 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
/**
* This method is called exactly once iff the pipeline is started.
*
*
* @since 1.10
*/
public void onPipelineStarts() {
// empty default implementation
}
@Override
public final void notifyPipelineStops() {
if (this.state != PipeState.PIPELINE_STOPPED) {
this.state = PipeState.PIPELINE_STOPPED;
......@@ -101,13 +109,14 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
/**
* This method is called exactly once iff the pipeline is stopped.
*
*
* @since 1.10
*/
public void onPipelineStops() {
// empty default implementation
}
@Override
public void close() {
this.targetPort.close();
}
......
package teetime.framework.core;
/**
*
*
* @author Christian Wulf
*
*
* @param <S>
* the stage, this port belongs to<br>
* <i>(used for ensuring type safety)</i>
......@@ -14,24 +14,24 @@ public interface IInputPort<S extends IStage, T> extends IPort<S, T> {
/**
* @since 1.10
*/
enum State {
OPEN, CLOSING
public enum PortState {
OPENED, CLOSED
}
/**
* @since 1.10
*/
public abstract State getState();
public abstract PortState getState();
/**
* @since 1.10
*/
public abstract void setState(final State state);
public abstract void setState(final PortState state);
/**
* @since 1.10
*/
public abstract void setPortListener(final IPortListener<S> stageListener);
public abstract void setPortListener(final IPortListener portListener);
/**
* @since 1.10
......
......@@ -17,10 +17,10 @@ package teetime.framework.core;
/**
* @author Christian Wulf
*
*
* @since 1.10
*/
public interface IPortListener<S extends IStage> {
public interface IPortListener {
void onPortIsClosed(IInputPort<S, ?> inputPort);
void onPortIsClosed(IInputPort<?, ?> inputPort);
}
......@@ -69,24 +69,11 @@ public interface IStage extends IBaseStage {
// an overloaded version with a task bundle of one single input port is useless.
// void execute(TaskBundle taskBundle);
/**
*
* @return <code>true</code> if the stage may be disabled by the pipeline scheduler, <code>false</code> otherwise.
*
* @since 1.10
*/
boolean mayBeDisabled();
/**
* @since 1.10
*/
void fireSignalClosingToAllOutputPorts();
/**
* @since 1.10
*/
void fireSignalClosingToAllInputPorts();
/**
* @since 1.10
*/
......
......@@ -18,28 +18,35 @@ package teetime.framework.core;
class InputPortImpl<S extends IStage, T> extends AbstractPort<S, T> implements IInputPort<S, T> {
private volatile State state = State.OPEN;
private volatile PortState state = PortState.OPENED;
private IPortListener<S> stageListener;
private IPortListener portListener;
public InputPortImpl(final S owningStage) {
this.setOwningStage(owningStage);
}
public void setState(final State state) {
@Override
public void setState(final PortState state) {
this.state = state;
}
public State getState() {
@Override
public PortState getState() {
return this.state;
}
public void setPortListener(final IPortListener<S> stageListener) {
this.stageListener = stageListener;
@Override
public void setPortListener(final IPortListener portListener) {
this.portListener = portListener;
}
@Override
public void close() {
this.stageListener.onPortIsClosed(this);
if (this.portListener == null) {
throw new NullPointerException("stage: "+this.getOwningStage().getClass().getName()+", port="+this.getIndex());
}
this.portListener.onPortIsClosed(this);
}
}
......@@ -14,13 +14,14 @@
* limitations under the License.
***************************************************************************/
package teetime.framework.concurrent;
package teetime.framework.scheduling;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedHashSet;
import java.util.Set;
import teetime.framework.concurrent.IStageScheduler;
import teetime.framework.concurrent.IStageWorkList;
import teetime.framework.concurrent.StageWorkArrayList;
import teetime.framework.core.IOutputPort;
import teetime.framework.core.IPipeline;
import teetime.framework.core.IStage;
......@@ -30,13 +31,14 @@ import teetime.framework.core.IStage;
*
* @since 1.10
*/
public class NextStageScheduler implements IStageScheduler {
public final class NextStageScheduler implements IStageScheduler {
protected final Map<IStage, Boolean> statesOfStages = new HashMap<IStage, Boolean>();
private final Collection<IStage> highestPrioritizedEnabledStages = new ArrayList<IStage>();
private final Set<IStage> highestPrioritizedEnabledStages = new LinkedHashSet<IStage>();
private final IStageWorkList workList;
private final StageStateManager stageStateManager;
public NextStageScheduler(final IPipeline pipeline, final int accessesDeviceId) throws Exception {
public NextStageScheduler(final IPipeline pipeline, final int accessesDeviceId, final StageStateManager stageStateManager) {
this.stageStateManager = stageStateManager;
// this.workList = new StageWorkList(accessesDeviceId, pipeline.getStages().size());
this.workList = new StageWorkArrayList(pipeline, accessesDeviceId); // faster implementation
......@@ -45,10 +47,6 @@ public class NextStageScheduler implements IStageScheduler {
this.workList.pushAll(this.highestPrioritizedEnabledStages);
// System.out.println("Initial work list: " + this.workList);
// this.workList.addAll(pipeline.getStages());
for (final IStage stage : pipeline.getStages()) {
this.enable(stage);
}
}
@Override
......@@ -58,28 +56,23 @@ public class NextStageScheduler implements IStageScheduler {
@Override
public boolean isAnyStageActive() {
// System.out.println("workList: " + this.workList);
return !this.workList.isEmpty();
}
protected void enable(final IStage stage) {
// // / TODO consider to move state (enabled/disabled) of stage to stage for performance reasons
this.statesOfStages.put(stage, Boolean.TRUE);
}
@Override
public void disable(final IStage stage) {
this.statesOfStages.put(stage, Boolean.FALSE);
this.stageStateManager.disable(stage);
if (this.highestPrioritizedEnabledStages.contains(stage)) {
this.highestPrioritizedEnabledStages.remove(stage);
for (final IStage outputStage : stage.getAllOutputStages()) {
if (this.statesOfStages.get(outputStage) == Boolean.TRUE) {
if (this.stageStateManager.isStageEnabled(outputStage)) {
this.highestPrioritizedEnabledStages.add(outputStage);
}
}
}
// System.out.println("highestPrioritizedEnabledStages: "+this.highestPrioritizedEnabledStages);
stage.fireSignalClosingToAllOutputPorts();
}
......
package teetime.framework.scheduling;
import teetime.framework.core.IStage;
public abstract class StageStateContainer {
public static enum StageState {
ENABLED, ALL_INPUT_PORTS_CLOSED, DISABLED
}
protected final IStage stage;
protected volatile StageState stageState;
public StageStateContainer(final IStage stage) {
this.stage = stage;
this.stageState = stage.getInputPorts().size() > 0 ? StageState.ENABLED : StageState.ALL_INPUT_PORTS_CLOSED;
// System.out.println("stage=" + stage + ", stageState=" + this.stageState);
}
/**
*
* @return the new value
*/
public abstract int decNumOpenedPorts();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment