Skip to content
Snippets Groups Projects
Commit 97a71c9f authored by Christian Wulf's avatar Christian Wulf
Browse files

removed StringBufferFilter;

migrated some configurations
parent 80a9810f
No related branches found
No related tags found
No related merge requests found
...@@ -12,7 +12,7 @@ import teetime.framework.validation.InvalidPortConnection; ...@@ -12,7 +12,7 @@ import teetime.framework.validation.InvalidPortConnection;
* @param <L> * @param <L>
* the type of the last stage in this pipeline * the type of the last stage in this pipeline
*/ */
// Consider to move it in the framework // TODO Consider to move it in the framework
public final class Pipeline<L extends Stage> extends Stage { public final class Pipeline<L extends Stage> extends Stage {
private final Stage firstStage; private final Stage firstStage;
...@@ -39,26 +39,11 @@ public final class Pipeline<L extends Stage> extends Stage { ...@@ -39,26 +39,11 @@ public final class Pipeline<L extends Stage> extends Stage {
return firstStage.shouldBeTerminated(); return firstStage.shouldBeTerminated();
} }
@Override
public String getId() {
return firstStage.getId();
}
@Override @Override
public void executeWithPorts() { public void executeWithPorts() {
firstStage.executeWithPorts(); firstStage.executeWithPorts();
} }
@Override
public Stage getParentStage() {
return firstStage.getParentStage();
}
@Override
public void setParentStage(final Stage parentStage, final int index) {
firstStage.setParentStage(parentStage, index);
}
@Override @Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) { public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
firstStage.onSignal(signal, inputPort); firstStage.onSignal(signal, inputPort);
......
...@@ -13,19 +13,18 @@ ...@@ -13,19 +13,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
***************************************************************************/ ***************************************************************************/
package teetime.stage.stringBuffer.handler; package teetime.stage.string.buffer.handler;
import kieker.common.exception.MonitoringRecordException; import kieker.common.exception.MonitoringRecordException;
import kieker.common.record.AbstractMonitoringRecord; import kieker.common.record.AbstractMonitoringRecord;
import kieker.common.record.IMonitoringRecord; import kieker.common.record.IMonitoringRecord;
/** /**
* @author Christian Wulf * @author Christian Wulf
* *
* @since 1.10 * @since 1.0
*/ */
public class IMonitoringRecordHandler extends AbstractDataTypeHandler<IMonitoringRecord> { public final class MonitoringRecordHandler extends AbstractDataTypeHandler<IMonitoringRecord> {
@Override @Override
public boolean canHandle(final Object object) { public boolean canHandle(final Object object) {
......
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.stage.stringBuffer;
import java.util.Collection;
import java.util.LinkedList;
import teetime.framework.AbstractConsumerStage;
import teetime.framework.OutputPort;
import teetime.stage.stringBuffer.handler.AbstractDataTypeHandler;
import teetime.stage.stringBuffer.util.KiekerHashMap;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class StringBufferFilter<T> extends AbstractConsumerStage<T> {
private final OutputPort<T> outputPort = this.createOutputPort();
// BETTER use a non shared data structure to avoid synchronization between threads
private KiekerHashMap kiekerHashMap = new KiekerHashMap();
private Collection<AbstractDataTypeHandler<?>> dataTypeHandlers = new LinkedList<AbstractDataTypeHandler<?>>();
@Override
protected void execute(final T element) {
final T returnedElement = this.handle(element);
outputPort.send(returnedElement);
}
@Override
public void onStarting() throws Exception {
super.onStarting();
for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) {
handler.setLogger(this.logger);
handler.setStringRepository(this.kiekerHashMap);
}
}
private T handle(final T object) {
for (final AbstractDataTypeHandler<?> handler : this.dataTypeHandlers) {
if (handler.canHandle(object)) {
@SuppressWarnings("unchecked")
final T returnedObject = ((AbstractDataTypeHandler<T>) handler).handle(object);
return returnedObject;
}
}
return object; // else relay given object
}
public KiekerHashMap getKiekerHashMap() {
return this.kiekerHashMap;
}
public void setKiekerHashMap(final KiekerHashMap kiekerHashMap) {
this.kiekerHashMap = kiekerHashMap;
}
public Collection<AbstractDataTypeHandler<?>> getDataTypeHandlers() {
return this.dataTypeHandlers;
}
public void setDataTypeHandlers(final Collection<AbstractDataTypeHandler<?>> dataTypeHandlers) {
this.dataTypeHandlers = dataTypeHandlers;
}
public OutputPort<T> getOutputPort() {
return outputPort;
}
}
...@@ -59,9 +59,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -59,9 +59,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
@Test @Test
public void performAnalysisWithEprintsLogs() { public void performAnalysisWithEprintsLogs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/Eprints-logs"));
analysis.setInputDir(new File(RESOURCE_DIR + "data/Eprints-logs"));
analysis.init();
this.stopWatch.start(); this.stopWatch.start();
try { try {
...@@ -88,9 +86,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -88,9 +86,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
@Test @Test
public void performAnalysisWithKiekerLogs() { public void performAnalysisWithKiekerLogs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/kieker-logs"));
analysis.setInputDir(new File(RESOURCE_DIR + "data/kieker-logs"));
analysis.init();
this.stopWatch.start(); this.stopWatch.start();
try { try {
...@@ -117,9 +113,7 @@ public class ChwHomeTraceReconstructionAnalysisTest { ...@@ -117,9 +113,7 @@ public class ChwHomeTraceReconstructionAnalysisTest {
@Test @Test
public void performAnalysisWithKieker2Logs() { public void performAnalysisWithKieker2Logs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File(RESOURCE_DIR + "data/kieker2-logs"));
analysis.setInputDir(new File(RESOURCE_DIR + "data/kieker2-logs"));
analysis.init();
this.stopWatch.start(); this.stopWatch.start();
try { try {
......
...@@ -57,9 +57,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { ...@@ -57,9 +57,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
@Test @Test
public void performAnalysisWithEprintsLogs() { public void performAnalysisWithEprintsLogs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/Eprints-logs"));
analysis.setInputDir(new File("src/test/data/Eprints-logs"));
analysis.init();
this.stopWatch.start(); this.stopWatch.start();
try { try {
...@@ -84,9 +82,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { ...@@ -84,9 +82,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
@Test @Test
public void performAnalysisWithKiekerLogs() { public void performAnalysisWithKiekerLogs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/kieker-logs"));
analysis.setInputDir(new File("src/test/data/kieker-logs"));
analysis.init();
this.stopWatch.start(); this.stopWatch.start();
try { try {
...@@ -113,9 +109,7 @@ public class ChwWorkTraceReconstructionAnalysisTest { ...@@ -113,9 +109,7 @@ public class ChwWorkTraceReconstructionAnalysisTest {
@Test @Test
public void performAnalysisWithKieker2Logs() { public void performAnalysisWithKieker2Logs() {
final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(); final TraceReconstructionAnalysis analysis = new TraceReconstructionAnalysis(new File("src/test/data/kieker2-logs"));
analysis.setInputDir(new File("src/test/data/kieker2-logs"));
analysis.init();
this.stopWatch.start(); this.stopWatch.start();
try { try {
......
...@@ -4,10 +4,12 @@ import java.io.File; ...@@ -4,10 +4,12 @@ import java.io.File;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import teetime.framework.Stage; import teetime.framework.AnalysisConfiguration;
import teetime.framework.RunnableStage; import teetime.framework.RunnableStage;
import teetime.framework.pipe.SingleElementPipe; import teetime.framework.Stage;
import teetime.framework.pipe.SpScPipe; import teetime.framework.pipe.IPipeFactory;
import teetime.framework.pipe.PipeFactoryRegistry.PipeOrdering;
import teetime.framework.pipe.PipeFactoryRegistry.ThreadCommunication;
import teetime.stage.Cache; import teetime.stage.Cache;
import teetime.stage.Clock; import teetime.stage.Clock;
import teetime.stage.CollectorSink; import teetime.stage.CollectorSink;
...@@ -18,9 +20,9 @@ import teetime.stage.InstanceOfFilter; ...@@ -18,9 +20,9 @@ import teetime.stage.InstanceOfFilter;
import teetime.stage.basic.merger.Merger; import teetime.stage.basic.merger.Merger;
import teetime.stage.className.ClassNameRegistryRepository; import teetime.stage.className.ClassNameRegistryRepository;
import teetime.stage.io.filesystem.Dir2RecordsFilter; import teetime.stage.io.filesystem.Dir2RecordsFilter;
import teetime.stage.stringBuffer.StringBufferFilter; import teetime.stage.string.buffer.StringBufferFilter;
import teetime.stage.stringBuffer.handler.IMonitoringRecordHandler; import teetime.stage.string.buffer.handler.MonitoringRecordHandler;
import teetime.stage.stringBuffer.handler.StringHandler; import teetime.stage.string.buffer.handler.StringHandler;
import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter; import teetime.stage.trace.traceReconstruction.TraceReconstructionFilter;
import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault; import teetime.util.concurrent.hashmap.ConcurrentHashMapWithDefault;
import teetime.util.concurrent.hashmap.TraceBuffer; import teetime.util.concurrent.hashmap.TraceBuffer;
...@@ -30,23 +32,32 @@ import kieker.common.record.IMonitoringRecord; ...@@ -30,23 +32,32 @@ import kieker.common.record.IMonitoringRecord;
import kieker.common.record.flow.IFlowRecord; import kieker.common.record.flow.IFlowRecord;
// TODO extends AnalysisConfiguration // TODO extends AnalysisConfiguration
public class TraceReconstructionAnalysis { public class TraceReconstructionAnalysis extends AnalysisConfiguration {
private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>(); private final List<TraceEventRecords> elementCollection = new LinkedList<TraceEventRecords>();
private Thread clockThread; private Thread clockThread;
private Thread workerThread; private Thread workerThread;
private ClassNameRegistryRepository classNameRegistryRepository; private final File inputDir;
private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer()); private final ConcurrentHashMapWithDefault<Long, TraceBuffer> traceId2trace;
private final IPipeFactory intraThreadPipeFactory;
private final IPipeFactory interThreadPipeFactory;
private ClassNameRegistryRepository classNameRegistryRepository;
private Counter<IMonitoringRecord> recordCounter; private Counter<IMonitoringRecord> recordCounter;
private Counter<TraceEventRecords> traceCounter; private Counter<TraceEventRecords> traceCounter;
private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter; private ElementThroughputMeasuringStage<IFlowRecord> throughputFilter;
private File inputDir; public TraceReconstructionAnalysis(final File inputDir) {
this.inputDir = inputDir;
traceId2trace = new ConcurrentHashMapWithDefault<Long, TraceBuffer>(new TraceBuffer());
intraThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTRA, PipeOrdering.ARBITRARY, false);
interThreadPipeFactory = PIPE_FACTORY_REGISTRY.getPipeFactory(ThreadCommunication.INTER, PipeOrdering.QUEUE_BASED, false);
init();
}
public void init() { private void init() {
Clock clockStage = this.buildClockPipeline(); Clock clockStage = this.buildClockPipeline();
this.clockThread = new Thread(new RunnableStage(clockStage)); this.clockThread = new Thread(new RunnableStage(clockStage));
...@@ -80,24 +91,24 @@ public class TraceReconstructionAnalysis { ...@@ -80,24 +91,24 @@ public class TraceReconstructionAnalysis {
final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection); final CollectorSink<TraceEventRecords> collector = new CollectorSink<TraceEventRecords>(this.elementCollection);
// configure stages // configure stages
stringBufferFilter.getDataTypeHandlers().add(new IMonitoringRecordHandler()); stringBufferFilter.getDataTypeHandlers().add(new MonitoringRecordHandler());
stringBufferFilter.getDataTypeHandlers().add(new StringHandler()); stringBufferFilter.getDataTypeHandlers().add(new StringHandler());
// connect stages // connect stages
SingleElementPipe.connect(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort()); intraThreadPipeFactory.create(initialElementProducer.getOutputPort(), dir2RecordsFilter.getInputPort());
SingleElementPipe.connect(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort()); intraThreadPipeFactory.create(dir2RecordsFilter.getOutputPort(), this.recordCounter.getInputPort());
SingleElementPipe.connect(this.recordCounter.getOutputPort(), cache.getInputPort()); intraThreadPipeFactory.create(this.recordCounter.getOutputPort(), cache.getInputPort());
SingleElementPipe.connect(cache.getOutputPort(), stringBufferFilter.getInputPort()); intraThreadPipeFactory.create(cache.getOutputPort(), stringBufferFilter.getInputPort());
SingleElementPipe.connect(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort()); intraThreadPipeFactory.create(stringBufferFilter.getOutputPort(), instanceOfFilter.getInputPort());
SingleElementPipe.connect(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort()); intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), this.throughputFilter.getInputPort());
SingleElementPipe.connect(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); intraThreadPipeFactory.create(this.throughputFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
// SingleElementPipe.connect(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort()); // intraThreadPipeFactory.create(instanceOfFilter.getOutputPort(), traceReconstructionFilter.getInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort()); intraThreadPipeFactory.create(traceReconstructionFilter.getTraceValidOutputPort(), merger.getNewInputPort());
SingleElementPipe.connect(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort()); intraThreadPipeFactory.create(traceReconstructionFilter.getTraceInvalidOutputPort(), merger.getNewInputPort());
SingleElementPipe.connect(merger.getOutputPort(), this.traceCounter.getInputPort()); intraThreadPipeFactory.create(merger.getOutputPort(), this.traceCounter.getInputPort());
SingleElementPipe.connect(this.traceCounter.getOutputPort(), collector.getInputPort()); intraThreadPipeFactory.create(this.traceCounter.getOutputPort(), collector.getInputPort());
SpScPipe.connect(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1); interThreadPipeFactory.create(clockStage.getOutputPort(), this.throughputFilter.getTriggerInputPort(), 1);
return initialElementProducer; return initialElementProducer;
} }
...@@ -141,7 +152,4 @@ public class TraceReconstructionAnalysis { ...@@ -141,7 +152,4 @@ public class TraceReconstructionAnalysis {
return this.inputDir; return this.inputDir;
} }
public void setInputDir(final File inputDir) {
this.inputDir = inputDir;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment