diff --git a/conf/logback.groovy b/conf/logback.groovy index c9173f020aabfc6acb9f6e3163c1c3f69b455f6f..58e714aeb66a661011dc99bc5a059dd9e3ad5d66 100644 --- a/conf/logback.groovy +++ b/conf/logback.groovy @@ -26,4 +26,4 @@ logger "teetime.variant.methodcallWithPorts.stage", INFO logger "teetime.variant.methodcallWithPorts.framework.core.pipe", INFO -logger "teetime.variant.methodcallWithPorts.examples.kiekerdays.TimingsReader", TRACE, ["FILE"] \ No newline at end of file +logger "util.TimingsReader", TRACE, ["FILE"] \ No newline at end of file diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index 776fc669dc46826713861ceb80c2374f40015e48..2fe100e57402455723436b3bf04fa832b42809f3 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -8,6 +8,7 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; public abstract class AbstractStage implements StageWithPort { @@ -59,6 +60,18 @@ public abstract class AbstractStage implements StageWithPort { public void onStart() { this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); + + this.connectUnconnectedOutputPorts(); + } + + @SuppressWarnings("unchecked") + private void connectUnconnectedOutputPorts() { + for (OutputPort<?> outputPort : this.cachedOutputPorts) { + if (null == outputPort.getPipe()) { // if port is unconnected + this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port."); + outputPort.setPipe(new DummyPipe()); + } + } } protected void onFinished() { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java index 0aeab19c89973fec066303a2637875929c7fa769..b29c12464e1d329de06a92826b2a338f74968ce4 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/OutputPort.java @@ -4,7 +4,7 @@ public class OutputPort<T> extends AbstractPort<T> { /** * Performance cache: Avoids the following method chain - * + * * <pre> * this.getPipe().getTargetPort().getOwningStage() * </pre> @@ -16,7 +16,7 @@ public class OutputPort<T> extends AbstractPort<T> { } /** - * + * * @param element * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) */ @@ -33,9 +33,7 @@ public class OutputPort<T> extends AbstractPort<T> { } public void sendSignal(final Signal signal) { - if (this.pipe != null) { // if the output port is connected with a pipe - this.pipe.setSignal(signal); - } + this.pipe.setSignal(signal); } } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java new file mode 100644 index 0000000000000000000000000000000000000000..ee90ec5d5b1b9eef370b4ca9eaab27362c9034d6 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/pipe/DummyPipe.java @@ -0,0 +1,55 @@ +package teetime.variant.methodcallWithPorts.framework.core.pipe; + +import teetime.variant.methodcallWithPorts.framework.core.InputPort; +import teetime.variant.methodcallWithPorts.framework.core.OutputPort; +import teetime.variant.methodcallWithPorts.framework.core.Signal; + +/** + * A pipe implementation used to connect unconnected output ports. + * + * @author Christian Wulf + * + */ +@SuppressWarnings("rawtypes") +public final class DummyPipe implements IPipe { + + @Override + public boolean add(final Object element) { + return false; + } + + @Override + public Object removeLast() { + return null; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public int size() { + return 0; + } + + @Override + public Object readLast() { + return null; + } + + @Override + public InputPort<Object> getTargetPort() { + return null; + } + + @Override + public void setTargetPort(final InputPort targetPort) {} + + @Override + public void setSignal(final Signal signal) {} + + @Override + public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {} + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java index f6489819a7d2609e9713b78aa823300a4b2dab4e..6dca74493295e6697ec3b6deeca1816548924ba7 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -170,13 +170,12 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { } } - private void createAndSendRecord(final ByteBuffer buffer) { + private final void createAndSendRecord(final ByteBuffer buffer) { final int clazzid = buffer.getInt(); final long loggingTimestamp = buffer.getLong(); - final IMonitoringRecord record; - try { // NOCS (Nested try-catch) + try { // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); - record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); + final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); record.setLoggingTimestamp(loggingTimestamp); this.send(this.outputPort, record); } catch (final MonitoringRecordException ex) { diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java similarity index 99% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java rename to src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java index 7bfec10e16795ddc53492e555ea07cd9ecb2be0d..a09cdea37f89e0385b847a137f7b0a2c208d9fc0 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TCPReaderSink.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/TCPReaderSink.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.variant.methodcallWithPorts.examples.kiekerdays; +package teetime.variant.methodcallWithPorts.stage.kieker; import java.io.IOException; import java.net.InetSocketAddress; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java b/src/main/java/util/KiekerLoadDriver.java similarity index 99% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java rename to src/main/java/util/KiekerLoadDriver.java index ba83610422be4d9a595d8febf25b10979c452a2b..e30aa0674a73d2d37d7f926ce37e3fca69c5a7ac 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/KiekerLoadDriver.java +++ b/src/main/java/util/KiekerLoadDriver.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcallWithPorts.examples.kiekerdays; +package util; import java.io.BufferedOutputStream; import java.io.File; diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TimingsReader.java b/src/main/java/util/TimingsReader.java similarity index 87% rename from src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TimingsReader.java rename to src/main/java/util/TimingsReader.java index fda303d7e3f1fcbbe07960f69d2b955a71b8d3ac..5d907fb252cbfe9698386e3bc6a058dbcbf7b857 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TimingsReader.java +++ b/src/main/java/util/TimingsReader.java @@ -1,4 +1,4 @@ -package teetime.variant.methodcallWithPorts.examples.kiekerdays; +package util; import java.io.File; import java.io.IOException; @@ -9,8 +9,6 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import util.StatisticsUtil; - import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.io.CharSource; @@ -40,9 +38,6 @@ public class TimingsReader { durationsInNs.add(timing); } - LOGGER.trace("Removing leading zeros..."); - StatisticsUtil.removeLeadingZeroThroughputs(durationsInNs); - LOGGER.trace("Calculating quantiles..."); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(durationsInNs); LOGGER.info(StatisticsUtil.getQuantilesString(quintiles)); diff --git a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java index 61e89b210e64c2c605f41eabd730741baa7fd823..5d27666cf810daf6802680742aef0c62bbe86f9f 100644 --- a/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java +++ b/src/test/java/teetime/variant/methodcallWithPorts/examples/kiekerdays/TcpTraceLogging.java @@ -3,6 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; +import teetime.variant.methodcallWithPorts.stage.io.TCPReader; public class TcpTraceLogging extends Analysis { @@ -29,15 +30,9 @@ public class TcpTraceLogging extends Analysis { } private StageWithPort buildTcpPipeline() { - TCPReaderSink tcpReader = new TCPReaderSink(); - // EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); - // - // SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort()); - // - // // create and configure pipeline - // Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>(); - // pipeline.setFirstStage(tcpReader); - // pipeline.setLastStage(endStage); + // TCPReaderSink tcpReader = new TCPReaderSink(); + TCPReader tcpReader = new TCPReader(); + return tcpReader; } diff --git a/submodules/JCTools b/submodules/JCTools index 88e1e25f9519b250258c7e5ada30935975ab2d10..75998aa20b7ec897ec321c1f94192de888f2dc6e 160000 --- a/submodules/JCTools +++ b/submodules/JCTools @@ -1 +1 @@ -Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10 +Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e