diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/RegistryRecordHandler.java b/src/main/java/kieker/analysis/plugin/reader/amqp/RegistryRecordHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..7195a294765a0a607db0cdb31f946765d04dccd0 --- /dev/null +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/RegistryRecordHandler.java @@ -0,0 +1,91 @@ +/*************************************************************************** + * Copyright 2015 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader.amqp; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.misc.RegistryRecord; +import kieker.common.util.registry.ILookup; + +/** + * Runnable to handle incoming registry records. + * + * @author holger + * + * @since 1.12 + */ +public class RegistryRecordHandler implements Runnable { + + /** The default queue size for the registry record queue */ + private static final int DEFAULT_QUEUE_SIZE = 1024; + + private static final Log LOG = LogFactory.getLog(RegistryRecordHandler.class); + + private final ILookup<String> stringRegistry; + private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(DEFAULT_QUEUE_SIZE); + + /** + * Creates a new registry record handler for the given registry. + * + * @param stringRegistry + * The string registry to operate on + */ + public RegistryRecordHandler(final ILookup<String> stringRegistry) { + this.stringRegistry = stringRegistry; + } + + @Override + public void run() { + while (true) { + try { + final ByteBuffer nextRecord = this.queue.take(); + + this.readRegistryRecord(nextRecord); + } catch (final InterruptedException e) { + LOG.error("Registry record handler was interrupted", e); + } + } + } + + /** + * Enqueues an unparsed registry record for processing. + * + * @param buffer + * The unparsed data in an appropriately positioned byte buffer + */ + public void enqueueRegistryRecord(final ByteBuffer buffer) { + try { + this.queue.put(buffer); + } catch (final InterruptedException e) { + LOG.error("Record queue was interrupted", e); + } + } + + private void readRegistryRecord(final ByteBuffer buffer) { + try { + RegistryRecord.registerRecordInRegistry(buffer, this.stringRegistry); + } catch (final BufferUnderflowException e) { + LOG.error("Buffer underflow while reading registry record", e); + } + } + +} diff --git a/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..683f9290aacf9dc8ba1c3ef22947987cd4a05f76 --- /dev/null +++ b/src/main/java/kieker/analysis/plugin/reader/amqp/RegularRecordHandler.java @@ -0,0 +1,107 @@ +/*************************************************************************** + * Copyright 2015 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader.amqp; + +import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import kieker.common.exception.RecordInstantiationException; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +import kieker.common.record.factory.CachedRecordFactoryCatalog; +import kieker.common.record.factory.IRecordFactory; +import kieker.common.util.registry.ILookup; + +/** + * Runnable to handle incoming regular records. + * + * @author Holger Knoche + * + * @since 1.12 + */ +public class RegularRecordHandler implements Runnable { + + /** Default queue size for the regular record queue */ + private static final int DEFAULT_QUEUE_SIZE = 4096; + + private static final Log LOG = LogFactory.getLog(RegularRecordHandler.class); + + private final ILookup<String> stringRegistry; + private final CachedRecordFactoryCatalog cachedRecordFactoryCatalog = CachedRecordFactoryCatalog.getInstance(); + private final AMQPReader reader; + + private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<ByteBuffer>(DEFAULT_QUEUE_SIZE); + + /** + * Creates a new regular record handler. + * + * @param reader + * The reader to send the instantiated records to + * @param stringRegistry + * The string registry to use + */ + public RegularRecordHandler(final AMQPReader reader, final ILookup<String> stringRegistry) { + this.reader = reader; + this.stringRegistry = stringRegistry; + } + + @Override + public void run() { + while (true) { + try { + final ByteBuffer nextRecord = this.queue.take(); + + this.readRegularRecord(nextRecord); + } catch (final InterruptedException e) { + LOG.error("Regular record handler was interrupted", e); + } + } + } + + /** + * Enqueues an unparsed regular record for processing. + * + * @param buffer + * The unparsed data in an appropriately positioned byte buffer + */ + public void enqueueRegularRecord(final ByteBuffer buffer) { + try { + this.queue.put(buffer); + } catch (final InterruptedException e) { + LOG.error("Record queue was interrupted", e); + } + } + + private void readRegularRecord(final ByteBuffer buffer) { + final int classId = buffer.getInt(); + final long loggingTimestamp = buffer.getLong(); + + try { + final String recordClassName = this.stringRegistry.get(classId); + final IRecordFactory<? extends IMonitoringRecord> recordFactory = this.cachedRecordFactoryCatalog.get(recordClassName); + final IMonitoringRecord record = recordFactory.create(buffer, this.stringRegistry); + record.setLoggingTimestamp(loggingTimestamp); + + this.reader.deliverRecord(record); + } catch (final RecordInstantiationException e) { + LOG.error("Error instantiating record", e); + } + } + +}