From 1207c2daa890343718b073e6eca76f6dc98d5074 Mon Sep 17 00:00:00 2001 From: Christian Wulf <chw@informatik.uni-kiel.de> Date: Tue, 19 Aug 2014 09:04:11 +0200 Subject: [PATCH] added DummyPipe; removed code for leading zeros --- conf/logback.groovy | 2 +- .../framework/core/AbstractStage.java | 13 +++++ .../framework/core/OutputPort.java | 8 +-- .../framework/core/pipe/DummyPipe.java | 55 +++++++++++++++++++ .../stage/io/TCPReader.java | 7 +-- .../stage/kieker}/TCPReaderSink.java | 2 +- .../java/util}/KiekerLoadDriver.java | 2 +- .../java/util}/TimingsReader.java | 7 +-- .../examples/kiekerdays/TcpTraceLogging.java | 13 ++--- submodules/JCTools | 2 +- 10 files changed, 83 insertions(+), 28 deletions(-) create mode 100644 src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java rename src/{test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays => main/java/teetime/variant/methodcallWithPorts/stage/kieker}/TCPReaderSink.java (99%) rename src/{test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays => main/java/util}/KiekerLoadDriver.java (99%) rename src/{test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays => main/java/util}/TimingsReader.java (87%) diff --git a/conf/logback.groovy b/conf/logback.groovy index c9173f0..58e714a 100644 --- a/conf/logback.groovy +++ b/conf/logback.groovy @@ -26,4 +26,4 @@ logger "teetime.variant.methodcallWithPorts.stage", INFO logger "teetime.variant.methodcallWithPorts.framework.core.pipe", INFO -logger "teetime.variant.methodcallWithPorts.examples.kiekerdays.TimingsReader", TRACE, ["FILE"] \ No newline at end of file +logger "util.TimingsReader", TRACE, ["FILE"] \ No newline at end of file diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 776fc66..2fe100e 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -8,6 +8,7 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public abstract class AbstractStage implements StageWithPort { @@ -59,6 +60,18 @@ public abstract class AbstractStage implements StageWithPort { public void onStart() { this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); + + this.connectUnconnectedOutputPorts(); + } + + @SuppressWarnings("unchecked") + private void connectUnconnectedOutputPorts() { + for (OutputPort<?> outputPort : this.cachedOutputPorts) { + if (null == outputPort.getPipe()) { // if port is unconnected + this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); + outputPort.setPipe(new DummyPipe()); + } + } } protected void onFinished() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 0aeab19..b29c124 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -4,7 +4,7 @@ public class OutputPort<T> extends AbstractPort<T> { /** * Performance cache: Avoids the following method chain - * + * * <pre> * this.getPipe().getTargetPort().getOwningStage() * </pre> @@ -16,7 +16,7 @@ public class OutputPort<T> extends AbstractPort<T> { } /** - * + * * @param element * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) */ @@ -33,9 +33,7 @@ public class OutputPort<T> extends AbstractPort<T> { } public void sendSignal(final Signal signal) { - if (this.pipe != null) { // if the output port is connected with a pipe - this.pipe.setSignal(signal); - } + this.pipe.setSignal(signal); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java new file mode 100644 index 0000000..ee90ec5 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java @@ -0,0 +1,55 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; + +/** + * A pipe implementation used to connect unconnected output ports. + * + * @author Christian Wulf + * + */ +@SuppressWarnings("rawtypes") +public final class DummyPipe implements IPipe { + + @Override + public boolean add(final Object element) { + return false; + } + + @Override + public Object removeLast() { + return null; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public int size() { + return 0; + } + + @Override + public Object readLast() { + return null; + } + + @Override + public InputPort<Object> getTargetPort() { + return null; + } + + @Override + public void setTargetPort(final InputPort targetPort) {} + + @Override + public void setSignal(final Signal signal) {} + + @Override + public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {} + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index f648981..6dca744 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -170,13 +170,12 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } } - private void createAndSendRecord(final ByteBuffer buffer) { + private final void createAndSendRecord(final ByteBuffer buffer) { final int clazzid = buffer.getInt(); final long loggingTimestamp = buffer.getLong(); - final IMonitoringRecord record; - try { // NOCS (Nested try-catch) + try { // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); - record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); + final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); this.send(this.outputPort, record); } catch (final MonitoringRecordException ex) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java similarity index 99% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java index 7bfec10..a09cdea 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcallWithPorts.examples.kiekerdays; +package teetime.variant.methodcallWithPorts.stage.kieker; import java.io.IOException; import java.net.InetSocketAddress; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java similarity index 99% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java rename to src/main/java/util/KiekerLoadDriver.java index ba83610..e30aa06 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcallWithPorts.examples.kiekerdays; +package util; import java.io.BufferedOutputStream; import java.io.File; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TimingsReader.java b/src/main/java/util/TimingsReader.java similarity index 87% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TimingsReader.java rename to src/main/java/util/TimingsReader.java index fda303d..5d907fb 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TimingsReader.java +++ b/src/main/java/util/TimingsReader.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcallWithPorts.examples.kiekerdays; +package util; import java.io.File; import java.io.IOException; @@ -9,8 +9,6 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import util.StatisticsUtil; - import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.io.CharSource; @@ -40,9 +38,6 @@ public class TimingsReader { durationsInNs.add(timing); } - LOGGER.trace("Removing leading zeros..."); - StatisticsUtil.removeLeadingZeroThroughputs(durationsInNs); - LOGGER.trace("Calculating quantiles..."); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(durationsInNs); LOGGER.info(StatisticsUtil.getQuantilesString(quintiles)); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java index 61e89b2..5d27666 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java @@ -3,6 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.stage.io.TCPReader; public class TcpTraceLogging extends Analysis { @@ -29,15 +30,9 @@ public class TcpTraceLogging extends Analysis { } private StageWithPort buildTcpPipeline() { - TCPReaderSink tcpReader = new TCPReaderSink(); - // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); - // - // SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); - // - // // create and configure pipeline - // Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); - // pipeline.setFirstStage(tcpReader); - // pipeline.setLastStage(endStage); + // TCPReaderSink tcpReader = new TCPReaderSink(); + TCPReader tcpReader = new TCPReader(); + return tcpReader; } diff --git a/submodules/JCTools b/submodules/JCTools index 88e1e25..75998aa 160000 --- a/submodules/JCTools +++ b/submodules/JCTools @@ -1 +1 @@ -Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10 +Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e -- GitLab