Skip to content
Snippets Groups Projects
Commit 37ceeadb authored by Nelson Tavares de Sousa's avatar Nelson Tavares de Sousa
Browse files

pmg thingies and removed UseConcurrentHashMap rule

parent b56e33f9
No related branches found
No related tags found
No related merge requests found
Showing
with 66 additions and 73 deletions
......@@ -54,6 +54,7 @@
<exclude name="AvoidUsingVolatile" />
<exclude name="CallSuperInConstructor" />
<exclude name="DefaultPackage" />
<exclude name="UseConcurrentHashMap" />
</rule>
<!-- UR means "undefined reference" which is already detected by the compiler.
......
......@@ -35,7 +35,7 @@ public final class Cache<T> extends AbstractConsumerStage<T> {
}
@Override
public void onTerminating() throws Exception {
public void onTerminating() throws Exception { // NOPMD
this.logger.debug("Emitting " + this.cachedObjects.size() + " cached elements...");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
......
......@@ -40,7 +40,7 @@ import teetime.framework.TerminationStrategy;
*/
public final class Clock extends AbstractProducerStage<Long> {
private boolean initialDelayExceeded = false;
private boolean initialDelayExceeded;// = false;
/**
* Waiting time span until first sent element.
......@@ -58,11 +58,11 @@ public final class Clock extends AbstractProducerStage<Long> {
@Override
protected void execute() {
if (!this.initialDelayExceeded) {
if (this.initialDelayExceeded) {
this.sleep(this.intervalDelayInMs);
} else {
this.initialDelayExceeded = true;
this.sleep(this.initialDelayInMs);
} else {
this.sleep(this.intervalDelayInMs);
}
// this.logger.debug("Emitting timestamp");
......
......@@ -44,14 +44,14 @@ public final class ElementDelayMeasuringStage<T> extends AbstractConsumerStage<T
}
@Override
public void onStarting() throws Exception {
public void onStarting() throws Exception { // NOPMD
super.onStarting();
this.resetTimestamp(System.nanoTime());
}
private void computeElementDelay(final Long timestampInNs) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
if (this.numPassedElements > 0) {
long diffInNs = timestampInNs - this.lastTimestampInNs;
long delayInNsPerElement = diffInNs / this.numPassedElements;
this.delays.add(delayInNsPerElement);
this.logger.info("Delay: " + delayInNsPerElement + " time units/element");
......
......@@ -45,7 +45,7 @@ public final class ElementThroughputMeasuringStage<T> extends AbstractConsumerSt
}
@Override
public void onStarting() throws Exception {
public void onStarting() throws Exception { // NOPMD
super.onStarting();
this.resetTimestamp(System.nanoTime());
}
......
......@@ -41,7 +41,7 @@ public final class MappingCounter<T> extends AbstractConsumerStage<T> {
}
@Override
public void onTerminating() throws Exception {
public void onTerminating() throws Exception { // NOPMD forced by super method
port.send(counter);
super.onTerminating();
}
......
......@@ -40,7 +40,7 @@ public final class MultipleInstanceOfFilter<I> extends AbstractConsumerStage<I>
@Override
@SuppressWarnings("unchecked")
public void onStarting() throws Exception {
public void onStarting() throws Exception { // NOPMD exception forced by super method
super.onStarting();
// We cache the map to avoid the creating of iterators during runtime
cachedOutputPortsMap = (Entry<Class<? extends I>, OutputPort<? super I>>[]) outputPortsMap.entrySet().toArray(new Entry<?, ?>[outputPortsMap.size()]);
......
......@@ -45,13 +45,11 @@ public final class ZipByteArray extends AbstractConsumerStage<byte[]> {
@Override
protected void execute(final byte[] element) {
byte[] streamBytes;
try {
streamBytes = (mode == ZipMode.COMP) ? compress(element) : decompress(element);
outputPort.send((mode == ZipMode.COMP) ? compress(element) : decompress(element));
} catch (IOException e) {
throw new IllegalStateException(e);
}
outputPort.send(streamBytes);
}
private byte[] compress(final byte[] data) throws IOException {
......@@ -61,7 +59,7 @@ public final class ZipByteArray extends AbstractConsumerStage<byte[]> {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length);
deflater.finish();
byte[] compressedBytes = new byte[1024];
byte[] compressedBytes = new byte[1024]; // NOPMD
while (!deflater.finished()) {
int count = deflater.deflate(compressedBytes); // returns the generated code... index
outputStream.write(compressedBytes, 0, count);
......@@ -79,7 +77,7 @@ public final class ZipByteArray extends AbstractConsumerStage<byte[]> {
inflater.setInput(data);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(data.length);
byte[] uncompressedBytes = new byte[1024];
byte[] uncompressedBytes = new byte[1024]; // NOPMD
while (!inflater.finished()) {
int count;
try {
......
......@@ -53,15 +53,15 @@ public final class Delay<T> extends AbstractStage {
}
@Override
public void onTerminating() throws Exception {
while (null == timestampTriggerInputPort.receive()) {
public void onTerminating() throws Exception { // NOPMD
while (null == timestampTriggerInputPort.receive()) { // NOPMD flushes input
// wait for the next trigger
}
sendAllBufferedEllements();
T element;
while (null != (element = inputPort.receive())) {
while (null != (element = inputPort.receive())) { // NOPMD
outputPort.send(element);
}
......
......@@ -17,7 +17,7 @@ package teetime.stage.basic;
import teetime.framework.AbstractConsumerStage;
public final class Sink<T> extends AbstractConsumerStage<T> {
public final class Sink<T> extends AbstractConsumerStage<T> { // NOPMD Sink suits perfectly as a name for this stage
// PERFORMANCE let the sink remove all available input at once by using a new method receiveAll() that clears the pipe's buffer
......
......@@ -19,20 +19,18 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import teetime.framework.AbstractConsumerStage;
import com.google.common.io.Files;
import teetime.framework.AbstractConsumerStage;
public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> {
private final File file;
private FileOutputStream fo;
private FileOutputStream fileOutput;
public ByteArrayFileWriter(final File file) {
this.file = file;
try {
Files.touch(file);
fo = new FileOutputStream(this.file);
fileOutput = new FileOutputStream(file);
} catch (IOException e) {
throw new IllegalStateException(e);
}
......@@ -41,8 +39,8 @@ public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> {
@Override
protected void execute(final byte[] element) {
try {
fo.write(element);
} catch (Exception e) {
fileOutput.write(element);
} catch (IOException e) {
throw new IllegalStateException(e);
}
}
......@@ -50,7 +48,7 @@ public final class ByteArrayFileWriter extends AbstractConsumerStage<byte[]> {
@Override
public void onTerminating() {
try {
fo.close();
fileOutput.close();
} catch (IOException e) {
throw new IllegalStateException(e);
}
......
......@@ -56,13 +56,14 @@ public final class File2Lines extends AbstractConsumerStage<File> {
this.charset = charset;
}
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
@Override
protected void execute(final File textFile) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(textFile), this.charset));
String line;
while ((line = reader.readLine()) != null) {
while ((line = reader.readLine()) != null) { // NOPMD
line = line.trim();
if (line.length() != 0) {
outputPort.send(line);
......
......@@ -59,17 +59,15 @@ public final class File2SeqOfWords extends AbstractConsumerStage<File> {
@Override
protected void execute(final File textFile) {
BufferedReader reader = null;
BufferedReader reader = null; // NOPMD
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(textFile), this.charset));
CharBuffer charBuffer = CharBuffer.allocate(bufferCapacity);
while (reader.read(charBuffer) != -1) {
final int position = getPreviousWhitespacePosition(charBuffer);
if (-1 == position) {
if (logger.isErrorEnabled()) {
logger.error("A word in the following text file is bigger than the buffer's capacity: " + textFile.getAbsolutePath());
return;
}
if (-1 == position && logger.isErrorEnabled()) {
logger.error("A word in the following text file is bigger than the buffer's capacity: " + textFile.getAbsolutePath());
return;
}
final int limit = charBuffer.limit();
......@@ -97,16 +95,16 @@ public final class File2SeqOfWords extends AbstractConsumerStage<File> {
}
private int getPreviousWhitespacePosition(final CharBuffer charBuffer) {
char[] characters = charBuffer.array();
char[] characters = charBuffer.array(); // NOPMD Array issue
int index = charBuffer.arrayOffset() + charBuffer.position() - 1;
while (index >= 0) {
switch (characters[index]) {
switch (characters[index]) { // NOPMD break not needed
case ' ':
case '\n':
case '\r':
case '\t':
return index - charBuffer.arrayOffset();
return index - charBuffer.arrayOffset(); // NOPMD
default:
index--;
}
......
......@@ -24,7 +24,7 @@ import java.io.InputStreamReader;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
import teetime.stage.util.TextLine;
import teetime.stage.util.TextLineContainer;
/**
* @author Christian Wulf
......@@ -34,7 +34,7 @@ import teetime.stage.util.TextLine;
*/
public final class File2TextLinesFilter extends AbstractConsumerStage<File> {
private final OutputPort<TextLine> outputPort = this.createOutputPort();
private final OutputPort<TextLineContainer> outputPort = this.createOutputPort();
private final String charset;
......@@ -70,7 +70,7 @@ public final class File2TextLinesFilter extends AbstractConsumerStage<File> {
while ((line = reader.readLine()) != null) {
line = line.trim();
if (line.length() != 0) {
outputPort.send(new TextLine(textFile, line));
outputPort.send(new TextLineContainer(textFile, line));
} // else: ignore empty line
}
} catch (final FileNotFoundException e) {
......@@ -92,7 +92,7 @@ public final class File2TextLinesFilter extends AbstractConsumerStage<File> {
return this.charset;
}
public OutputPort<TextLine> getOutputPort() {
public OutputPort<TextLineContainer> getOutputPort() {
return outputPort;
}
......
......@@ -37,7 +37,7 @@ public class TaskFarmConfiguration<I, O, T extends ITaskFarmDuplicable<I, O>> {
public static final int INIT_SAMPLES_UNTIL_REMOVE = -1;
/** should the monitoring services be activated (does not affect the adaptation thread!)? **/
private volatile boolean monitoringEnabled = false;
private volatile boolean monitoringEnabled;// = false;
/** the waiting time between each iteration of the adaptation thread **/
private volatile int adaptationWaitingTimeMillis = 50;
......@@ -57,7 +57,7 @@ public class TaskFarmConfiguration<I, O, T extends ITaskFarmDuplicable<I, O>> {
**/
private volatile int maxSamplesUntilRemove = 5;
/** throughput boundary of this task farm **/
private volatile double throughputScoreBoundary = 0.2d;
private volatile double throughputScoreBoundary = 0.2d; // NOPMD error in PMD
/** pipe capacity of all pipes inside the task farm **/
private volatile int pipeCapacity = 100;
......@@ -65,7 +65,9 @@ public class TaskFarmConfiguration<I, O, T extends ITaskFarmDuplicable<I, O>> {
/** the maximum number of worker stages the task farm may have **/
private volatile int maxNumberOfCores = Runtime.getRuntime().availableProcessors() - 2;
TaskFarmConfiguration() {}
TaskFarmConfiguration() {
// non-instantiable from outside
}
/**
*
......
......@@ -94,13 +94,13 @@ public final class TaskFarmStage<I, O, T extends ITaskFarmDuplicable<I, O>> exte
if (merger == null) {
this.merger = new DynamicMerger<O>() {
@Override
public void onStarting() throws Exception {
public void onStarting() throws Exception { // NOPMD
adaptationThread.start();
super.onStarting();
}
@Override
public void onTerminating() throws Exception {
public void onTerminating() throws Exception { // NOPMD
adaptationThread.stopAdaptationThread();
super.onTerminating();
}
......
......@@ -18,14 +18,14 @@ package teetime.stage.taskfarm.adaptation.analysis;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import com.google.common.base.Throwables;
import teetime.stage.taskfarm.ITaskFarmDuplicable;
import teetime.stage.taskfarm.TaskFarmConfiguration;
import teetime.stage.taskfarm.adaptation.history.TaskFarmHistoryService;
import teetime.stage.taskfarm.adaptation.history.ThroughputHistory;
import teetime.stage.taskfarm.exception.TaskFarmAnalysisException;
import com.google.common.base.Throwables;
/**
* Represents an interface to call a throughput algorithm
* by using the throughput algorithm class name. Also provides
......@@ -69,9 +69,7 @@ public class TaskFarmAnalysisService<I, O, T extends ITaskFarmDuplicable<I, O>>
* specified throughput history
*/
public void analyze(final ThroughputHistory history) {
AbstractThroughputAlgorithm algorithm = null;
algorithm = createAlgorithm(this.configuration.getThroughputAlgorithm());
AbstractThroughputAlgorithm algorithm = createAlgorithm(this.configuration.getThroughputAlgorithm());
this.throughputScore = algorithm.getTroughputAnalysis(history);
}
......@@ -86,7 +84,7 @@ public class TaskFarmAnalysisService<I, O, T extends ITaskFarmDuplicable<I, O>>
private AbstractThroughputAlgorithm createAlgorithm(final String algorithmClassName) {
String fullyQualifiedPath = THROUGHPUT_ALGORITHM_PATH + "." + algorithmClassName;
AbstractThroughputAlgorithm algorithm = null;
AbstractThroughputAlgorithm algorithm;
try {
// get throughput algorithm class by using reflection
......@@ -97,37 +95,37 @@ public class TaskFarmAnalysisService<I, O, T extends ITaskFarmDuplicable<I, O>>
Constructor<?> algorithmConstructor = algorithmClass.getConstructor(constructorParameterClasses);
algorithm = (AbstractThroughputAlgorithm) algorithmConstructor.newInstance(constructorParameterObjects);
algorithm = (AbstractThroughputAlgorithm) algorithmConstructor.newInstance(constructorParameterObjects); // NOPMD: returns in outer block
} catch (ClassNotFoundException e) {
throw new TaskFarmAnalysisException("The ThroughputAlgorithm \""
+ fullyQualifiedPath
+ "\" could not be found.");
+ "\" could not be found.", e);
} catch (InstantiationException e) {
throw new TaskFarmAnalysisException("The ThroughputAlgorithm \""
+ fullyQualifiedPath
+ "\" is declared as abstract and cannot be instantiated");
+ "\" is declared as abstract and cannot be instantiated", e);
} catch (IllegalAccessException e) {
throw new TaskFarmAnalysisException("The constructor of \""
+ fullyQualifiedPath
+ "\" could not be accessed.");
+ "\" could not be accessed.", e);
} catch (IllegalArgumentException e) {
// should not happen at all
throw new TaskFarmAnalysisException("The constructor of \""
+ fullyQualifiedPath
+ "\" has not been called with the correct amount of arguments.");
+ "\" has not been called with the correct amount of arguments.", e);
} catch (InvocationTargetException e) {
throw new TaskFarmAnalysisException("The constructor of \""
+ fullyQualifiedPath
+ "\" has thrown an exception:\n"
+ Throwables.getStackTraceAsString(e));
+ Throwables.getStackTraceAsString(e), e);
} catch (NoSuchMethodException e) {
throw new TaskFarmAnalysisException("The ThroughputAlgorithm \""
+ fullyQualifiedPath
+ "\" does not have any constructor with exactly one TaskFarmConfiguration as its parameter.");
+ "\" does not have any constructor with exactly one TaskFarmConfiguration as its parameter.", e);
} catch (SecurityException e) {
throw new TaskFarmAnalysisException("A Security Manager is present and \""
+ fullyQualifiedPath
+ "\"does not have the correct class loader.");
+ "\"does not have the correct class loader.", e);
}
return algorithm;
......
......@@ -75,7 +75,7 @@ public class TaskFarmHistoryService<I, O, T extends ITaskFarmDuplicable<I, O>> {
private double getSumOfPipePushThroughputs() {
this.lastPullThroughputs = new HashMap<IMonitorablePipe, Long>();
this.lastPushThroughputs = new HashMap<IMonitorablePipe, Long>();
double sum = 0;
double sum = 0; // NOPMD
try {
for (ITaskFarmDuplicable<I, O> enclosedStage : this.taskFarmStage.getEnclosedStageInstances()) {
......@@ -107,7 +107,7 @@ public class TaskFarmHistoryService<I, O, T extends ITaskFarmDuplicable<I, O>> {
* (zero if no throughput value for the pipe has been recorded at the last measurement)
*/
public long getLastPullThroughputOfPipe(final IMonitorablePipe pipe) {
long result = 0;
long result = 0; // NOPMD
if (this.lastPullThroughputs.containsKey(pipe)) {
result = this.lastPullThroughputs.get(pipe);
}
......@@ -121,7 +121,7 @@ public class TaskFarmHistoryService<I, O, T extends ITaskFarmDuplicable<I, O>> {
* (zero if no throughput value for the pipe has been recorded at the last measurement)
*/
public long getLastPushThroughputOfPipe(final IMonitorablePipe pipe) {
long result = 0;
long result = 0; // NOPMD
if (this.lastPushThroughputs.containsKey(pipe)) {
result = this.lastPushThroughputs.get(pipe);
}
......
......@@ -32,7 +32,7 @@ public class ThroughputHistory {
private final int maxEntries;
/** throughput sums **/
private final LinkedList<ThroughputEntry> entries = new LinkedList<ThroughputEntry>();
private final LinkedList<ThroughputEntry> entries = new LinkedList<ThroughputEntry>(); // NOPMD LinkedList is needed as type in the code
/**
* Creates a new throughput history with the analysis window specified in the configuration.
......
......@@ -62,22 +62,19 @@ class TaskFarmReconfigurationCommandService<I, O, T extends ITaskFarmDuplicable<
* @return {@link TaskFarmReconfigurationCommand} showing if we want to add or remove a stage
*/
public TaskFarmReconfigurationCommand decideExecutionPlan(final double throughputScore) {
TaskFarmReconfigurationCommand command = TaskFarmReconfigurationCommand.NONE;
TaskFarmReconfigurationCommand command = TaskFarmReconfigurationCommand.NONE; // NOPMD
switch (this.currentMode) {
case ADDING:
if (this.currentMode == ReconfigurationMode.ADDING) {
command = decideForAddingMode(throughputScore);
break;
case REMOVING:
} else {
command = decideForRemovingMode(throughputScore);
break;
}
return command;
}
private TaskFarmReconfigurationCommand decideForAddingMode(final double throughputScore) {
TaskFarmReconfigurationCommand command = TaskFarmReconfigurationCommand.NONE;
TaskFarmReconfigurationCommand command = TaskFarmReconfigurationCommand.NONE; // NOPMD
if (this.taskFarmStage.getEnclosedStageInstances().size() >= this.taskFarmStage.getConfiguration().getMaxNumberOfCores()) {
// we do not want to parallelize more than we have (virtual) processors
......@@ -115,7 +112,7 @@ class TaskFarmReconfigurationCommandService<I, O, T extends ITaskFarmDuplicable<
}
private TaskFarmReconfigurationCommand decideForRemovingMode(final double throughputScore) {
TaskFarmReconfigurationCommand command = TaskFarmReconfigurationCommand.NONE;
TaskFarmReconfigurationCommand command = TaskFarmReconfigurationCommand.NONE; // NOPMD
// we never want to remove the basic stage since it would destroy the pipeline
for (int i = 1; i < this.taskFarmStage.getEnclosedStageInstances().size() - 1; i++) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment