Skip to content
Snippets Groups Projects
Commit 4fd559b7 authored by Sören Henning's avatar Sören Henning
Browse files

refactored GraphML writer stages pipe and filter architecture

parent 50a2ab04
No related branches found
No related tags found
1 merge request!17Get impletemented stages and Java 8
Pipeline #
Showing
with 253 additions and 92 deletions
......@@ -21,7 +21,7 @@ import kieker.analysis.trace.traversal.AggrTraceTraverserStage;
import kieker.analysis.trace.traversal.TraceTraverserStage;
import kieker.analysis.util.graph.Graph;
import kieker.analysis.util.graph.export.dot.DotFileWriterStage;
import kieker.analysis.util.graph.export.graphml.GraphMLFileWriterComposite;
import kieker.analysis.util.graph.export.graphml.GraphMLFileWriterStage;
import kieker.common.record.IMonitoringRecord;
import kieker.common.record.misc.KiekerMetadataRecord;
......@@ -75,7 +75,7 @@ public class TraceAnalysisConfiguration extends Configuration {
TraceTraverserStage traceTraverserStage = new TraceTraverserStage();
final Distributor<Graph> graphDistributor = new Distributor<>(new CopyByReferenceStrategy());// TODO use clone
GraphMLFileWriterComposite graphMLFileWriterComposite = new GraphMLFileWriterComposite(graphFilesOutputDir);
GraphMLFileWriterStage graphMLFileWriterComposite = new GraphMLFileWriterStage(graphFilesOutputDir);
// DotTraceStyleStage dotTraceStyleStage = new DotTraceStyleStage();
DotFileWriterStage dotFileWriterStage = new DotFileWriterStage(graphFilesOutputDir);
......@@ -89,7 +89,7 @@ public class TraceAnalysisConfiguration extends Configuration {
final Distributor<AggregatedTrace> aggregatedTraceDistributor = new Distributor<>(new CopyByReferenceStrategy());
AggrTraceTraverserStage aggrTraceTraverser = new AggrTraceTraverserStage();
final Distributor<Graph> graphDistributor2 = new Distributor<>(new CopyByReferenceStrategy()); // TODO use clone
GraphMLFileWriterComposite graphMLFileWriterComposite2 = new GraphMLFileWriterComposite(graphFilesOutputDir);
GraphMLFileWriterStage graphMLFileWriterComposite2 = new GraphMLFileWriterStage(graphFilesOutputDir);
// DotTraceStyleStage dotAggrTraceStyleStage = new DotTraceStyleStage();
DotFileWriterStage dotFileWriterStage2 = new DotFileWriterStage(graphFilesOutputDir);
......
......@@ -5,12 +5,12 @@ import java.io.IOException;
import com.tinkerpop.blueprints.Graph;
import kieker.analysis.util.blueprintsgraph.NamedGraph;
import kieker.analysis.util.graph.export.graphml.GraphMLFileWriterComposite;
import kieker.analysis.util.graph.export.graphml.GraphMLFileWriterStage;
import teetime.framework.AbstractConsumerStage;
/**
* @deprecated use {@link GraphMLFileWriterComposite} instead.
* @deprecated use {@link GraphMLFileWriterStage} instead.
*
* @author Sören Henning
*
......
package kieker.analysis.util;
import java.util.LinkedList;
import java.util.Queue;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
public abstract class AbstractCombinerStage<I, J> extends AbstractStage {
protected final InputPort<I> inputPort1 = this.createInputPort();
protected final InputPort<J> inputPort2 = this.createInputPort();
private final Queue<I> elements1 = new LinkedList<>();
private final Queue<J> elements2 = new LinkedList<>();
public final InputPort<I> getInputPort1() {
return this.inputPort1;
}
public final InputPort<J> getInputPort2() {
return this.inputPort2;
}
@Override
protected void execute() {
final I element1 = this.getInputPort1().receive();
if (element1 != null) {
elements1.add(element1);
}
final J element2 = this.getInputPort2().receive();
if (element2 != null) {
elements2.add(element2);
}
if (elements1.size() > 0 && elements2.size() > 0) {
this.combine(elements1.poll(), elements2.poll());
}
}
protected abstract void combine(final I element1, final J element2);
}
package kieker.analysis.util;
import java.util.function.Function;
import teetime.stage.basic.AbstractTransformation;
public class FunctionStage<I, O> extends AbstractTransformation<I, O> {
private Function<I, O> function;
public FunctionStage(final Function<I, O> function) {
super();
this.function = function;
}
public Function<I, O> getFunction() {
return function;
}
public void setFunction(final Function<I, O> function) {
this.function = function;
}
@Override
protected void execute(final I element) {
final O transformedElement = function.apply(element);
this.getOutputPort().send(transformedElement);
}
}
......@@ -4,6 +4,7 @@ import java.io.OutputStream;
import javax.xml.bind.JAXBElement;
@Deprecated
public class JAXBMarshalElement<T> {
private JAXBElement<T> element;
......
package kieker.analysis.util;
import java.io.OutputStream;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import teetime.framework.AbstractConsumerStage;
/**
* This stage marshals the content tree rooted at an incoming element into an
* output stream.
* This stage marshals the content tree rooted at an incoming element at the
* first input port into an output stream at the second input port.
*
* A class object has to be passed at creation. Only elements of this type
* wrapped in a {@code JAXBElement} could be marshaled.
*
* Incoming elements must be {@code JAXBMarshalElement} which stores the
* {@code JAXBElement} and an output stream.
*
* @author Sören Henning
*
*/
public class JAXBMarshalStage<T> extends AbstractConsumerStage<JAXBMarshalElement<T>> {
public class JAXBMarshalStage<T> extends AbstractCombinerStage<JAXBElement<T>, OutputStream> {
private static final Boolean FORMATTED_OUTPUT_DEFAULT = Boolean.TRUE;
......@@ -40,9 +38,10 @@ public class JAXBMarshalStage<T> extends AbstractConsumerStage<JAXBMarshalElemen
}
@Override
protected void execute(final JAXBMarshalElement<T> element) {
protected void combine(final JAXBElement<T> jaxbElement, final OutputStream outputStream) {
try {
marshaller.marshal(element.getElement(), element.getOutputStream());
marshaller.marshal(jaxbElement, outputStream);
} catch (JAXBException e) {
// TODO Exception
throw new IllegalStateException("The received element could not be marshalled.", e);
......
package kieker.analysis.util;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import teetime.framework.AbstractConsumerStage;
/**
* This stage marshals the content tree rooted at an incoming element into an
* output stream.
*
* A class object has to be passed at creation. Only elements of this type
* wrapped in a {@code JAXBElement} could be marshaled.
*
* Incoming elements must be {@code JAXBMarshalElement} which stores the
* {@code JAXBElement} and an output stream.
*
* @author Sören Henning
*
*/
@Deprecated
public class JAXBMarshalStageOld<T> extends AbstractConsumerStage<JAXBMarshalElement<T>> {
private static final Boolean FORMATTED_OUTPUT_DEFAULT = Boolean.TRUE;
private final Marshaller marshaller;
public JAXBMarshalStageOld(final Class<T> elementsClass) {
this(elementsClass, FORMATTED_OUTPUT_DEFAULT);
}
public JAXBMarshalStageOld(final Class<T> elementsClass, final Boolean formattedOutput) {
try {
this.marshaller = JAXBContext.newInstance(elementsClass).createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, formattedOutput);
} catch (JAXBException e) {
// TODO Exception
throw new IllegalStateException(e);
}
}
@Override
protected void execute(final JAXBMarshalElement<T> element) {
try {
marshaller.marshal(element.getElement(), element.getOutputStream());
} catch (JAXBException e) {
// TODO Exception
throw new IllegalStateException("The received element could not be marshalled.", e);
}
}
}
......@@ -8,9 +8,9 @@ import kieker.analysis.util.graph.Graph;
import kieker.analysis.util.graph.mapping.SimpleFileNameMapper;
import kieker.analysis.util.graph.util.FileExtension;
public class GraphMLFileWriterComposite extends GraphMLWriterComposite {
public class GraphMLFileWriterStage extends GraphMLWriterStage {
public GraphMLFileWriterComposite(final Function<Graph, String> fileNameMapper) {
public GraphMLFileWriterStage(final Function<Graph, String> fileNameMapper) {
super(fileNameMapper.andThen(fileName -> {
try {
return new FileOutputStream(fileName);
......@@ -20,7 +20,7 @@ public class GraphMLFileWriterComposite extends GraphMLWriterComposite {
}));
}
public GraphMLFileWriterComposite(final String outputDirectory) {
public GraphMLFileWriterStage(final String outputDirectory) {
this(new SimpleFileNameMapper(outputDirectory, FileExtension.GRAPHML));
}
......
package kieker.analysis.util.graph.export.graphml;
import java.io.OutputStream;
import java.util.function.Function;
import javax.xml.bind.JAXBElement;
import org.graphdrawing.graphml.GraphmlType;
import org.graphdrawing.graphml.ObjectFactory;
import kieker.analysis.util.JAXBMarshalElement;
import kieker.analysis.util.graph.Graph;
import teetime.stage.basic.AbstractTransformation;
public class GraphMLTransformationStage extends AbstractTransformation<Graph, JAXBMarshalElement<GraphmlType>> {
private final ObjectFactory objectFactory = new ObjectFactory();
private final Function<Graph, OutputStream> outputStreamMapper;
public GraphMLTransformationStage(final Function<Graph, OutputStream> outputStreamMapper) {
this.outputStreamMapper = outputStreamMapper;
}
@Override
protected void execute(final Graph graph) {
final GraphTypeTransformer graphTypeTransformer = new GraphTypeTransformer(graph);
final GraphmlType graphmlType = new GraphmlType();
graphmlType.getGraphOrData().add(graphTypeTransformer.transform());
final JAXBElement<GraphmlType> jaxbElement = objectFactory.createGraphml(graphmlType);
final OutputStream outputStream = outputStreamMapper.apply(graph);
final JAXBMarshalElement<GraphmlType> marshalElement = new JAXBMarshalElement<>(jaxbElement, outputStream);
this.getOutputPort().send(marshalElement);
}
}
package kieker.analysis.util.graph.export.graphml;
import org.graphdrawing.graphml.GraphType;
import org.graphdrawing.graphml.GraphmlType;
import teetime.stage.basic.AbstractTransformation;
public class GraphMLTypeTransformationStage extends AbstractTransformation<GraphType, GraphmlType> {
@Override
protected void execute(final GraphType graphType) {
final GraphmlType graphmlType = new GraphmlType();
graphmlType.getGraphOrData().add(graphType);
this.getOutputPort().send(graphmlType);
}
}
package kieker.analysis.util.graph.export.graphml;
import java.io.OutputStream;
import java.util.function.Function;
import org.graphdrawing.graphml.GraphmlType;
import kieker.analysis.util.JAXBMarshalStage;
import kieker.analysis.util.graph.Graph;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
public class GraphMLWriterComposite extends CompositeStage {
private final InputPort<Graph> inputPort;
public GraphMLWriterComposite(final Function<Graph, OutputStream> outputStreamMapper) {
final GraphMLTransformationStage graphTypeTransformer;
final JAXBMarshalStage<GraphmlType> jaxbMarshaller;
graphTypeTransformer = new GraphMLTransformationStage(outputStreamMapper);
jaxbMarshaller = new JAXBMarshalStage<GraphmlType>(GraphmlType.class);
this.inputPort = graphTypeTransformer.getInputPort();
super.connectPorts(graphTypeTransformer.getOutputPort(), jaxbMarshaller.getInputPort());
}
public InputPort<Graph> getInputPort() {
return this.inputPort;
}
}
package kieker.analysis.util.graph.export.graphml;
import java.io.OutputStream;
import java.util.function.Function;
import org.graphdrawing.graphml.GraphmlType;
import kieker.analysis.util.FunctionStage;
import kieker.analysis.util.JAXBMarshalStage;
import kieker.analysis.util.graph.Graph;
import teetime.framework.CompositeStage;
import teetime.framework.InputPort;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.CopyByReferenceStrategy;
public class GraphMLWriterStage extends CompositeStage {
private final InputPort<Graph> inputPort;
public GraphMLWriterStage(final Function<Graph, OutputStream> outputStreamMapper) {
final Distributor<Graph> graphDistributor = new Distributor<>(new CopyByReferenceStrategy());
final GraphTypeTransformationStage graphTypeTransformer = new GraphTypeTransformationStage();
final GraphMLTypeTransformationStage graphMLTypeTransformer = new GraphMLTypeTransformationStage();
final JAXBElementWrapperStage jaxbElementWrapper = new JAXBElementWrapperStage();
final JAXBMarshalStage<GraphmlType> jaxbMarshaller = new JAXBMarshalStage<>(GraphmlType.class);
final FunctionStage<Graph, OutputStream> outputStreamMapperStage = new FunctionStage<>(outputStreamMapper);
this.inputPort = graphDistributor.getInputPort();
super.connectPorts(graphDistributor.getNewOutputPort(), graphTypeTransformer.getInputPort());
super.connectPorts(graphTypeTransformer.getOutputPort(), graphMLTypeTransformer.getInputPort());
super.connectPorts(graphMLTypeTransformer.getOutputPort(), jaxbElementWrapper.getInputPort());
super.connectPorts(jaxbElementWrapper.getOutputPort(), jaxbMarshaller.getInputPort1());
super.connectPorts(graphDistributor.getNewOutputPort(), outputStreamMapperStage.getInputPort());
super.connectPorts(outputStreamMapperStage.getOutputPort(), jaxbMarshaller.getInputPort2());
}
public InputPort<Graph> getInputPort() {
return this.inputPort;
}
}
package kieker.analysis.util.graph.export.graphml;
import org.graphdrawing.graphml.GraphType;
import kieker.analysis.util.graph.Graph;
import teetime.stage.basic.AbstractTransformation;
public class GraphTypeTransformationStage extends AbstractTransformation<Graph, GraphType> {
@Override
protected void execute(final Graph graph) {
final GraphTypeTransformer graphTypeTransformer = new GraphTypeTransformer(graph);
final GraphType graphType = graphTypeTransformer.transform();
this.getOutputPort().send(graphType);
}
}
package kieker.analysis.util.graph.export.graphml;
import javax.xml.bind.JAXBElement;
import org.graphdrawing.graphml.GraphmlType;
import org.graphdrawing.graphml.ObjectFactory;
import teetime.stage.basic.AbstractTransformation;
public class JAXBElementWrapperStage extends AbstractTransformation<GraphmlType, JAXBElement<GraphmlType>> {
private final ObjectFactory objectFactory = new ObjectFactory();
@Override
protected void execute(final GraphmlType graphmlType) {
final JAXBElement<GraphmlType> jaxbElement = objectFactory.createGraphml(graphmlType);
this.getOutputPort().send(jaxbElement);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment