Skip to content
Snippets Groups Projects
Commit 1207c2da authored by Christian Wulf's avatar Christian Wulf
Browse files

added DummyPipe;

removed code for leading zeros
parent 1a52ab27
No related branches found
No related tags found
No related merge requests found
Showing with 83 additions and 28 deletions
...@@ -26,4 +26,4 @@ logger "teetime.variant.methodcallWithPorts.stage", INFO ...@@ -26,4 +26,4 @@ logger "teetime.variant.methodcallWithPorts.stage", INFO
logger "teetime.variant.methodcallWithPorts.framework.core.pipe", INFO logger "teetime.variant.methodcallWithPorts.framework.core.pipe", INFO
logger "teetime.variant.methodcallWithPorts.examples.kiekerdays.TimingsReader", TRACE, ["FILE"] logger "util.TimingsReader", TRACE, ["FILE"]
\ No newline at end of file \ No newline at end of file
...@@ -8,6 +8,7 @@ import java.util.UUID; ...@@ -8,6 +8,7 @@ import java.util.UUID;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import teetime.variant.methodcallWithPorts.framework.core.pipe.DummyPipe;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe; import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public abstract class AbstractStage implements StageWithPort { public abstract class AbstractStage implements StageWithPort {
...@@ -59,6 +60,18 @@ public abstract class AbstractStage implements StageWithPort { ...@@ -59,6 +60,18 @@ public abstract class AbstractStage implements StageWithPort {
public void onStart() { public void onStart() {
this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]); this.cachedInputPorts = this.inputPortList.toArray(new InputPort<?>[0]);
this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]); this.cachedOutputPorts = this.outputPortList.toArray(new OutputPort<?>[0]);
this.connectUnconnectedOutputPorts();
}
@SuppressWarnings("unchecked")
private void connectUnconnectedOutputPorts() {
for (OutputPort<?> outputPort : this.cachedOutputPorts) {
if (null == outputPort.getPipe()) { // if port is unconnected
this.logger.warn("Unconnected output port: " + outputPort + ". Connecting with a dummy output port.");
outputPort.setPipe(new DummyPipe());
}
}
} }
protected void onFinished() { protected void onFinished() {
......
...@@ -4,7 +4,7 @@ public class OutputPort<T> extends AbstractPort<T> { ...@@ -4,7 +4,7 @@ public class OutputPort<T> extends AbstractPort<T> {
/** /**
* Performance cache: Avoids the following method chain * Performance cache: Avoids the following method chain
* *
* <pre> * <pre>
* this.getPipe().getTargetPort().getOwningStage() * this.getPipe().getTargetPort().getOwningStage()
* </pre> * </pre>
...@@ -16,7 +16,7 @@ public class OutputPort<T> extends AbstractPort<T> { ...@@ -16,7 +16,7 @@ public class OutputPort<T> extends AbstractPort<T> {
} }
/** /**
* *
* @param element * @param element
* @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy) * @return <code>true</code> iff the given <code>element</code> could be sent, <code>false</code> otherwise (then use a re-try strategy)
*/ */
...@@ -33,9 +33,7 @@ public class OutputPort<T> extends AbstractPort<T> { ...@@ -33,9 +33,7 @@ public class OutputPort<T> extends AbstractPort<T> {
} }
public void sendSignal(final Signal signal) { public void sendSignal(final Signal signal) {
if (this.pipe != null) { // if the output port is connected with a pipe this.pipe.setSignal(signal);
this.pipe.setSignal(signal);
}
} }
} }
package teetime.variant.methodcallWithPorts.framework.core.pipe;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Signal;
/**
* A pipe implementation used to connect unconnected output ports.
*
* @author Christian Wulf
*
*/
@SuppressWarnings("rawtypes")
public final class DummyPipe implements IPipe {
@Override
public boolean add(final Object element) {
return false;
}
@Override
public Object removeLast() {
return null;
}
@Override
public boolean isEmpty() {
return true;
}
@Override
public int size() {
return 0;
}
@Override
public Object readLast() {
return null;
}
@Override
public InputPort<Object> getTargetPort() {
return null;
}
@Override
public void setTargetPort(final InputPort targetPort) {}
@Override
public void setSignal(final Signal signal) {}
@Override
public void connectPorts(final OutputPort sourcePort, final InputPort targetPort) {}
}
...@@ -170,13 +170,12 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> { ...@@ -170,13 +170,12 @@ public class TCPReader extends ProducerStage<IMonitoringRecord> {
} }
} }
private void createAndSendRecord(final ByteBuffer buffer) { private final void createAndSendRecord(final ByteBuffer buffer) {
final int clazzid = buffer.getInt(); final int clazzid = buffer.getInt();
final long loggingTimestamp = buffer.getLong(); final long loggingTimestamp = buffer.getLong();
final IMonitoringRecord record; try {
try { // NOCS (Nested try-catch)
// record = this.recordFactory.create(clazzid, buffer, this.stringRegistry); // record = this.recordFactory.create(clazzid, buffer, this.stringRegistry);
record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); final IMonitoringRecord record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry);
record.setLoggingTimestamp(loggingTimestamp); record.setLoggingTimestamp(loggingTimestamp);
this.send(this.outputPort, record); this.send(this.outputPort, record);
} catch (final MonitoringRecordException ex) { } catch (final MonitoringRecordException ex) {
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package teetime.variant.methodcallWithPorts.stage.kieker;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package util;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.File; import java.io.File;
......
package teetime.variant.methodcallWithPorts.examples.kiekerdays; package util;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
...@@ -9,8 +9,6 @@ import java.util.Map; ...@@ -9,8 +9,6 @@ import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import util.StatisticsUtil;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.io.CharSource; import com.google.common.io.CharSource;
...@@ -40,9 +38,6 @@ public class TimingsReader { ...@@ -40,9 +38,6 @@ public class TimingsReader {
durationsInNs.add(timing); durationsInNs.add(timing);
} }
LOGGER.trace("Removing leading zeros...");
StatisticsUtil.removeLeadingZeroThroughputs(durationsInNs);
LOGGER.trace("Calculating quantiles..."); LOGGER.trace("Calculating quantiles...");
Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(durationsInNs); Map<Double, Long> quintiles = StatisticsUtil.calculateQuintiles(durationsInNs);
LOGGER.info(StatisticsUtil.getQuantilesString(quintiles)); LOGGER.info(StatisticsUtil.getQuantilesString(quintiles));
......
...@@ -3,6 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays; ...@@ -3,6 +3,7 @@ package teetime.variant.methodcallWithPorts.examples.kiekerdays;
import teetime.variant.explicitScheduling.framework.core.Analysis; import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage; import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.StageWithPort; import teetime.variant.methodcallWithPorts.framework.core.StageWithPort;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
public class TcpTraceLogging extends Analysis { public class TcpTraceLogging extends Analysis {
...@@ -29,15 +30,9 @@ public class TcpTraceLogging extends Analysis { ...@@ -29,15 +30,9 @@ public class TcpTraceLogging extends Analysis {
} }
private StageWithPort buildTcpPipeline() { private StageWithPort buildTcpPipeline() {
TCPReaderSink tcpReader = new TCPReaderSink(); // TCPReaderSink tcpReader = new TCPReaderSink();
// EndStage<IMonitoringRecord> endStage = new EndStage<IMonitoringRecord>(); TCPReader tcpReader = new TCPReader();
//
// SingleElementPipe.connect(tcpReader.getOutputPort(), endStage.getInputPort());
//
// // create and configure pipeline
// Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
// pipeline.setFirstStage(tcpReader);
// pipeline.setLastStage(endStage);
return tcpReader; return tcpReader;
} }
......
Subproject commit 88e1e25f9519b250258c7e5ada30935975ab2d10 Subproject commit 75998aa20b7ec897ec321c1f94192de888f2dc6e
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment