diff --git a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/ClassNameRegistryCreationFilter.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/ClassNameRegistryCreationFilter.java index e789679ff108713a85678873bd261103955ba2d0..6f4fdf16a5e1ce26bcea21ad6132b47e6bc6699a 100644 --- a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/ClassNameRegistryCreationFilter.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/ClassNameRegistryCreationFilter.java @@ -19,8 +19,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import kieker.analysis.stage.MappingFileParser; - import teetime.variant.explicitScheduling.framework.core.AbstractFilter; import teetime.variant.explicitScheduling.framework.core.Context; import teetime.variant.explicitScheduling.framework.core.IInputPort; diff --git a/src/main/java/kieker/analysis/stage/MappingFileParser.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/MappingFileParser.java similarity index 98% rename from src/main/java/kieker/analysis/stage/MappingFileParser.java rename to src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/MappingFileParser.java index 882ffbdd7dc01374fe143a181a790922d02bcb7d..9578f48a1785b86f324e4ab4a8faa70da2a7680c 100644 --- a/src/main/java/kieker/analysis/stage/MappingFileParser.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/className/MappingFileParser.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package kieker.analysis.stage; +package teetime.variant.explicitScheduling.stage.kieker.className; import java.io.BufferedReader; import java.io.File; diff --git a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java index ea1a5bf18c9326066efafbe9d79acbd8b5699de1..34cc0e7fe6665a4d67288995f360a5071b1e518d 100644 --- a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java @@ -19,20 +19,19 @@ import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import kieker.analysis.stage.RecordFromBinaryFileCreator; -import kieker.common.exception.MonitoringRecordException; -import kieker.common.record.IMonitoringRecord; -import kieker.common.util.filesystem.BinaryCompressionMethod; - import teetime.variant.explicitScheduling.framework.core.AbstractFilter; import teetime.variant.explicitScheduling.framework.core.Context; import teetime.variant.explicitScheduling.framework.core.IInputPort; import teetime.variant.explicitScheduling.framework.core.IOutputPort; import teetime.variant.explicitScheduling.stage.kieker.className.ClassNameRegistryRepository; +import kieker.common.exception.MonitoringRecordException; +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.filesystem.BinaryCompressionMethod; + /** * @author Christian Wulf - * + * * @since 1.10 */ public class BinaryFile2RecordFilter extends AbstractFilter<BinaryFile2RecordFilter> { diff --git a/src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/RecordFromBinaryFileCreator.java similarity index 98% rename from src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java rename to src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/RecordFromBinaryFileCreator.java index 31598c301740b3bd9362431c6693f90b30f5859c..c7dd5d198da67408d348afe647339826ec7af4d3 100644 --- a/src/main/java/kieker/analysis/stage/RecordFromBinaryFileCreator.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/RecordFromBinaryFileCreator.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package kieker.analysis.stage; +package teetime.variant.explicitScheduling.stage.kieker.fileToRecord; import java.io.DataInputStream; import java.io.EOFException; diff --git a/src/main/java/kieker/analysis/stage/RecordFromTextLineCreator.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/RecordFromTextLineCreator.java similarity index 98% rename from src/main/java/kieker/analysis/stage/RecordFromTextLineCreator.java rename to src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/RecordFromTextLineCreator.java index e428980cfd32b661b397a112ee5a5d054d43409d..cffcb48efffcd28678e487c1b428c7cd07230ebd 100644 --- a/src/main/java/kieker/analysis/stage/RecordFromTextLineCreator.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/RecordFromTextLineCreator.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. ***************************************************************************/ -package kieker.analysis.stage; +package teetime.variant.explicitScheduling.stage.kieker.fileToRecord; import java.io.File; diff --git a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/ZipFile2RecordFilter.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/ZipFile2RecordFilter.java index 4f086a903048c9d6a340774b672d8551a9b08f41..ef5bcd718ee25d9efd92dad4d36c565ea37b5d53 100644 --- a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/ZipFile2RecordFilter.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/ZipFile2RecordFilter.java @@ -28,19 +28,19 @@ import java.io.UnsupportedEncodingException; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -import kieker.analysis.stage.MappingFileParser; -import kieker.common.record.IMonitoringRecord; -import kieker.common.util.filesystem.FSUtil; - import teetime.variant.explicitScheduling.framework.core.AbstractFilter; import teetime.variant.explicitScheduling.framework.core.Context; import teetime.variant.explicitScheduling.framework.core.IInputPort; import teetime.variant.explicitScheduling.framework.core.IOutputPort; import teetime.variant.explicitScheduling.stage.kieker.className.ClassNameRegistry; +import teetime.variant.explicitScheduling.stage.kieker.className.MappingFileParser; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.filesystem.FSUtil; /** * @author Christian Wulf - * + * * @since 1.10 */ public class ZipFile2RecordFilter extends AbstractFilter<ZipFile2RecordFilter> { diff --git a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java index e066fe2319b8acfc21ebf2a720663c0f79b4dc7f..d48b77683ad228b99899c3ffa73af2b702e2f7c0 100644 --- a/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java +++ b/src/main/java/teetime/variant/explicitScheduling/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java @@ -19,23 +19,23 @@ package teetime.variant.explicitScheduling.stage.kieker.fileToRecord.textLine; import java.util.HashSet; import java.util.Set; -import kieker.analysis.stage.RecordFromTextLineCreator; -import kieker.common.exception.IllegalRecordFormatException; -import kieker.common.exception.MonitoringRecordException; -import kieker.common.exception.UnknownRecordTypeException; -import kieker.common.record.IMonitoringRecord; - import teetime.variant.explicitScheduling.framework.core.AbstractFilter; import teetime.variant.explicitScheduling.framework.core.Context; import teetime.variant.explicitScheduling.framework.core.IInputPort; import teetime.variant.explicitScheduling.framework.core.IOutputPort; import teetime.variant.explicitScheduling.stage.MappingException; import teetime.variant.explicitScheduling.stage.kieker.className.ClassNameRegistryRepository; +import teetime.variant.explicitScheduling.stage.kieker.fileToRecord.RecordFromTextLineCreator; import teetime.variant.explicitScheduling.stage.util.TextLine; +import kieker.common.exception.IllegalRecordFormatException; +import kieker.common.exception.MonitoringRecordException; +import kieker.common.exception.UnknownRecordTypeException; +import kieker.common.record.IMonitoringRecord; + /** * @author Christian Wulf - * + * * @since 1.10 */ public class TextLine2RecordFilter extends AbstractFilter<TextLine2RecordFilter> { diff --git a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java index ab6fc8943c7cec02751f08ee5ce66752cff30ea2..478273af132cdebe240833096ce3bc790eb4af5f 100644 --- a/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java +++ b/src/main/java/teetime/variant/methodcallWithPorts/framework/core/AbstractStage.java @@ -1,9 +1,20 @@ package teetime.variant.methodcallWithPorts.framework.core; +import java.util.UUID; + import teetime.util.list.CommittableQueue; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; + public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { + private final String id; + /** + * A unique logger instance per stage instance + */ + protected Log logger; + private final InputPort<I> inputPort = new InputPort<I>(); private final OutputPort<O> outputPort = new OutputPort<O>(); @@ -18,6 +29,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { private boolean reschedulable; + public AbstractStage() { + this.id = UUID.randomUUID().toString(); // the id should only be represented by a UUID, not additionally by the class name + this.logger = LogFactory.getLog(this.id); + } + @Override public InputPort<I> getInputPort() { return this.inputPort; @@ -64,7 +80,11 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { // } while (this.next().isReschedulable()); // } - protected abstract void execute4(CommittableQueue<I> elements); + // protected abstract void execute4(CommittableQueue<I> elements); + + protected void execute4(final CommittableQueue<I> elements) { + throw new IllegalStateException(); // default implementation + } protected abstract void execute5(I element); @@ -138,4 +158,8 @@ public abstract class AbstractStage<I, O> implements StageWithPort<I, O> { this.reschedulable = reschedulable; } + public String getId() { + return this.id; + } + } diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java new file mode 100644 index 0000000000000000000000000000000000000000..5dad520e5583d15f3c60e85d179f3765c4e65b44 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/DbReader.java @@ -0,0 +1,187 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.io; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import kieker.common.exception.MonitoringRecordException; +import kieker.common.record.AbstractMonitoringRecord; +import kieker.common.record.IMonitoringRecord; + +import teetime.variant.explicitScheduling.framework.core.AbstractFilter; +import teetime.variant.explicitScheduling.framework.core.Context; +import teetime.variant.explicitScheduling.framework.core.Description; +import teetime.variant.explicitScheduling.framework.core.IOutputPort; + +/** + * A very simple database reader that probably only works for small data sets. + * + * @author Jan Waller, Nils Christian Ehmke + * + * @since 1.10 + */ +@Description("A reader which reads records from a database") +public class DbReader extends AbstractFilter<DbReader> { + + private final IOutputPort<DbReader, IMonitoringRecord> outputPort = super.createOutputPort(); + + @Description("The classname of the driver used for the connection.") + private String driverClassname = "org.apache.derby.jdbc.EmbeddedDrive"; + @Description("The connection string used to establish the connection.") + private String connectionString = "jdbc:derby:tmp/KIEKER;user=DBUSER;password=DBPASS"; + @Description("The prefix of the used table within the database.") + private String tablePrefix = "kieker"; + + private volatile boolean running = true; + + @Override + public void onPipelineStarts() throws Exception { + super.onPipelineStarts(); + try { + Class.forName(this.driverClassname).newInstance(); + } catch (final Exception ex) { // NOPMD NOCS (IllegalCatchCheck) + throw new Exception("DB driver registration failed. Perhaps the driver jar is missing?", ex); + } + } + + @Override + public void onPipelineStops() { + super.logger.info("Shutdown of DBReader requested."); + this.running = false; + super.onPipelineStops(); + } + + @Override + protected boolean execute(final Context<DbReader> context) { + Connection connection = null; + try { + connection = DriverManager.getConnection(this.connectionString); + Statement getIndexTable = null; + try { + getIndexTable = connection.createStatement(); + ResultSet indexTable = null; + try { // NOCS (nested try) + indexTable = getIndexTable.executeQuery("SELECT * from " + this.tablePrefix); + while (this.running && indexTable.next()) { + final String tablename = indexTable.getString(1); + final String classname = indexTable.getString(2); + try { // NOCS (nested try) + this.table2record(context, connection, tablename, AbstractMonitoringRecord.classForName(classname)); + } catch (final MonitoringRecordException ex) { + // log error but continue with next table + super.logger.error("Failed to load records of type " + classname + " from table " + tablename, ex); + continue; + } + } + } finally { + if (indexTable != null) { + indexTable.close(); + } + } + } finally { + if (getIndexTable != null) { + getIndexTable.close(); + } + } + } catch (final SQLException ex) { + super.logger.error("SQLException with SQLState: '" + ex.getSQLState() + "' and VendorError: '" + ex.getErrorCode() + "'", ex); + return false; + } finally { + if (connection != null) { + try { + connection.close(); + } catch (final SQLException ex) { + super.logger.error("SQLException with SQLState: '" + ex.getSQLState() + "' and VendorError: '" + ex.getErrorCode() + "'", ex); + } + } + } + return true; + } + + public final String getDriverClassname() { + return this.driverClassname; + } + + public final void setDriverClassname(final String driverClassname) { + this.driverClassname = driverClassname; + } + + public final String getConnectionString() { + return this.connectionString; + } + + public final void setConnectionString(final String connectionString) { + this.connectionString = connectionString; + } + + public final String getTablePrefix() { + return this.tablePrefix; + } + + public final void setTablePrefix(final String tablePrefix) { + this.tablePrefix = tablePrefix; + } + + /** + * This method uses the given table to read records and sends them to the output port. + * + * @param connection + * The connection to the database which will be used. + * @param tablename + * The name of the table containing records. + * @param clazz + * The class of the monitoring records. This will be used to convert the array into the record. + * @throws SQLException + * If something went wrong during the database access. + * @throws MonitoringRecordException + * If the data within the table could not be converted into a valid record. + */ + private void table2record(final Context<DbReader> context, final Connection connection, final String tablename, final Class<? extends IMonitoringRecord> clazz) + throws SQLException, MonitoringRecordException { + Statement selectRecord = null; + try { + selectRecord = connection.createStatement(); + ResultSet records = null; + try { + records = selectRecord.executeQuery("SELECT * from " + tablename); + final int size = records.getMetaData().getColumnCount() - 2; // remove index column + while (this.running && records.next()) { + final Object[] recordValues = new Object[size]; + for (int i = 0; i < size; i++) { + recordValues[i] = records.getObject(i + 3); + } + final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, recordValues); + record.setLoggingTimestamp(records.getLong(2)); + context.put(this.outputPort, record); + } + } finally { + if (records != null) { + records.close(); + } + } + } finally { + if (selectRecord != null) { + selectRecord.close(); + } + } + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..0de805e15fde11cb1815c02f717ffcb377f1a876 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Directory2FilesFilter.java @@ -0,0 +1,116 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.io; + +import java.io.File; +import java.io.FileFilter; +import java.util.Arrays; +import java.util.Comparator; + +import teetime.variant.explicitScheduling.framework.core.AbstractFilter; +import teetime.variant.explicitScheduling.framework.core.Context; +import teetime.variant.explicitScheduling.framework.core.IInputPort; +import teetime.variant.explicitScheduling.framework.core.IOutputPort; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class Directory2FilesFilter extends AbstractFilter<Directory2FilesFilter> { + + public final IInputPort<Directory2FilesFilter, File> directoryInputPort = this.createInputPort(); + + public final IOutputPort<Directory2FilesFilter, File> fileOutputPort = this.createOutputPort(); + + private FileFilter filter; + private Comparator<File> fileComparator; + + /** + * @since 1.10 + */ + public Directory2FilesFilter(final FileFilter fileFilter) { + this.setFilter(fileFilter); + } + + /** + * @since 1.10 + */ + public Directory2FilesFilter(final Comparator<File> fileComparator) { + this.setFileComparator(fileComparator); + } + + /** + * @since 1.10 + */ + public Directory2FilesFilter(final FileFilter fileFilter, final Comparator<File> fileComparator) { + this.setFilter(fileFilter); + this.setFileComparator(fileComparator); + } + + /** + * @since 1.10 + */ + public Directory2FilesFilter() { + super(); + } + + /** + * @since 1.10 + */ + @Override + protected boolean execute(final Context<Directory2FilesFilter> context) { + final File inputDir = context.tryTake(this.directoryInputPort); + if (inputDir == null) { + return false; + } + + final File[] inputFiles = inputDir.listFiles(this.filter); + + if (inputFiles == null) { + this.logger.error("Directory '" + inputDir + "' does not exist or an I/O error occured."); + return true; + } + + if (this.fileComparator != null) { + Arrays.sort(inputFiles, this.fileComparator); + } + + for (final File file : inputFiles) { + context.put(this.fileOutputPort, file); + } + + return true; + } + + public FileFilter getFilter() { + return this.filter; + } + + public void setFilter(final FileFilter filter) { + this.filter = filter; + } + + public Comparator<File> getFileComparator() { + return this.fileComparator; + } + + public void setFileComparator(final Comparator<File> fileComparator) { + this.fileComparator = fileComparator; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..47be6efe228f1cfc8a588e62861a4c7776d109f8 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/File2TextLinesFilter.java @@ -0,0 +1,73 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.io; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; + +import teetime.variant.explicitScheduling.stage.util.TextLine; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class File2TextLinesFilter extends ConsumerStage<File, TextLine> { + + private String charset = "UTF-8"; + + @Override + protected void execute5(final File textFile) { + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(textFile), this.charset)); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.length() != 0) { + this.send(new TextLine(textFile, line)); + } // else: ignore empty line + } + } catch (final FileNotFoundException e) { + this.logger.error("", e); + } catch (final IOException e) { + this.logger.error("", e); + } finally { + try { + if (reader != null) { + reader.close(); + } + } catch (final IOException e) { + this.logger.warn("", e); + } + } + } + + public String getCharset() { + return this.charset; + } + + public void setCharset(final String charset) { + this.charset = charset; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java new file mode 100644 index 0000000000000000000000000000000000000000..bb38e50e12fb6cedce26b7ccd72ce48bad530403 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/Printer.java @@ -0,0 +1,144 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.io; + +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; + +import teetime.variant.explicitScheduling.framework.core.AbstractFilter; +import teetime.variant.explicitScheduling.framework.core.Context; +import teetime.variant.explicitScheduling.framework.core.Description; +import teetime.variant.explicitScheduling.framework.core.IInputPort; + +/** + * @author Matthias Rohr, Jan Waller, Nils Christian Ehmke + * + * @since 1.10 + */ +@Description("A filter to print objects to a configured stream") +public class Printer<T> extends AbstractFilter<Printer<T>> { + + public static final String STREAM_STDOUT = "STDOUT"; + public static final String STREAM_STDERR = "STDERR"; + public static final String STREAM_STDLOG = "STDlog"; + public static final String STREAM_NULL = "NULL"; + + public static final String ENCODING_UTF8 = "UTF-8"; + + public final IInputPort<Printer<T>, T> input = this.createInputPort(); + + private PrintStream printStream; + private String streamName = STREAM_STDOUT; + private String encoding = ENCODING_UTF8; + private boolean active = true; + private boolean append = true; + + public String getStreamName() { + return this.streamName; + } + + public void setStreamName(final String streamName) { + this.streamName = streamName; + } + + public String getEncoding() { + return this.encoding; + } + + public void setEncoding(final String encoding) { + this.encoding = encoding; + } + + public boolean isAppend() { + return this.append; + } + + public void setAppend(final boolean append) { + this.append = append; + } + + @Override + public void onPipelineStarts() throws Exception { + super.onPipelineStarts(); + this.initializeStream(); + } + + @Override + public void onPipelineStops() { + this.closeStream(); + super.onPipelineStops(); + } + + private void initializeStream() { + if (STREAM_STDOUT.equals(this.streamName)) { + this.printStream = System.out; + this.active = true; + } else if (STREAM_STDERR.equals(this.streamName)) { + this.printStream = System.err; + this.active = true; + } else if (STREAM_STDLOG.equals(this.streamName)) { + this.printStream = null; + this.active = true; + } else if (STREAM_NULL.equals(this.streamName)) { + this.printStream = null; + this.active = false; + } else { + try { + this.printStream = new PrintStream(new FileOutputStream(this.streamName, this.append), false, this.encoding); + this.active = true; + } catch (final FileNotFoundException ex) { + this.active = false; + super.logger.warn("Stream could not be created", ex); + } catch (final UnsupportedEncodingException ex) { + this.active = false; + super.logger.warn("Encoding not supported", ex); + } + } + } + + private void closeStream() { + if ((this.printStream != null) && (this.printStream != System.out) && (this.printStream != System.err)) { + this.printStream.close(); + } + } + + @Override + protected boolean execute(final Context<Printer<T>> context) { + final T object = context.tryTake(this.input); + if (null == object) { + return false; + } + + if (this.active) { + final StringBuilder sb = new StringBuilder(128); + + sb.append(super.getId()); + sb.append('(').append(object.getClass().getSimpleName()).append(") ").append(object.toString()); + + final String msg = sb.toString(); + if (this.printStream != null) { + this.printStream.println(msg); + } else { + super.logger.info(msg); + } + } + + return true; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java new file mode 100644 index 0000000000000000000000000000000000000000..33f1583db8acfc272b4ececea426e839f737bd38 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/io/TCPReader.java @@ -0,0 +1,207 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.io; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import kieker.common.exception.MonitoringRecordException; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.AbstractMonitoringRecord; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.misc.RegistryRecord; +import kieker.common.util.registry.ILookup; +import kieker.common.util.registry.Lookup; + +import teetime.variant.explicitScheduling.framework.core.AbstractFilter; +import teetime.variant.explicitScheduling.framework.core.Context; +import teetime.variant.explicitScheduling.framework.core.IOutputPort; + +/** + * This is a reader which reads the records from a TCP port. + * + * @author Jan Waller, Nils Christian Ehmke + * + * @since 1.10 + */ +public class TCPReader extends AbstractFilter<TCPReader> { + + private static final int MESSAGE_BUFFER_SIZE = 65535; + + private final IOutputPort<TCPReader, IMonitoringRecord> outputPort = super.createOutputPort(); + + // BETTER use a non thread-safe implementation to increase performance. A thread-safe version is not necessary. + private final ILookup<String> stringRegistry = new Lookup<String>(); + private int port1 = 10133; + private int port2 = 10134; + + @Override + public void onPipelineStarts() throws Exception { + super.onPipelineStarts(); + + // FIXME use the implementation from the thread or from execute(), but not both + final TCPStringReader tcpStringReader = new TCPStringReader(this.port2, this.stringRegistry); + tcpStringReader.start(); + } + + @Override + public void onPipelineStops() { + super.logger.info("Shutdown of TCPReader requested."); + // TODO actually implement terminate! + super.onPipelineStops(); + } + + public final int getPort1() { + return this.port1; + } + + public final void setPort1(final int port1) { + this.port1 = port1; + } + + public final int getPort2() { + return this.port2; + } + + public final void setPort2(final int port2) { + this.port2 = port2; + } + + @Override + protected boolean execute(final Context<TCPReader> context) { + ServerSocketChannel serversocket = null; + try { + serversocket = ServerSocketChannel.open(); + serversocket.socket().bind(new InetSocketAddress(this.port1)); + if (super.logger.isDebugEnabled()) { + super.logger.debug("Listening on port " + this.port1); + } + // BEGIN also loop this one? + final SocketChannel socketChannel = serversocket.accept(); + final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + while (socketChannel.read(buffer) != -1) { + buffer.flip(); + // System.out.println("Reading, remaining:" + buffer.remaining()); + try { + while (buffer.hasRemaining()) { + buffer.mark(); + final int clazzid = buffer.getInt(); + final long loggingTimestamp = buffer.getLong(); + final IMonitoringRecord record; + try { // NOCS (Nested try-catch) + record = AbstractMonitoringRecord.createFromByteBuffer(clazzid, buffer, this.stringRegistry); + record.setLoggingTimestamp(loggingTimestamp); + context.put(this.outputPort, record); + } catch (final MonitoringRecordException ex) { + super.logger.error("Failed to create record.", ex); + } + } + buffer.clear(); + } catch (final BufferUnderflowException ex) { + buffer.reset(); + // System.out.println("Underflow, remaining:" + buffer.remaining()); + buffer.compact(); + } + } + // System.out.println("Channel closing..."); + socketChannel.close(); + // END also loop this one? + } catch (final IOException ex) { + super.logger.error("Error while reading", ex); + return false; + } finally { + if (null != serversocket) { + try { + serversocket.close(); + } catch (final IOException e) { + if (super.logger.isDebugEnabled()) { + super.logger.debug("Failed to close TCP connection!", e); + } + } + } + } + return true; + } + +} + +/** + * + * @author Jan Waller + * + * @since 1.8 + */ +class TCPStringReader extends Thread { + + private static final int MESSAGE_BUFFER_SIZE = 65535; + + private static final Log LOG = LogFactory.getLog(TCPStringReader.class); + + private final int port; + private final ILookup<String> stringRegistry; + + public TCPStringReader(final int port, final ILookup<String> stringRegistry) { + this.port = port; + this.stringRegistry = stringRegistry; + } + + @Override + public void run() { + ServerSocketChannel serversocket = null; + try { + serversocket = ServerSocketChannel.open(); + serversocket.socket().bind(new InetSocketAddress(this.port)); + if (LOG.isDebugEnabled()) { + LOG.debug("Listening on port " + this.port); + } + // BEGIN also loop this one? + final SocketChannel socketChannel = serversocket.accept(); + final ByteBuffer buffer = ByteBuffer.allocateDirect(MESSAGE_BUFFER_SIZE); + while (socketChannel.read(buffer) != -1) { + buffer.flip(); + try { + while (buffer.hasRemaining()) { + buffer.mark(); + RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry); + } + buffer.clear(); + } catch (final BufferUnderflowException ex) { + buffer.reset(); + buffer.compact(); + } + } + socketChannel.close(); + // END also loop this one? + } catch (final IOException ex) { + LOG.error("Error while reading", ex); + } finally { + if (null != serversocket) { + try { + serversocket.close(); + } catch (final IOException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to close TCP connection!", e); + } + } + } + } + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistry.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistry.java new file mode 100644 index 0000000000000000000000000000000000000000..3afc8cb84009b59d475ae9bfdeffe8b4e3c57701 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistry.java @@ -0,0 +1,29 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.className; + +import java.util.HashMap; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class ClassNameRegistry extends HashMap<Integer, String> { + + private static final long serialVersionUID = -7254550212115937463L; + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..3dc945c17bc492c2ba707a50db8476f7bdf218bc --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryCreationFilter.java @@ -0,0 +1,84 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.className; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; + +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class ClassNameRegistryCreationFilter extends ConsumerStage<File, File> { + + private ClassNameRegistryRepository classNameRegistryRepository; + + private final MappingFileParser mappingFileParser; + + /** + * @since 1.10 + */ + public ClassNameRegistryCreationFilter(final ClassNameRegistryRepository classNameRegistryRepository) { + this(); + this.classNameRegistryRepository = classNameRegistryRepository; + } + + /** + * @since 1.10 + */ + public ClassNameRegistryCreationFilter() { + super(); + this.mappingFileParser = new MappingFileParser(this.logger); + } + + @Override + protected void execute5(final File inputDir) { + final File mappingFile = this.mappingFileParser.findMappingFile(inputDir); + if (mappingFile == null) { + return; + } + + try { + final ClassNameRegistry classNameRegistry = this.mappingFileParser.parseFromStream(new FileInputStream(mappingFile)); + this.classNameRegistryRepository.put(inputDir, classNameRegistry); + this.send(inputDir); + + // final String filePrefix = this.mappingFileParser.getFilePrefixFromMappingFile(mappingFile); + // context.put(this.filePrefixOutputPort, filePrefix); // TODO pass prefix + } catch (final FileNotFoundException e) { + this.logger.error("Mapping file not found.", e); // and skip this directory + } + } + + public ClassNameRegistryRepository getClassNameRegistryRepository() { + return this.classNameRegistryRepository; + } + + public void setClassNameRegistryRepository(final ClassNameRegistryRepository classNameRegistryRepository) { + this.classNameRegistryRepository = classNameRegistryRepository; + } + + @Override + protected void execute4(final CommittableQueue<File> elements) { + throw new IllegalStateException(); + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryRepository.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryRepository.java new file mode 100644 index 0000000000000000000000000000000000000000..e916721a3c16e68cd09fb59f458bbcc1daa859e2 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/ClassNameRegistryRepository.java @@ -0,0 +1,61 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.className; + +import java.io.File; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class represents a wrapper for a Map<String, ClassNameRegistry> ensuring that keys are + * <ul> + * <li>of the type <code>java.io.File</code> and + * <li>passed as absolute file paths. + * </ul> + * + * @author Christian Wulf + * + * @since 1.10 + */ +public class ClassNameRegistryRepository { + + private final ConcurrentHashMap<String, ClassNameRegistry> classNameRegistryRepository = new ConcurrentHashMap<String, ClassNameRegistry>(); + + /** + * @since 1.10 + */ + public ClassNameRegistry get(final File directory) { + return this.classNameRegistryRepository.get(directory.getAbsolutePath()); + } + + /** + * @since 1.10 + */ + public void put(final File directory, final ClassNameRegistry classNameRegistry) { + this.classNameRegistryRepository.put(directory.getAbsolutePath(), classNameRegistry); + } + + /** + * @since 1.10 + */ + public int size() { + return this.classNameRegistryRepository.size(); + } + + @Override + public String toString() { + return this.classNameRegistryRepository.toString(); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/MappingFileParser.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/MappingFileParser.java new file mode 100644 index 0000000000000000000000000000000000000000..030ac62e2005d642a24b40ced2f562be96975803 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/className/MappingFileParser.java @@ -0,0 +1,127 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.className; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +import kieker.common.logging.Log; +import kieker.common.util.filesystem.FSUtil; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class MappingFileParser { + + protected Log logger; + + private static final Map<String, String> filePrefixRegistry = new HashMap<String, String>(); + + static { + filePrefixRegistry.put(FSUtil.MAP_FILENAME, FSUtil.FILE_PREFIX); + filePrefixRegistry.put(FSUtil.LEGACY_MAP_FILENAME, FSUtil.LEGACY_FILE_PREFIX); + } + + public MappingFileParser(final Log logger) { + this.logger = logger; + } + + public ClassNameRegistry parseFromStream(final InputStream inputStream) { + final ClassNameRegistry classNameRegistry = new ClassNameRegistry(); + + BufferedReader in = null; + try { + in = new BufferedReader(new InputStreamReader(inputStream, FSUtil.ENCODING)); + String line; + while ((line = in.readLine()) != null) { // NOPMD (assign) + this.parseTextLine(line, classNameRegistry); + } + } catch (final IOException ex) { + this.logger.error("Error reading mapping file", ex); + } finally { + if (in != null) { + try { + in.close(); + } catch (final IOException ex) { + this.logger.error("Exception while closing input stream for mapping file", ex); + } + } + } + + return classNameRegistry; + } + + private void parseTextLine(final String line, final Map<Integer, String> stringRegistry) { + if (line.length() == 0) { + return; // ignore empty lines + } + final int split = line.indexOf('='); + if (split == -1) { + this.logger.error("Failed to find character '=' in line: {" + line + "}. It must consist of a ID=VALUE pair."); + return; // continue on errors + } + final String key = line.substring(0, split); + final String value = FSUtil.decodeNewline(line.substring(split + 1)); + // the leading $ is optional + final Integer id; + try { + id = Integer.valueOf((key.charAt(0) == '$') ? key.substring(1) : key); // NOCS + } catch (final NumberFormatException ex) { + this.logger.error("Error reading mapping file, id must be integer", ex); + return; // continue on errors + } + final String prevVal = stringRegistry.put(id, value); + if (prevVal != null) { + this.logger.error("Found addional entry for id='" + id + "', old value was '" + prevVal + "' new value is '" + value + "'"); + } + } + + /** + * @since 1.10 + */ + public File findMappingFile(final File dirPath) { + File mappingFile = new File(dirPath, FSUtil.MAP_FILENAME); + if (!mappingFile.exists()) { + // No mapping file found. Check whether we find a legacy tpmon.map file! + mappingFile = new File(dirPath, FSUtil.LEGACY_MAP_FILENAME); + if (mappingFile.exists()) { + this.logger.info("Directory '" + dirPath + "' contains no file '" + FSUtil.MAP_FILENAME + "'. Found '" + FSUtil.LEGACY_MAP_FILENAME + + "' ... switching to legacy mode"); + } else { + // no {kieker|tpmon}.map exists. This is valid for very old monitoring logs. Hence, only dump a log.warn + this.logger.warn("No mapping file in directory '" + dirPath.getAbsolutePath() + "'"); + return null; + } + } + + return mappingFile; + } + + /** + * @return <code>null</code> if a file prefix for the given <code>mappingFile</code> is not registered. + * @since 1.10 + */ + public String getFilePrefixFromMappingFile(final File mappingFile) { + return MappingFileParser.filePrefixRegistry.get(mappingFile.getName()); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..000de983c26fca83f5b1a494b1ed5f5a0359f676 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/BinaryFile2RecordFilter.java @@ -0,0 +1,100 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; + +import kieker.common.exception.MonitoringRecordException; +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.filesystem.BinaryCompressionMethod; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class BinaryFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> { + + private static final int MB = 1024 * 1024; + + private RecordFromBinaryFileCreator recordFromBinaryFileCreator; + + private ClassNameRegistryRepository classNameRegistryRepository; + + /** + * @since 1.10 + */ + public BinaryFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { + this(); + this.classNameRegistryRepository = classNameRegistryRepository; + } + + /** + * @since 1.10 + */ + public BinaryFile2RecordFilter() { + super(); + } + + @Override + public void onStart() { + this.recordFromBinaryFileCreator = new RecordFromBinaryFileCreator(this.logger, this.classNameRegistryRepository); + super.onStart(); + } + + public ClassNameRegistryRepository getClassNameRegistryRepository() { + return this.classNameRegistryRepository; + } + + public void setClassNameRegistryRepository(final ClassNameRegistryRepository classNameRegistryRepository) { + this.classNameRegistryRepository = classNameRegistryRepository; + } + + @Override + protected void execute5(final File binaryFile) { + try { + final BinaryCompressionMethod method = BinaryCompressionMethod.getByFileExtension(binaryFile.getName()); + final DataInputStream in = method.getDataInputStream(binaryFile, 1 * MB); + try { + IMonitoringRecord record = this.recordFromBinaryFileCreator.createRecordFromBinaryFile(binaryFile, in); + while (record != null) { + this.send(record); + record = this.recordFromBinaryFileCreator.createRecordFromBinaryFile(binaryFile, in); + } + } catch (final MonitoringRecordException e) { + this.logger.error("Error reading file: " + binaryFile, e); + } finally { + if (in != null) { + try { + in.close(); + } catch (final IOException ex) { + this.logger.error("Exception while closing input stream for processing input file", ex); + } + } + } + } catch (final IOException e) { + this.logger.error("Error reading file: " + binaryFile, e); + } catch (final IllegalArgumentException e) { + this.logger.warn("Unknown file extension for file: " + binaryFile); + } + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..2660c06c303ebab2a79f0fa29d6a523bbb354e36 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/DatFile2RecordFilter.java @@ -0,0 +1,45 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord; + +import java.io.File; + +import teetime.variant.methodcallWithPorts.framework.core.Pipeline; +import teetime.variant.methodcallWithPorts.framework.core.pipe.SpScPipe; +import teetime.variant.methodcallWithPorts.stage.io.File2TextLinesFilter; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; +import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine.TextLine2RecordFilter; + +import kieker.common.record.IMonitoringRecord; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class DatFile2RecordFilter extends Pipeline<File, IMonitoringRecord> { + + public DatFile2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { + final File2TextLinesFilter file2TextLinesFilter = new File2TextLinesFilter(); + final TextLine2RecordFilter textLine2RecordFilter = new TextLine2RecordFilter(classNameRegistryRepository); + + this.setFirstStage(file2TextLinesFilter); + this.setLastStage(textLine2RecordFilter); + + // BETTER let the framework choose the optimal pipe implementation + SpScPipe.connect(file2TextLinesFilter.getOutputPort(), textLine2RecordFilter.getInputPort(), 1); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/RecordFromBinaryFileCreator.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/RecordFromBinaryFileCreator.java new file mode 100644 index 0000000000000000000000000000000000000000..fd1258eb1a874b12cfd43b59b2f7e5f2aa7eff0a --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/RecordFromBinaryFileCreator.java @@ -0,0 +1,117 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord; + +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; + +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistry; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; + +import kieker.common.exception.MonitoringRecordException; +import kieker.common.logging.Log; +import kieker.common.record.AbstractMonitoringRecord; +import kieker.common.record.IMonitoringRecord; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class RecordFromBinaryFileCreator { + + private final Log logger; + private final ClassNameRegistryRepository classNameRegistryRepository; + + public RecordFromBinaryFileCreator(final Log logger, final ClassNameRegistryRepository classNameRegistryRepository) { + this.logger = logger; + this.classNameRegistryRepository = classNameRegistryRepository; + } + + public IMonitoringRecord createRecordFromBinaryFile(final File binaryFile, final DataInputStream inputStream) throws IOException, MonitoringRecordException { + final ClassNameRegistry classNameRegistry = this.classNameRegistryRepository.get(binaryFile.getParentFile()); + + final Integer id; + try { + id = inputStream.readInt(); + } catch (final EOFException eof) { + return null; // we are finished + } + final String classname = classNameRegistry.get(id); + if (classname == null) { + this.logger.error("Missing classname mapping for record type id " + "'" + id + "'"); + return null; // we can't easily recover on errors + } + + final Class<? extends IMonitoringRecord> clazz = AbstractMonitoringRecord.classForName(classname); + final Class<?>[] typeArray = AbstractMonitoringRecord.typesForClass(clazz); + + // read record + final long loggingTimestamp = inputStream.readLong(); // NOPMD (must be read here!) + final Object[] objectArray = new Object[typeArray.length]; + int idx = -1; + for (final Class<?> type : typeArray) { + idx++; + boolean successful = this.writeToObjectArray(inputStream, classNameRegistry, clazz, objectArray, idx, type); + if (!successful) { + return null; + } + } + final IMonitoringRecord record = AbstractMonitoringRecord.createFromArray(clazz, objectArray); + record.setLoggingTimestamp(loggingTimestamp); + + return record; + } + + private boolean writeToObjectArray(final DataInputStream inputStream, final ClassNameRegistry classNameRegistry, final Class<? extends IMonitoringRecord> clazz, + final Object[] objectArray, final int idx, final Class<?> type) throws IOException { + if (type == String.class) { + final Integer strId = inputStream.readInt(); + final String str = classNameRegistry.get(strId); + if (str == null) { + this.logger.error("No String mapping found for id " + strId.toString()); + objectArray[idx] = ""; + } else { + objectArray[idx] = str; + } + } else if ((type == int.class) || (type == Integer.class)) { + objectArray[idx] = inputStream.readInt(); + } else if ((type == long.class) || (type == Long.class)) { + objectArray[idx] = inputStream.readLong(); + } else if ((type == float.class) || (type == Float.class)) { + objectArray[idx] = inputStream.readFloat(); + } else if ((type == double.class) || (type == Double.class)) { + objectArray[idx] = inputStream.readDouble(); + } else if ((type == byte.class) || (type == Byte.class)) { + objectArray[idx] = inputStream.readByte(); + } else if ((type == short.class) || (type == Short.class)) { // NOPMD (short) + objectArray[idx] = inputStream.readShort(); + } else if ((type == boolean.class) || (type == Boolean.class)) { + objectArray[idx] = inputStream.readBoolean(); + } else { + if (inputStream.readByte() != 0) { + this.logger.error("Unexpected value for unsupported type: " + clazz.getName()); + return false; // breaking error (break would not terminate the correct loop) + } + this.logger.warn("Unsupported type: " + clazz.getName()); + objectArray[idx] = null; + } + + return true; + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/RecordFromTextLineCreator.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/RecordFromTextLineCreator.java new file mode 100644 index 0000000000000000000000000000000000000000..476a753751d51945033bb6e0ef3ae255fd44c53f --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/RecordFromTextLineCreator.java @@ -0,0 +1,116 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord; + +import java.io.File; + +import teetime.variant.explicitScheduling.stage.MappingException; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistry; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; + +import kieker.common.exception.IllegalRecordFormatException; +import kieker.common.exception.MonitoringRecordException; +import kieker.common.exception.UnknownRecordTypeException; +import kieker.common.record.AbstractMonitoringRecord; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.controlflow.OperationExecutionRecord; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class RecordFromTextLineCreator { + + private static final String CSV_SEPARATOR_CHARACTER = ";"; + + private static final IllegalRecordFormatException ILLEGAL_RECORD_FORMAT_EXCEPTION = new IllegalRecordFormatException(); + + private final ClassNameRegistryRepository classNameRegistryRepository; + + public RecordFromTextLineCreator(final ClassNameRegistryRepository classNameRegistryRepository) { + this.classNameRegistryRepository = classNameRegistryRepository; + } + + /** + * @since 1.10 + */ + public IMonitoringRecord createRecordFromLine(final File textFile, final String line) throws MonitoringRecordException, IllegalRecordFormatException, + MappingException, + UnknownRecordTypeException { + final String[] recordFields = line.split(CSV_SEPARATOR_CHARACTER); + + if (recordFields.length < 2) { + throw ILLEGAL_RECORD_FORMAT_EXCEPTION; + } + + final boolean isModernRecord = recordFields[0].charAt(0) == '$'; + if (isModernRecord) { + return this.createModernRecordFromRecordFields(textFile, recordFields); + } else { + return this.createLegacyRecordFromRecordFiels(recordFields); + } + } + + private IMonitoringRecord createModernRecordFromRecordFields(final File textFile, final String[] recordFields) throws MonitoringRecordException, + MappingException, + UnknownRecordTypeException { + final ClassNameRegistry classNameRegistry = this.classNameRegistryRepository.get(textFile.getParentFile()); + final Integer id = Integer.valueOf(recordFields[0].substring(1)); + final String classname = classNameRegistry.get(id); + if (classname == null) { + throw new MappingException("Missing classname mapping for record type id " + "'" + id + "'"); + } + final Class<? extends IMonitoringRecord> clazz = this.getClassByName(classname); + final long loggingTimestamp = Long.parseLong(recordFields[1]); + final int skipValues; + // check for Kieker < 1.6 OperationExecutionRecords + if ((recordFields.length == 11) && clazz.equals(OperationExecutionRecord.class)) { + skipValues = 3; + } else { + skipValues = 2; + } + // Java 1.5 compatibility + final String[] recordFieldsReduced = new String[recordFields.length - skipValues]; + System.arraycopy(recordFields, skipValues, recordFieldsReduced, 0, recordFields.length - skipValues); + // in Java 1.6 this could be simplified to + // recordFieldsReduced = Arrays.copyOfRange(recordFields, skipValues, recordFields.length); + + final IMonitoringRecord record = AbstractMonitoringRecord.createFromStringArray(clazz, recordFieldsReduced); + record.setLoggingTimestamp(loggingTimestamp); + return record; + } + + /** + * @since 1.10 + */ + private Class<? extends IMonitoringRecord> getClassByName(final String classname) throws MonitoringRecordException, UnknownRecordTypeException { + try { + return AbstractMonitoringRecord.classForName(classname); + } catch (final MonitoringRecordException ex) { + throw new UnknownRecordTypeException("Failed to load record type " + classname, classname, ex); + } + } + + /** + * @since 1.10 + */ + private IMonitoringRecord createLegacyRecordFromRecordFiels(final String[] recordFields) throws MonitoringRecordException { + final String[] recordFieldsReduced = new String[recordFields.length - 1]; + System.arraycopy(recordFields, 1, recordFieldsReduced, 0, recordFields.length - 1); + return AbstractMonitoringRecord.createFromStringArray(OperationExecutionRecord.class, recordFieldsReduced); + } +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..938f87bb639cbcbe395ec2a4aec2d5f2b0c2430c --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/ZipFile2RecordFilter.java @@ -0,0 +1,122 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistry; +import teetime.variant.methodcallWithPorts.stage.kieker.className.MappingFileParser; + +import kieker.common.record.IMonitoringRecord; +import kieker.common.util.filesystem.FSUtil; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class ZipFile2RecordFilter extends ConsumerStage<File, IMonitoringRecord> { + + private final MappingFileParser mappingFileParser; + + /** + * @since 1.10 + */ + public ZipFile2RecordFilter() { + this.mappingFileParser = new MappingFileParser(this.logger); + } + + @Override + protected void execute5(final File zipFile) { + final InputStream mappingFileInputStream = this.findMappingFileInputStream(zipFile); + if (mappingFileInputStream == null) { + return; + } + final ClassNameRegistry classNameRegistry = this.mappingFileParser.parseFromStream(mappingFileInputStream); + + try { + this.createAndSendRecordsFromZipFile(zipFile, classNameRegistry); + } catch (final FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private void createAndSendRecordsFromZipFile(final File zipFile, final ClassNameRegistry classNameRegistry) + throws FileNotFoundException { + final ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFile)); + final BufferedReader reader; + try { + reader = new BufferedReader(new InputStreamReader(zipInputStream, FSUtil.ENCODING)); + } catch (final UnsupportedEncodingException e) { + this.logger.error("This exception should never occur.", e); + return; + } + final DataInputStream input = new DataInputStream(new BufferedInputStream(zipInputStream, 1024 * 1024)); + + ZipEntry zipEntry; + try { + while (null != (zipEntry = zipInputStream.getNextEntry())) { // NOCS NOPMD + final String filename = zipEntry.getName(); + // TODO + } + } catch (final IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private InputStream findMappingFileInputStream(final File zipFile) { + ZipInputStream zipInputStream = null; + try { + zipInputStream = new ZipInputStream(new FileInputStream(zipFile)); + ZipEntry zipEntry; + while ((null != (zipEntry = zipInputStream.getNextEntry())) && !zipEntry.getName().equals(FSUtil.MAP_FILENAME)) { // NOCS NOPMD + // do nothing, just skip to the map file if present + } + if (null == zipEntry) { + this.logger.error("The zip file does not contain a Kieker log: " + zipFile.toString()); + return null; + } + return zipInputStream; + } catch (final IOException ex) { + this.logger.error("Error accessing ZipInputStream", ex); + } finally { + if (null != zipInputStream) { + try { + zipInputStream.close(); + } catch (final IOException ex) { + this.logger.error("Failed to close ZipInputStream", ex); + } + } + } + + return null; + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..a3ddf9229c8214d6764bcf927d10de901fa57c4e --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2MappingRegistryFilter.java @@ -0,0 +1,69 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine; + +import java.util.Map; + +import teetime.util.list.CommittableQueue; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; + +import kieker.common.util.filesystem.FSUtil; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class TextLine2MappingRegistryFilter extends ConsumerStage<String, Void> { + + private final Map<Integer, String> stringRegistry; + + public TextLine2MappingRegistryFilter(final Map<Integer, String> stringRegistry) { + this.stringRegistry = stringRegistry; + } + + @Override + protected void execute4(final CommittableQueue<String> elements) { + throw new IllegalStateException(); + } + + @Override + protected void execute5(final String textLine) { + final int split = textLine.indexOf('='); + if (split == -1) { + this.logger.error("Failed to find character '=' in line: {" + textLine + "}. It must consist of a ID=VALUE pair."); + return; + } + final String key = textLine.substring(0, split); + // BETTER execute split instead of checking it before with multiple string operations + final String value = FSUtil.decodeNewline(textLine.substring(split + 1)); + // the leading $ is optional + final Integer id; + try { + id = Integer.valueOf((key.charAt(0) == '$') ? key.substring(1) : key); // NOCS + } catch (final NumberFormatException ex) { + this.logger.error("Error reading mapping file, id must be integer", ex); + return; // continue on errors + } + final String prevVal = this.stringRegistry.put(id, value); + if (prevVal != null) { + this.logger.error("Found additional entry for id='" + id + "', old value was '" + prevVal + "' new value is '" + value + "'"); + return; + } + } + +} diff --git a/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..286ce50261a554b981b6cf40a7da70f5e8a24d54 --- /dev/null +++ b/src/main/java/teetime/variant/methodcallWithPorts/stage/kieker/fileToRecord/textLine/TextLine2RecordFilter.java @@ -0,0 +1,107 @@ +/*************************************************************************** + * Copyright 2014 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.textLine; + +import java.util.HashSet; +import java.util.Set; + +import teetime.util.list.CommittableQueue; +import teetime.variant.explicitScheduling.stage.MappingException; +import teetime.variant.explicitScheduling.stage.util.TextLine; +import teetime.variant.methodcallWithPorts.framework.core.ConsumerStage; +import teetime.variant.methodcallWithPorts.stage.kieker.className.ClassNameRegistryRepository; +import teetime.variant.methodcallWithPorts.stage.kieker.fileToRecord.RecordFromTextLineCreator; + +import kieker.common.exception.IllegalRecordFormatException; +import kieker.common.exception.MonitoringRecordException; +import kieker.common.exception.UnknownRecordTypeException; +import kieker.common.record.IMonitoringRecord; + +/** + * @author Christian Wulf + * + * @since 1.10 + */ +public class TextLine2RecordFilter extends ConsumerStage<TextLine, IMonitoringRecord> { + + private final Set<String> unknownTypesObserved = new HashSet<String>(); + + private boolean ignoreUnknownRecordTypes; + + private boolean abortDueToUnknownRecordType; + + private RecordFromTextLineCreator recordFromTextLineCreator; + + /** + * @since 1.10 + */ + public TextLine2RecordFilter(final ClassNameRegistryRepository classNameRegistryRepository) { + this.recordFromTextLineCreator = new RecordFromTextLineCreator(classNameRegistryRepository); + } + + /** + * @since 1.10 + */ + public TextLine2RecordFilter() { + super(); + } + + public boolean isIgnoreUnknownRecordTypes() { + return this.ignoreUnknownRecordTypes; + } + + public void setIgnoreUnknownRecordTypes(final boolean ignoreUnknownRecordTypes) { + this.ignoreUnknownRecordTypes = ignoreUnknownRecordTypes; + } + + public RecordFromTextLineCreator getRecordFromTextLineCreator() { + return this.recordFromTextLineCreator; + } + + public void setRecordFromTextLineCreator(final RecordFromTextLineCreator recordFromTextLineCreator) { + this.recordFromTextLineCreator = recordFromTextLineCreator; + } + + @Override + protected void execute4(final CommittableQueue<TextLine> elements) { + throw new IllegalStateException(); + } + + @Override + protected void execute5(final TextLine textLine) { + try { + final IMonitoringRecord record = this.recordFromTextLineCreator.createRecordFromLine(textLine.getTextFile(), textLine.getTextLine()); + this.send(record); + } catch (final MonitoringRecordException e) { + this.logger.error("Could not create record from text line: '" + textLine + "'", e); + } catch (final IllegalRecordFormatException e) { + this.logger.error("Illegal record format: " + textLine, e); + } catch (final MappingException e) { + this.logger.error("", e); + } catch (final UnknownRecordTypeException e) { + final String classname = e.getClassName(); + if (!this.ignoreUnknownRecordTypes) { + this.abortDueToUnknownRecordType = true; + this.logger.error("Failed to load record type " + classname, e); + } else if (!this.unknownTypesObserved.contains(classname)) { + this.unknownTypesObserved.add(classname); + this.logger.error("Failed to load record type " + classname, e); // log once for this type + } + } + } + +}