Skip to content
Snippets Groups Projects
Commit 468be6de authored by Christian Wulf's avatar Christian Wulf
Browse files

added type to ports to allow type validation at runtime

parent 8ffe138c
No related branches found
No related tags found
No related merge requests found
Showing
with 141 additions and 67 deletions
......@@ -45,11 +45,11 @@ cleanup.sort_members_all=false
cleanup.use_blocks=true
cleanup.use_blocks_only_for_return_and_throw=false
cleanup.use_parentheses_in_expressions=true
cleanup.use_this_for_non_static_field_access=true
cleanup.use_this_for_non_static_field_access=false
cleanup.use_this_for_non_static_field_access_only_if_necessary=false
cleanup.use_this_for_non_static_method_access=true
cleanup.use_this_for_non_static_method_access_only_if_necessary=false
cleanup_profile=_Kieker - Clean Up
cleanup_profile=_TeeTime
cleanup_settings_version=2
eclipse.preferences.version=1
editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
......
package teetime.variant.methodcallWithPorts.framework.core;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public abstract class AbstractPort<T> {
protected IPipe<T> pipe;
/**
* The type of this port.
* <p>
* <i>Used to validate the connection between two ports at runtime.</i>
* </p>
*/
protected Class<T> type;
public IPipe<T> getPipe() {
return this.pipe;
}
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
}
public Class<T> getType() {
return this.type;
}
public void setType(final Class<T> type) {
this.type = type;
}
}
package teetime.variant.methodcallWithPorts.framework.core;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
......@@ -118,16 +121,36 @@ public abstract class AbstractStage implements StageWithPort {
protected <T> InputPort<T> createInputPort() {
InputPort<T> inputPort = new InputPort<T>(this);
// inputPort.setType(type); // TODO set type for input port
this.inputPortList.add(inputPort);
return inputPort;
}
protected <T> OutputPort<T> createOutputPort() {
OutputPort<T> outputPort = new OutputPort<T>();
// outputPort.setType(type); // TODO set type for output port
this.outputPortList.add(outputPort);
return outputPort;
}
public List<InvalidPortConnection> validateOutputPorts() {
List<InvalidPortConnection> invalidOutputPortMessages = new LinkedList<InvalidPortConnection>();
for (OutputPort<?> outputPort : this.getOutputPorts()) {
IPipe<?> pipe = outputPort.getPipe();
if (null != pipe) { // if output port is connected with another one
Class<?> sourcePortType = outputPort.getType();
Class<?> targetPortType = pipe.getTargetPort().getType();
if (null == sourcePortType || !sourcePortType.equals(targetPortType)) {
InvalidPortConnection invalidPortConnection = new InvalidPortConnection(outputPort, pipe.getTargetPort());
invalidOutputPortMessages.add(invalidPortConnection);
}
}
}
return invalidOutputPortMessages;
}
@Override
public String toString() {
return this.getClass().getName() + ": " + this.id;
......
......@@ -2,10 +2,9 @@ package teetime.variant.methodcallWithPorts.framework.core;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class InputPort<T> {
public class InputPort<T> extends AbstractPort<T> {
private final StageWithPort owningStage;
private IPipe<T> pipe;
InputPort(final StageWithPort owningStage) {
super();
......@@ -22,15 +21,12 @@ public class InputPort<T> {
return element;
}
public IPipe<T> getPipe() {
return this.pipe;
}
/**
* Connects this input port with the given <code>pipe</code> bi-directionally
*
* @param pipe
*/
@Override
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
pipe.setTargetPort(this);
......
package teetime.variant.methodcallWithPorts.framework.core;
public class InvalidPortConnection {
private final OutputPort<?> sourcePort;
private final InputPort<?> inputPort;
public InvalidPortConnection(final OutputPort<?> sourcePort, final InputPort<?> inputPort) {
super();
this.sourcePort = sourcePort;
this.inputPort = inputPort;
}
public OutputPort<?> getSourcePort() {
return sourcePort;
}
public InputPort<?> getInputPort() {
return inputPort;
}
}
package teetime.variant.methodcallWithPorts.framework.core;
import teetime.variant.methodcallWithPorts.framework.core.pipe.IPipe;
public class OutputPort<T> extends AbstractPort<T> {
public class OutputPort<T> {
private IPipe<T> pipe;
/**
* Performance cache: Avoids the following method chain
*
......@@ -27,14 +24,6 @@ public class OutputPort<T> {
return this.pipe.add(element);
}
public IPipe<T> getPipe() {
return this.pipe;
}
public void setPipe(final IPipe<T> pipe) {
this.pipe = pipe;
}
public StageWithPort getCachedTargetStage() {
return this.cachedTargetStage;
}
......
......@@ -8,8 +8,15 @@ import java.util.UUID;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
/**
*
* @author Christian Wulf
*
* @param <FirstStage>
* @param <LastStage>
*/
// BETTER remove the pipeline since it does not add any new functionality
public class Pipeline<I, O> implements StageWithPort {
public class Pipeline<FirstStage extends StageWithPort, LastStage extends StageWithPort> implements StageWithPort {
private final String id;
/**
......@@ -17,11 +24,9 @@ public class Pipeline<I, O> implements StageWithPort {
*/
protected Log logger;
private StageWithPort firstStage;
private InputPort<I> firstStageInputPort;
private FirstStage firstStage;
private final List<StageWithPort> intermediateStages = new LinkedList<StageWithPort>();
private StageWithPort lastStage;
private OutputPort<O> lastStageOutputPort;
private LastStage lastStage;
// BETTER remove the stage array and use the output ports instead for passing a signal to all stages in the same thread; what about multiple same signals due to
// multiple input ports?
......@@ -47,9 +52,8 @@ public class Pipeline<I, O> implements StageWithPort {
return this.id;
}
public void setFirstStage(final StageWithPort stage, final InputPort<I> firstStageInputPort) {
public void setFirstStage(final FirstStage stage) {
this.firstStage = stage;
this.firstStageInputPort = firstStageInputPort;
}
public void addIntermediateStages(final StageWithPort... stages) {
......@@ -60,9 +64,8 @@ public class Pipeline<I, O> implements StageWithPort {
this.intermediateStages.add(stage);
}
public void setLastStage(final StageWithPort stage, final OutputPort<O> lastStageOutputPort) {
public void setLastStage(final LastStage stage) {
this.lastStage = stage;
this.lastStageOutputPort = lastStageOutputPort;
}
@Override
......@@ -151,17 +154,17 @@ public class Pipeline<I, O> implements StageWithPort {
// this.reschedulable = reschedulable;
// }
public InputPort<I> getInputPort() {
return this.firstStageInputPort;
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.firstStage.onSignal(signal, inputPort);
}
public OutputPort<O> getOutputPort() {
return this.lastStageOutputPort;
public FirstStage getFirstStage() {
return this.firstStage;
}
@Override
public void onSignal(final Signal signal, final InputPort<?> inputPort) {
this.firstStage.onSignal(signal, inputPort);
public LastStage getLastStage() {
return this.lastStage;
}
}
......@@ -3,12 +3,12 @@ package teetime.variant.methodcallWithPorts.framework.core;
import kieker.common.logging.Log;
import kieker.common.logging.LogFactory;
public class RunnableStage<I> implements Runnable {
public class RunnableStage implements Runnable {
private final ConsumerStage<I> stage;
private final StageWithPort stage;
private final Log logger;
public RunnableStage(final ConsumerStage<I> stage) {
public RunnableStage(final StageWithPort stage) {
this.stage = stage;
this.logger = LogFactory.getLog(stage.getClass());
}
......@@ -24,7 +24,7 @@ public class RunnableStage<I> implements Runnable {
this.stage.executeWithPorts();
} while (this.stage.isReschedulable());
this.stage.onSignal(Signal.FINISHED, this.stage.getInputPort());
this.stage.onSignal(Signal.FINISHED, null);
} catch (RuntimeException e) {
this.logger.error("Terminating thread due to the following exception: ", e);
......
......@@ -17,6 +17,8 @@ package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord;
import java.io.File;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter;
......@@ -30,16 +32,24 @@ import kieker.common.record.IMonitoringRecord;
*
* @since 1.10
*/
public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> {
public class DatFile2RecordFilter extends Pipeline<File2TextLinesFilter, TextLine2RecordFilter> {
public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter();
TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository);
this.setFirstStage(file2TextLinesFilter, file2TextLinesFilter.getInputPort());
this.setLastStage(textLine2RecordFilter, textLine2RecordFilter.getOutputPort());
this.setFirstStage(file2TextLinesFilter);
this.setLastStage(textLine2RecordFilter);
// BETTER let the framework choose the optimal pipe implementation
SingleElementPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort());
}
public InputPort<File> getInputPort() {
return this.getFirstStage().getInputPort();
}
public OutputPort<IMonitoringRecord> getOutputPort() {
return this.getLastStage().getOutputPort();
}
}
......@@ -20,10 +20,10 @@ import teetime.variant.methodcallWithPorts.stage.Clock;
import teetime.variant.methodcallWithPorts.stage.Counter;
import teetime.variant.methodcallWithPorts.stage.ElementDelayMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.ElementThroughputMeasuringStage;
import teetime.variant.methodcallWithPorts.stage.EndStage;
import teetime.variant.methodcallWithPorts.stage.InstanceCounter;
import teetime.variant.methodcallWithPorts.stage.InstanceOfFilter;
import teetime.variant.methodcallWithPorts.stage.Relay;
import teetime.variant.methodcallWithPorts.stage.basic.Sink;
import teetime.variant.methodcallWithPorts.stage.basic.distributor.Distributor;
import teetime.variant.methodcallWithPorts.stage.io.TCPReader;
import teetime.variant.methodcallWithPorts.stage.kieker.traceReconstruction.TraceReconstructionFilter;
......@@ -55,38 +55,38 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
@Override
public void init() {
super.init();
StageWithPort<Void, IMonitoringRecord> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage<Void>(tcpPipeline));
Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpPipeline = this.buildTcpPipeline();
this.tcpThread = new Thread(new RunnableStage(tcpPipeline));
StageWithPort<Void, Long> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage<Void>(clockStage));
Pipeline<Clock, Distributor<Long>> clockStage = this.buildClockPipeline(1000);
this.clockThread = new Thread(new RunnableStage(clockStage));
StageWithPort<Void, Long> clock2Stage = this.buildClockPipeline(5000);
this.clock2Thread = new Thread(new RunnableStage<Void>(clock2Stage));
Pipeline<Clock, Distributor<Long>> clock2Stage = this.buildClockPipeline(5000);
this.clock2Thread = new Thread(new RunnableStage(clock2Stage));
this.numWorkerThreads = Math.min(NUM_VIRTUAL_CORES, this.numWorkerThreads);
this.workerThreads = new Thread[this.numWorkerThreads];
for (int i = 0; i < this.workerThreads.length; i++) {
StageWithPort<IMonitoringRecord, ?> pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage<IMonitoringRecord>(pipeline));
StageWithPort pipeline = this.buildPipeline(tcpPipeline, clockStage, clock2Stage);
this.workerThreads[i] = new Thread(new RunnableStage(pipeline));
}
}
private StageWithPort<Void, IMonitoringRecord> buildTcpPipeline() {
private Pipeline<TCPReader, Distributor<IMonitoringRecord>> buildTcpPipeline() {
TCPReader tcpReader = new TCPReader();
Distributor<IMonitoringRecord> distributor = new Distributor<IMonitoringRecord>();
SingleElementPipe.connect(tcpReader.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, IMonitoringRecord> pipeline = new Pipeline<Void, IMonitoringRecord>();
Pipeline<TCPReader, Distributor<IMonitoringRecord>> pipeline = new Pipeline<TCPReader, Distributor<IMonitoringRecord>>();
pipeline.setFirstStage(tcpReader);
pipeline.setLastStage(distributor);
return pipeline;
}
private StageWithPort<Void, Long> buildClockPipeline(final long intervalDelayInMs) {
private Pipeline<Clock, Distributor<Long>> buildClockPipeline(final long intervalDelayInMs) {
Clock clock = new Clock();
clock.setInitialDelayInMs(intervalDelayInMs);
clock.setIntervalDelayInMs(intervalDelayInMs);
......@@ -95,13 +95,13 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
SingleElementPipe.connect(clock.getOutputPort(), distributor.getInputPort());
// create and configure pipeline
Pipeline<Void, Long> pipeline = new Pipeline<Void, Long>();
Pipeline<Clock, Distributor<Long>> pipeline = new Pipeline<Clock, Distributor<Long>>();
pipeline.setFirstStage(clock);
pipeline.setLastStage(distributor);
return pipeline;
}
private static class StageFactory<T extends StageWithPort<?, ?>> {
private static class StageFactory<T extends StageWithPort> {
private final Constructor<T> constructor;
private final List<T> stages = new ArrayList<T>();
......@@ -155,9 +155,9 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
}
}
private Pipeline<IMonitoringRecord, ?> buildPipeline(final StageWithPort<Void, IMonitoringRecord> tcpReaderPipeline,
final StageWithPort<Void, Long> clockStage,
final StageWithPort<Void, Long> clock2Stage) {
private Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> buildPipeline(final Pipeline<TCPReader, Distributor<IMonitoringRecord>> tcpReaderPipeline,
final Pipeline<Clock, Distributor<Long>> clockStage,
final Pipeline<Clock, Distributor<Long>> clock2Stage) {
// create stages
Relay<IMonitoringRecord> relay = new Relay<IMonitoringRecord>();
Counter<IMonitoringRecord> recordCounter = this.recordCounterFactory.create();
......@@ -169,10 +169,10 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
TraceReductionFilter traceReductionFilter = new TraceReductionFilter(this.trace2buffer);
Counter<TraceEventRecords> traceCounter = this.traceCounterFactory.create();
ElementThroughputMeasuringStage<TraceEventRecords> traceThroughputFilter = this.traceThroughputFilterFactory.create();
EndStage<TraceEventRecords> endStage = new EndStage<TraceEventRecords>();
Sink<TraceEventRecords> endStage = new Sink<TraceEventRecords>();
// connect stages
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
this.tcpRelayPipe = SpScPipe.connect(tcpReaderPipeline.getLastStage().getNewOutputPort(), relay.getInputPort(), TCP_RELAY_MAX_SIZE);
SingleElementPipe.connect(relay.getOutputPort(), recordCounter.getInputPort());
SingleElementPipe.connect(recordCounter.getOutputPort(), traceMetadataCounter.getInputPort());
......@@ -186,11 +186,11 @@ public class TcpTraceReductionAnalysisWithThreads extends Analysis {
// SingleElementPipe.connect(traceReconstructionFilter.getOutputPort(), traceThroughputFilter.getInputPort());
// SingleElementPipe.connect(traceThroughputFilter.getOutputPort(), endStage.getInputPort());
SpScPipe.connect(clock2Stage.getOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clock2Stage.getLastStage().getNewOutputPort(), traceReductionFilter.getTriggerInputPort(), 10);
SpScPipe.connect(clockStage.getLastStage().getNewOutputPort(), traceThroughputFilter.getTriggerInputPort(), 10);
// create and configure pipeline
Pipeline<IMonitoringRecord, TraceEventRecords> pipeline = new Pipeline<IMonitoringRecord, TraceEventRecords>();
Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>> pipeline = new Pipeline<Relay<IMonitoringRecord>, Sink<TraceEventRecords>>();
pipeline.setFirstStage(relay);
pipeline.addIntermediateStage(recordCounter);
pipeline.addIntermediateStage(traceMetadataCounter);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment