diff --git a/src/main/java/teetime/framework/Pipeline.java b/src/main/java/teetime/framework/Pipeline.java index 8df419ce2458bcd4af1f44111f3331c660f662d4..27d11375efa5b38c472561c2af4e4f21c826e16f 100644 --- a/src/main/java/teetime/framework/Pipeline.java +++ b/src/main/java/teetime/framework/Pipeline.java @@ -12,7 +12,7 @@ import teetime.framework.validation.InvalidPortConnection; * @param <L> * the type of the last stage in this pipeline */ -// Consider to move it in the framework +// TODO Consider to move it in the framework public final class Pipeline<L extends Stage> extends Stage { private final Stage firstStage; @@ -39,26 +39,11 @@ public final class Pipeline<L extends Stage> extends Stage { return firstStage.shouldBeTerminated(); } - @Override - public String getId() { - return firstStage.getId(); - } - @Override public void executeWithPorts() { firstStage.executeWithPorts(); } - @Override - public Stage getParentStage() { - return firstStage.getParentStage(); - } - - @Override - public void setParentStage(final Stage parentStage, final int index) { - firstStage.setParentStage(parentStage, index); - } - @Override public void onSignal(final ISignal signal, final InputPort<?> inputPort) { firstStage.onSignal(signal, inputPort); diff --git a/src/main/java/teetime/stage/stringBuffer/handler/IMonitoringRecordHandler.java b/src/main/java/teetime/stage/string/buffer/handler/MonitoringRecordHandler.java similarity index 92% rename from src/main/java/teetime/stage/stringBuffer/handler/IMonitoringRecordHandler.java rename to src/main/java/teetime/stage/string/buffer/handler/MonitoringRecordHandler.java index ee345747f9ab70a1ec5e9f8ce26524fec8dd2e4d..bc0ddd8c51c892cbcbd6ec5d787a51c31d2db61c 100644 --- a/src/main/java/teetime/stage/stringBuffer/handler/IMonitoringRecordHandler.java +++ b/src/main/java/teetime/stage/string/buffer/handler/MonitoringRecordHandler.java @@ -13,19 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package teetime.stage.stringBuffer.handler; +package teetime.stage.string.buffer.handler; import kieker.common.exception.MonitoringRecordException; import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.IMonitoringRecord; - /** * @author Christian Wulf * - * @since 1.10 + * @since 1.0 */ -public class IMonitoringRecordHandler extends AbstractDataTypeHandler<IMonitoringRecord> { +public final class MonitoringRecordHandler extends AbstractDataTypeHandler<IMonitoringRecord> { @Override public boolean canHandle(final Object object) { diff --git a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java b/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java deleted file mode 100644 index 24d030f52644fabe78d4bcfb9a3b29b27da1797c..0000000000000000000000000000000000000000 --- a/src/main/java/teetime/stage/stringBuffer/StringBufferFilter.java +++ /dev/null @@ -1,86 +0,0 @@ -/*************************************************************************** - * Copyright 2014 Kieker Project (http://kieker-monitoring.net) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - ***************************************************************************/ -package teetime.stage.stringBuffer; - -import java.util.Collection; -import java.util.LinkedList; - -import teetime.framework.AbstractConsumerStage; -import teetime.framework.OutputPort; -import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler; -import teetime.stage.stringBuffer.util.KiekerHashMap; - -/** - * @author Christian Wulf - * - * @since 1.10 - */ -public class StringBufferFilter<T> extends AbstractConsumerStage<T> { - - private final OutputPort<T> outputPort = this.createOutputPort(); - - // BETTER use a non shared data structure to avoid synchronization between threads - private KiekerHashMap kiekerHashMap = new KiekerHashMap(); - - private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>(); - - @Override - protected void execute(final T element) { - final T returnedElement = this.handle(element); - outputPort.send(returnedElement); - } - - @Override - public void onStarting() throws Exception { - super.onStarting(); - for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { - handler.setLogger(this.logger); - handler.setStringRepository(this.kiekerHashMap); - } - } - - private T handle(final T object) { - for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) { - if (handler.canHandle(object)) { - @SuppressWarnings("unchecked") - final T returnedObject = ((AbstractDataTypeHandler<T>) handler).handle(object); - return returnedObject; - } - } - return object; // else relay given object - } - - public KiekerHashMap getKiekerHashMap() { - return this.kiekerHashMap; - } - - public void setKiekerHashMap(final KiekerHashMap kiekerHashMap) { - this.kiekerHashMap = kiekerHashMap; - } - - public Collection<AbstractDataTypeHandler<?>> getDataTypeHandlers() { - return this.dataTypeHandlers; - } - - public void setDataTypeHandlers(final Collection<AbstractDataTypeHandler<?>> dataTypeHandlers) { - this.dataTypeHandlers = dataTypeHandlers; - } - - public OutputPort<T> getOutputPort() { - return outputPort; - } - -} diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java index 704a8b26e1245e82177a3967e71520f433b7c0a6..6a9d9a738f5dcc15cd7b6ea596c3867ce73e24b1 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwHomeTraceReconstructionAnalysisTest.java @@ -59,9 +59,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { @Test public void performAnalysisWithEprintsLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); - analysis.setInputDir(new File(RESOURCE_DIR + "data/Eprints-logs")); - analysis.init(); + final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/Eprints-logs")); this.stopWatch.start(); try { @@ -88,9 +86,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKiekerLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); - analysis.setInputDir(new File(RESOURCE_DIR + "data/kieker-logs")); - analysis.init(); + final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/kieker-logs")); this.stopWatch.start(); try { @@ -117,9 +113,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKieker2Logs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); - analysis.setInputDir(new File(RESOURCE_DIR + "data/kieker2-logs")); - analysis.init(); + final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/kieker2-logs")); this.stopWatch.start(); try { diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java index c7438eca97eedc4c28fafb7f1a367556c18f2ff0..9fb77d98987d7135d68072ae9f4f8b36e2ab9c9f 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/ChwWorkTraceReconstructionAnalysisTest.java @@ -57,9 +57,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { @Test public void performAnalysisWithEprintsLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); - analysis.setInputDir(new File("src/test/data/Eprints-logs")); - analysis.init(); + final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/Eprints-logs")); this.stopWatch.start(); try { @@ -84,9 +82,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKiekerLogs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); - analysis.setInputDir(new File("src/test/data/kieker-logs")); - analysis.init(); + final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/kieker-logs")); this.stopWatch.start(); try { @@ -113,9 +109,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { @Test public void performAnalysisWithKieker2Logs() { - final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); - analysis.setInputDir(new File("src/test/data/kieker2-logs")); - analysis.init(); + final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/kieker2-logs")); this.stopWatch.start(); try { diff --git a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java index 0b9882c8a85874ce586549d47e17a6450ea19af2..3341f172ed8403fc4a936080b531203e07744eb6 100644 --- a/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java +++ b/src/performancetest/java/teetime/examples/traceReconstruction/TraceReconstructionAnalysis.java @@ -4,10 +4,12 @@ import java.io.File; import java.util.LinkedList; import java.util.List; -import teetime.framework.Stage; +import teetime.framework.AnalysisConfiguration; import teetime.framework.RunnableStage; -import teetime.framework.pipe.SingleElementPipe; -import teetime.framework.pipe.SpScPipe; +import teetime.framework.Stage; +import teetime.framework.pipe.IPipeFactory; +import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering; +import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication; import teetime.stage.Cache; import teetime.stage.Clock; import teetime.stage.CollectorSink; @@ -18,9 +20,9 @@ import teetime.stage.InstanceOfFilter; import teetime.stage.basic.merger.Merger; import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.io.filesystem.Dir2RecordsFilter; -import teetime.stage.stringBuffer.StringBufferFilter; -import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler; -import teetime.stage.stringBuffer.handler.StringHandler; +import teetime.stage.string.buffer.StringBufferFilter; +import teetime.stage.string.buffer.handler.MonitoringRecordHandler; +import teetime.stage.string.buffer.handler.StringHandler; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.TraceBuffer; @@ -30,23 +32,32 @@ import kieker.common.record.IMonitoringRecord; import kieker.common.record.flow.IFlowRecord; // TODO extends AnalysisConfiguration -public class TraceReconstructionAnalysis { +public class TraceReconstructionAnalysis extends AnalysisConfiguration { private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private Thread clockThread; private Thread workerThread; - private ClassNameRegistryRepository classNameRegistryRepository; - private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + private final File inputDir; + private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace; + private final IPipeFactory intraThreadPipeFactory; + private final IPipeFactory interThreadPipeFactory; + private ClassNameRegistryRepository classNameRegistryRepository; private Counter<IMonitoringRecord> recordCounter; private Counter<TraceEventRecords> traceCounter; private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; - private File inputDir; + public TraceReconstructionAnalysis(final File inputDir) { + this.inputDir = inputDir; + traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); + intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false); + interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false); + init(); + } - public void init() { + private void init() { Clock clockStage = this.buildClockPipeline(); this.clockThread = new Thread(new RunnableStage(clockStage)); @@ -80,24 +91,24 @@ public class TraceReconstructionAnalysis { final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); // configure stages - stringBufferFilter.getDataTypeHandlers().add(new IMonitoringRecordHandler()); + stringBufferFilter.getDataTypeHandlers().add(new MonitoringRecordHandler()); stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); // connect stages - SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); - SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); - SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort()); - SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); - SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); - SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort()); - SingleElementPipe.connect(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - // SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); - SingleElementPipe.connect(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); - SingleElementPipe.connect(merger.getOutputPort(), this.traceCounter.getInputPort()); - SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort()); - - SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); + intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); + intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); + intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), cache.getInputPort()); + intraThreadPipeFactory.create(cache.getOutputPort(), stringBufferFilter.getInputPort()); + intraThreadPipeFactory.create(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); + intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort()); + intraThreadPipeFactory.create(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + // intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); + intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); + intraThreadPipeFactory.create(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); + intraThreadPipeFactory.create(merger.getOutputPort(), this.traceCounter.getInputPort()); + intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), collector.getInputPort()); + + interThreadPipeFactory.create(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); return initialElementProducer; } @@ -141,7 +152,4 @@ public class TraceReconstructionAnalysis { return this.inputDir; } - public void setInputDir(final File inputDir) { - this.inputDir = inputDir; - } }