Skip to content
Snippets Groups Projects
Commit 3757e902 authored by Christian Wulf's avatar Christian Wulf
Browse files

added RecordReaderAnalysis and corresponding stages

parent e9cf920e
No related branches found
No related tags found
No related merge requests found
Showing
with 541 additions and 74 deletions
package teetime.variant.methodcallWithPorts.stage;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import com.google.common.io.Files;
public class FileExtensionSwitch extends ConsumerStage<File, File> {
private final Map<String, OutputPort<File>> fileExtensions = new HashMap<String, OutputPort<File>>();
@Override
protected void execute5(final File file) {
String fileExtension = Files.getFileExtension(file.getAbsolutePath());
OutputPort<File> outputPort = this.fileExtensions.get(fileExtension);
outputPort.send(file);
}
public OutputPort<File> addFileExtension(final String fileExtension) {
OutputPort<File> outputPort = new OutputPort<File>();
this.fileExtensions.put(fileExtension, outputPort);
return outputPort;
}
}
package teetime.variant.methodcallWithPorts.stage;
import java.util.ArrayList;
import java.util.List;
import teetime.util.concurrent.spsc.Pow2;
import teetime.util.list.CommittableQueue;
import teetime.variant.methodcallWithPorts.framework.core.AbstractStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
public class Merger<T> extends AbstractStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private int nextInputPortIndex;
private int size;
private InputPort<T>[] inputPorts;
@Override
protected void execute4(final CommittableQueue<T> elements) {
// TODO Auto-generated method stub
}
@Override
protected void execute5(final T element) {
this.send(element);
}
@Override
public void executeWithPorts() {
InputPort<T> inputPort = this.inputPorts[this.nextInputPortIndex % this.size];
T element = inputPort.receive();
// if (element == null) {
// return;
// }
this.nextInputPortIndex++;
InputPort<T> nextInputPort = this.inputPorts[this.nextInputPortIndex % this.size];
this.setReschedulable(nextInputPort.getPipe().size() > 0);
this.execute5(element);
}
@SuppressWarnings("unchecked")
@Override
public void onStart() {
this.size = this.inputPortList.size();
// this.mask = this.size - 1;
int sizeInPow2 = Pow2.findNextPositivePowerOfTwo(this.size); // is not necessary so far
this.inputPorts = this.inputPortList.toArray(new InputPort[sizeInPow2]);
// System.out.println("inputPorts: " + this.inputPorts);
}
@Override
public InputPort<T> getInputPort() {
return this.getNewInputPort();
}
private InputPort<T> getNewInputPort() {
InputPort<T> inputPort = new InputPort<T>();
this.inputPortList.add(inputPort);
return inputPort;
}
@Override
public void onIsPipelineHead() {
// TODO Auto-generated method stub
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.merger;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*/
public interface IMergerStrategy<T> {
public T getNextInput(Merger<T> merger);
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.merger;
import java.util.ArrayList;
import java.util.List;
import teetime.variant.explicitScheduling.framework.core.Description;
import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
/**
*
* @author Christian Wulf
*
* @since 1.10
*
* @param <T>
* the type of the input ports and the output port
*/
@Description("This stage merges data from the input ports, by taking elements according to the chosen merge strategy and by putting them to the output port.")
public class Merger<T> extends ConsumerStage<T, T> {
// TODO do not inherit from AbstractStage since it provides the default input port that is unnecessary for the merger
private final List<InputPort<T>> inputPortList = new ArrayList<InputPort<T>>();
private IMergerStrategy<T> strategy = new RoundRobinStrategy<T>();
public IMergerStrategy<T> getStrategy() {
return this.strategy;
}
public void setStrategy(final IMergerStrategy<T> strategy) {
this.strategy = strategy;
}
@Override
protected void execute5(final T element) {
final T token = this.strategy.getNextInput(this);
if (token == null) {
return;
}
this.send(token);
}
@Override
public InputPort<T> getInputPort() {
return this.getNewInputPort();
}
private InputPort<T> getNewInputPort() {
InputPort<T> inputPort = new InputPort<T>();
this.inputPortList.add(inputPort);
return inputPort;
}
public List<InputPort<T>> getInputPortList() {
return this.inputPortList;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.basic.merger;
import java.util.List;
import teetime.variant.methodcallWithPorts.framework.core.InputPort;
/**
* @author Nils Christian Ehmke
*
* @since 1.10
*/
public final class RoundRobinStrategy<T> implements IMergerStrategy<T> {
private int index = 0;
@Override
public T getNextInput(final Merger<T> merger) {
List<InputPort<T>> inputPorts = merger.getInputPortList();
int size = inputPorts.size();
// check each port at most once to avoid a potentially infinite loop
while (size-- > 0) {
InputPort<T> inputPort = this.getNextPortInRoundRobinOrder(inputPorts);
final T token = inputPort.receive();
if (token != null) {
return token;
}
}
return null;
}
private InputPort<T> getNextPortInRoundRobinOrder(final List<InputPort<T>> inputPorts) {
InputPort<T> inputPort = inputPorts.get(this.index);
this.index = (this.index + 1) % inputPorts.size();
return inputPort;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.kieker;
import java.io.File;
import teetime.variant.methodcallWithPorts.framework.core.OutputPort;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.FileExtensionSwitch;
import teetime.variant.methodcallWithPorts.stage.basic.merger.Merger;
import teetime.variant.methodcallWithPorts.stage.io.Directory2FilesFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryCreationFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.BinaryFile2RecordFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.DatFile2RecordFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.ZipFile2RecordFilter;
import kieker.common.record.IMonitoringRecord;
import kieker.common.util.filesystem.BinaryCompressionMethod;
import kieker.common.util.filesystem.FSUtil;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class File2RecordFilter extends Pipeline<File, IMonitoringRecord> {
private ClassNameRegistryRepository classNameRegistryRepository;
/**
* @since 1.10
*/
public File2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) {
this.classNameRegistryRepository = classNameRegistryRepository;
// FIXME does not yet work with more than one thread due to classNameRegistryRepository: classNameRegistryRepository is set after the ctor
// create stages
final ClassNameRegistryCreationFilter classNameRegistryCreationFilter = new ClassNameRegistryCreationFilter(this.classNameRegistryRepository);
final Directory2FilesFilter directory2FilesFilter = new Directory2FilesFilter();
final FileExtensionSwitch fileExtensionSwitch = new FileExtensionSwitch();
final DatFile2RecordFilter datFile2RecordFilter = new DatFile2RecordFilter(this.classNameRegistryRepository);
final BinaryFile2RecordFilter binaryFile2RecordFilter = new BinaryFile2RecordFilter(this.classNameRegistryRepository);
final ZipFile2RecordFilter zipFile2RecordFilter = new ZipFile2RecordFilter();
final Merger<IMonitoringRecord> recordMerger = new Merger<IMonitoringRecord>();
// store ports due to readability reasons
final OutputPort<File> normalFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.NORMAL_FILE_EXTENSION);
final OutputPort<File> binFileOutputPort = fileExtensionSwitch.addFileExtension(BinaryCompressionMethod.NONE.getFileExtension());
final OutputPort<File> zipFileOutputPort = fileExtensionSwitch.addFileExtension(FSUtil.ZIP_FILE_EXTENSION);
// connect ports by pipes
SingleElementPipe.connect(classNameRegistryCreationFilter.getOutputPort(), directory2FilesFilter.getInputPort());
SingleElementPipe.connect(directory2FilesFilter.getOutputPort(), fileExtensionSwitch.getInputPort());
SingleElementPipe.connect(normalFileOutputPort, datFile2RecordFilter.getInputPort());
SingleElementPipe.connect(binFileOutputPort, binaryFile2RecordFilter.getInputPort());
SingleElementPipe.connect(zipFileOutputPort, zipFile2RecordFilter.getInputPort());
SingleElementPipe.connect(datFile2RecordFilter.getOutputPort(), recordMerger.getInputPort());
SingleElementPipe.connect(binaryFile2RecordFilter.getOutputPort(), recordMerger.getInputPort());
SingleElementPipe.connect(zipFile2RecordFilter.getOutputPort(), recordMerger.getInputPort());
// prepare pipeline
this.setFirstStage(classNameRegistryCreationFilter);
this.addIntermediateStage(directory2FilesFilter);
this.addIntermediateStage(fileExtensionSwitch);
this.addIntermediateStage(datFile2RecordFilter);
this.addIntermediateStage(binaryFile2RecordFilter);
this.addIntermediateStage(zipFile2RecordFilter);
this.setLastStage(recordMerger);
}
/**
* @since 1.10
*/
public File2RecordFilter() {
this(null);
}
public ClassNameRegistryRepository getClassNameRegistryRepository() {
return this.classNameRegistryRepository;
}
public void setClassNameRegistryRepository(final ClassNameRegistryRepository classNameRegistryRepository) {
this.classNameRegistryRepository = classNameRegistryRepository;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.stage.kieker;
import java.io.File;
import java.io.FileFilter;
import java.util.Comparator;
import kieker.common.util.filesystem.BinaryCompressionMethod;
import kieker.common.util.filesystem.FSUtil;
import teetime.variant.explicitScheduling.framework.core.Context;
import teetime.variant.explicitScheduling.framework.core.IInputPort;
import teetime.variant.explicitScheduling.stage.io.Directory2FilesFilter;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class MonitoringLogDirectory2Files extends Directory2FilesFilter {
public final IInputPort<Directory2FilesFilter, String> filePrefixInputPort = this.createInputPort();
/**
* @author Christian Wulf
*
* @since 1.10
*/
static class MonitoringLogFileFilter implements FileFilter {
private String filePrefix;
@Override
public boolean accept(final File pathname) {
final String name = pathname.getName();
return pathname.isFile()
&& name.startsWith(this.filePrefix)
&& (name.endsWith(FSUtil.NORMAL_FILE_EXTENSION) || BinaryCompressionMethod.hasValidFileExtension(name));
}
public String getFilePrefix() {
return this.filePrefix;
}
public void setFilePrefix(final String filePrefix) {
this.filePrefix = filePrefix;
}
}
private static final Comparator<File> FILE_COMPARATOR = new Comparator<File>() {
@Override
public final int compare(final File f1, final File f2) {
return f1.compareTo(f2); // simplified (we expect no dirs!)
}
};
/**
* @since 1.10
*/
public MonitoringLogDirectory2Files() {
super(new MonitoringLogFileFilter(), FILE_COMPARATOR);
}
@Override
protected boolean execute(final Context<Directory2FilesFilter> context) {
final String filePrefix = context.tryTake(this.filePrefixInputPort);
if (filePrefix == null) {
return false;
}
((MonitoringLogFileFilter) this.getFilter()).setFilePrefix(filePrefix);
return super.execute(context);
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.recordReader;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import teetime.variant.explicitScheduling.framework.core.Analysis;
import teetime.variant.methodcallWithPorts.framework.core.Pipeline;
import teetime.variant.methodcallWithPorts.framework.core.RunnableStage;
import teetime.variant.methodcallWithPorts.framework.core.pipe.SingleElementPipe;
import teetime.variant.methodcallWithPorts.stage.CollectorSink;
import teetime.variant.methodcallWithPorts.stage.kieker.File2RecordFilter;
import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository;
import kieker.common.record.IMonitoringRecord;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class RecordReaderAnalysis extends Analysis {
private int numInputObjects;
private ConstructorClosure<TimestampObject> inputObjectCreator;
private int numNoopFilters;
private final List<IMonitoringRecord> timestampObjectsList = new LinkedList<IMonitoringRecord>();
private Thread producerThread;
private ClassNameRegistryRepository classNameRegistryRepository;
@Override
public void init() {
super.init();
Pipeline<File, Object> producerPipeline = this.buildProducerPipeline(this.numInputObjects, this.inputObjectCreator);
this.producerThread = new Thread(new RunnableStage(producerPipeline));
}
private Pipeline<File, Object> buildProducerPipeline(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.classNameRegistryRepository = new ClassNameRegistryRepository();
// create stages
File2RecordFilter file2RecordFilter = new File2RecordFilter(this.classNameRegistryRepository);
CollectorSink<IMonitoringRecord> collector = new CollectorSink<IMonitoringRecord>(this.timestampObjectsList);
final Pipeline<File, Object> pipeline = new Pipeline<File, Object>();
pipeline.setFirstStage(file2RecordFilter);
pipeline.setLastStage(collector);
SingleElementPipe.connect(file2RecordFilter.getOutputPort(), collector.getInputPort());
return pipeline;
}
@Override
public void start() {
super.start();
this.producerThread.start();
try {
this.producerThread.join();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
public void setInput(final int numInputObjects, final ConstructorClosure<TimestampObject> inputObjectCreator) {
this.numInputObjects = numInputObjects;
this.inputObjectCreator = inputObjectCreator;
}
public int getNumNoopFilters() {
return this.numNoopFilters;
}
public void setNumNoopFilters(final int numNoopFilters) {
this.numNoopFilters = numNoopFilters;
}
public List<IMonitoringRecord> getTimestampObjectsList() {
return this.timestampObjectsList;
}
}
/***************************************************************************
* Copyright 2014 Kieker Project (http://kieker-monitoring.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***************************************************************************/
package teetime.variant.methodcallWithPorts.examples.recordReader;
import org.junit.Test;
import teetime.util.ConstructorClosure;
import teetime.variant.explicitScheduling.examples.throughput.TimestampObject;
import test.PerformanceTest;
/**
* @author Christian Wulf
*
* @since 1.10
*/
public class RecordReaderAnalysisTest extends PerformanceTest {
@Test
public void performAnalysis(final int numThreads) {
System.out.println("Testing teetime (mc) with NUM_OBJECTS_TO_CREATE=" + NUM_OBJECTS_TO_CREATE + ", NUM_NOOP_FILTERS="
+ NUM_NOOP_FILTERS + "...");
final RecordReaderAnalysis analysis = new RecordReaderAnalysis();
analysis.setNumNoopFilters(NUM_NOOP_FILTERS);
analysis.setInput(NUM_OBJECTS_TO_CREATE, new ConstructorClosure<TimestampObject>() {
@Override
public TimestampObject create() {
return new TimestampObject();
}
});
analysis.init();
this.stopWatch.start();
try {
analysis.start();
} finally {
this.stopWatch.end();
analysis.onTerminate();
}
// this.timestampObjects = analysis.getTimestampObjectsList();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment