diff --git a/.classpath b/.classpath index fb5011632c0ab8d6649a148c6fb5845a1b34c747..e5536a4fdcf434c6206788e092e637f081337596 100644 --- a/.classpath +++ b/.classpath @@ -1,6 +1,8 @@ <?xml version="1.0" encoding="UTF-8"?> <classpath> - <classpathentry kind="src" path="src"/> + <classpathentry excluding="kieker/analysis/plugin/filter/flow/" kind="src" path="src"/> <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/> + <classpathentry kind="lib" path="lib/kieker-1.8-SNAPSHOT.jar"/> + <classpathentry kind="lib" path="lib/rabbitmq-client.jar"/> <classpathentry kind="output" path="bin"/> </classpath> diff --git a/lib/LICENSE-RabbitMQ/LICENSE b/lib/LICENSE-RabbitMQ/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..8edaa29c0cecc064c42a2350b74818b7204cdd30 --- /dev/null +++ b/lib/LICENSE-RabbitMQ/LICENSE @@ -0,0 +1,11 @@ +This package, the RabbitMQ Java client library is dual-licensed under +the MPL and the GPL v2. For the MPL, please see +LICENSE-MPL-RabbitMQ. For the GPL v2, please see LICENSE-GPL2. + +The RabbitMQ Java client library includes software under the Apache +Licence v2. For this license, please see LICENSE-APACHE2. For +attribution of copyright and other details of provenance, please refer +to the source code. + +If you have any questions regarding licensing, please contact us at +info@rabbitmq.com. diff --git a/lib/LICENSE-RabbitMQ/LICENSE-APACHE2 b/lib/LICENSE-RabbitMQ/LICENSE-APACHE2 new file mode 100644 index 0000000000000000000000000000000000000000..d645695673349e3947e8e5ae42332d0ac3164cd7 --- /dev/null +++ b/lib/LICENSE-RabbitMQ/LICENSE-APACHE2 @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/lib/LICENSE-RabbitMQ/LICENSE-GPL2 b/lib/LICENSE-RabbitMQ/LICENSE-GPL2 new file mode 100644 index 0000000000000000000000000000000000000000..d511905c1647a1e311e8b20d5930a37a9c2531cd --- /dev/null +++ b/lib/LICENSE-RabbitMQ/LICENSE-GPL2 @@ -0,0 +1,339 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Lesser General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + <signature of Ty Coon>, 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. diff --git a/lib/LICENSE-RabbitMQ/LICENSE-MPL-RabbitMQ b/lib/LICENSE-RabbitMQ/LICENSE-MPL-RabbitMQ new file mode 100644 index 0000000000000000000000000000000000000000..4cdf783b1dac19f368a061ca324163568659ed7b --- /dev/null +++ b/lib/LICENSE-RabbitMQ/LICENSE-MPL-RabbitMQ @@ -0,0 +1,455 @@ + MOZILLA PUBLIC LICENSE + Version 1.1 + + --------------- + +1. Definitions. + + 1.0.1. "Commercial Use" means distribution or otherwise making the + Covered Code available to a third party. + + 1.1. "Contributor" means each entity that creates or contributes to + the creation of Modifications. + + 1.2. "Contributor Version" means the combination of the Original + Code, prior Modifications used by a Contributor, and the Modifications + made by that particular Contributor. + + 1.3. "Covered Code" means the Original Code or Modifications or the + combination of the Original Code and Modifications, in each case + including portions thereof. + + 1.4. "Electronic Distribution Mechanism" means a mechanism generally + accepted in the software development community for the electronic + transfer of data. + + 1.5. "Executable" means Covered Code in any form other than Source + Code. + + 1.6. "Initial Developer" means the individual or entity identified + as the Initial Developer in the Source Code notice required by Exhibit + A. + + 1.7. "Larger Work" means a work which combines Covered Code or + portions thereof with code not governed by the terms of this License. + + 1.8. "License" means this document. + + 1.8.1. "Licensable" means having the right to grant, to the maximum + extent possible, whether at the time of the initial grant or + subsequently acquired, any and all of the rights conveyed herein. + + 1.9. "Modifications" means any addition to or deletion from the + substance or structure of either the Original Code or any previous + Modifications. When Covered Code is released as a series of files, a + Modification is: + A. Any addition to or deletion from the contents of a file + containing Original Code or previous Modifications. + + B. Any new file that contains any part of the Original Code or + previous Modifications. + + 1.10. "Original Code" means Source Code of computer software code + which is described in the Source Code notice required by Exhibit A as + Original Code, and which, at the time of its release under this + License is not already Covered Code governed by this License. + + 1.10.1. "Patent Claims" means any patent claim(s), now owned or + hereafter acquired, including without limitation, method, process, + and apparatus claims, in any patent Licensable by grantor. + + 1.11. "Source Code" means the preferred form of the Covered Code for + making modifications to it, including all modules it contains, plus + any associated interface definition files, scripts used to control + compilation and installation of an Executable, or source code + differential comparisons against either the Original Code or another + well known, available Covered Code of the Contributor's choice. The + Source Code can be in a compressed or archival form, provided the + appropriate decompression or de-archiving software is widely available + for no charge. + + 1.12. "You" (or "Your") means an individual or a legal entity + exercising rights under, and complying with all of the terms of, this + License or a future version of this License issued under Section 6.1. + For legal entities, "You" includes any entity which controls, is + controlled by, or is under common control with You. For purposes of + this definition, "control" means (a) the power, direct or indirect, + to cause the direction or management of such entity, whether by + contract or otherwise, or (b) ownership of more than fifty percent + (50%) of the outstanding shares or beneficial ownership of such + entity. + +2. Source Code License. + + 2.1. The Initial Developer Grant. + The Initial Developer hereby grants You a world-wide, royalty-free, + non-exclusive license, subject to third party intellectual property + claims: + (a) under intellectual property rights (other than patent or + trademark) Licensable by Initial Developer to use, reproduce, + modify, display, perform, sublicense and distribute the Original + Code (or portions thereof) with or without Modifications, and/or + as part of a Larger Work; and + + (b) under Patents Claims infringed by the making, using or + selling of Original Code, to make, have made, use, practice, + sell, and offer for sale, and/or otherwise dispose of the + Original Code (or portions thereof). + + (c) the licenses granted in this Section 2.1(a) and (b) are + effective on the date Initial Developer first distributes + Original Code under the terms of this License. + + (d) Notwithstanding Section 2.1(b) above, no patent license is + granted: 1) for code that You delete from the Original Code; 2) + separate from the Original Code; or 3) for infringements caused + by: i) the modification of the Original Code or ii) the + combination of the Original Code with other software or devices. + + 2.2. Contributor Grant. + Subject to third party intellectual property claims, each Contributor + hereby grants You a world-wide, royalty-free, non-exclusive license + + (a) under intellectual property rights (other than patent or + trademark) Licensable by Contributor, to use, reproduce, modify, + display, perform, sublicense and distribute the Modifications + created by such Contributor (or portions thereof) either on an + unmodified basis, with other Modifications, as Covered Code + and/or as part of a Larger Work; and + + (b) under Patent Claims infringed by the making, using, or + selling of Modifications made by that Contributor either alone + and/or in combination with its Contributor Version (or portions + of such combination), to make, use, sell, offer for sale, have + made, and/or otherwise dispose of: 1) Modifications made by that + Contributor (or portions thereof); and 2) the combination of + Modifications made by that Contributor with its Contributor + Version (or portions of such combination). + + (c) the licenses granted in Sections 2.2(a) and 2.2(b) are + effective on the date Contributor first makes Commercial Use of + the Covered Code. + + (d) Notwithstanding Section 2.2(b) above, no patent license is + granted: 1) for any code that Contributor has deleted from the + Contributor Version; 2) separate from the Contributor Version; + 3) for infringements caused by: i) third party modifications of + Contributor Version or ii) the combination of Modifications made + by that Contributor with other software (except as part of the + Contributor Version) or other devices; or 4) under Patent Claims + infringed by Covered Code in the absence of Modifications made by + that Contributor. + +3. Distribution Obligations. + + 3.1. Application of License. + The Modifications which You create or to which You contribute are + governed by the terms of this License, including without limitation + Section 2.2. The Source Code version of Covered Code may be + distributed only under the terms of this License or a future version + of this License released under Section 6.1, and You must include a + copy of this License with every copy of the Source Code You + distribute. You may not offer or impose any terms on any Source Code + version that alters or restricts the applicable version of this + License or the recipients' rights hereunder. However, You may include + an additional document offering the additional rights described in + Section 3.5. + + 3.2. Availability of Source Code. + Any Modification which You create or to which You contribute must be + made available in Source Code form under the terms of this License + either on the same media as an Executable version or via an accepted + Electronic Distribution Mechanism to anyone to whom you made an + Executable version available; and if made available via Electronic + Distribution Mechanism, must remain available for at least twelve (12) + months after the date it initially became available, or at least six + (6) months after a subsequent version of that particular Modification + has been made available to such recipients. You are responsible for + ensuring that the Source Code version remains available even if the + Electronic Distribution Mechanism is maintained by a third party. + + 3.3. Description of Modifications. + You must cause all Covered Code to which You contribute to contain a + file documenting the changes You made to create that Covered Code and + the date of any change. You must include a prominent statement that + the Modification is derived, directly or indirectly, from Original + Code provided by the Initial Developer and including the name of the + Initial Developer in (a) the Source Code, and (b) in any notice in an + Executable version or related documentation in which You describe the + origin or ownership of the Covered Code. + + 3.4. Intellectual Property Matters + (a) Third Party Claims. + If Contributor has knowledge that a license under a third party's + intellectual property rights is required to exercise the rights + granted by such Contributor under Sections 2.1 or 2.2, + Contributor must include a text file with the Source Code + distribution titled "LEGAL" which describes the claim and the + party making the claim in sufficient detail that a recipient will + know whom to contact. If Contributor obtains such knowledge after + the Modification is made available as described in Section 3.2, + Contributor shall promptly modify the LEGAL file in all copies + Contributor makes available thereafter and shall take other steps + (such as notifying appropriate mailing lists or newsgroups) + reasonably calculated to inform those who received the Covered + Code that new knowledge has been obtained. + + (b) Contributor APIs. + If Contributor's Modifications include an application programming + interface and Contributor has knowledge of patent licenses which + are reasonably necessary to implement that API, Contributor must + also include this information in the LEGAL file. + + (c) Representations. + Contributor represents that, except as disclosed pursuant to + Section 3.4(a) above, Contributor believes that Contributor's + Modifications are Contributor's original creation(s) and/or + Contributor has sufficient rights to grant the rights conveyed by + this License. + + 3.5. Required Notices. + You must duplicate the notice in Exhibit A in each file of the Source + Code. If it is not possible to put such notice in a particular Source + Code file due to its structure, then You must include such notice in a + location (such as a relevant directory) where a user would be likely + to look for such a notice. If You created one or more Modification(s) + You may add your name as a Contributor to the notice described in + Exhibit A. You must also duplicate this License in any documentation + for the Source Code where You describe recipients' rights or ownership + rights relating to Covered Code. You may choose to offer, and to + charge a fee for, warranty, support, indemnity or liability + obligations to one or more recipients of Covered Code. However, You + may do so only on Your own behalf, and not on behalf of the Initial + Developer or any Contributor. You must make it absolutely clear than + any such warranty, support, indemnity or liability obligation is + offered by You alone, and You hereby agree to indemnify the Initial + Developer and every Contributor for any liability incurred by the + Initial Developer or such Contributor as a result of warranty, + support, indemnity or liability terms You offer. + + 3.6. Distribution of Executable Versions. + You may distribute Covered Code in Executable form only if the + requirements of Section 3.1-3.5 have been met for that Covered Code, + and if You include a notice stating that the Source Code version of + the Covered Code is available under the terms of this License, + including a description of how and where You have fulfilled the + obligations of Section 3.2. The notice must be conspicuously included + in any notice in an Executable version, related documentation or + collateral in which You describe recipients' rights relating to the + Covered Code. You may distribute the Executable version of Covered + Code or ownership rights under a license of Your choice, which may + contain terms different from this License, provided that You are in + compliance with the terms of this License and that the license for the + Executable version does not attempt to limit or alter the recipient's + rights in the Source Code version from the rights set forth in this + License. If You distribute the Executable version under a different + license You must make it absolutely clear that any terms which differ + from this License are offered by You alone, not by the Initial + Developer or any Contributor. You hereby agree to indemnify the + Initial Developer and every Contributor for any liability incurred by + the Initial Developer or such Contributor as a result of any such + terms You offer. + + 3.7. Larger Works. + You may create a Larger Work by combining Covered Code with other code + not governed by the terms of this License and distribute the Larger + Work as a single product. In such a case, You must make sure the + requirements of this License are fulfilled for the Covered Code. + +4. Inability to Comply Due to Statute or Regulation. + + If it is impossible for You to comply with any of the terms of this + License with respect to some or all of the Covered Code due to + statute, judicial order, or regulation then You must: (a) comply with + the terms of this License to the maximum extent possible; and (b) + describe the limitations and the code they affect. Such description + must be included in the LEGAL file described in Section 3.4 and must + be included with all distributions of the Source Code. Except to the + extent prohibited by statute or regulation, such description must be + sufficiently detailed for a recipient of ordinary skill to be able to + understand it. + +5. Application of this License. + + This License applies to code to which the Initial Developer has + attached the notice in Exhibit A and to related Covered Code. + +6. Versions of the License. + + 6.1. New Versions. + Netscape Communications Corporation ("Netscape") may publish revised + and/or new versions of the License from time to time. Each version + will be given a distinguishing version number. + + 6.2. Effect of New Versions. + Once Covered Code has been published under a particular version of the + License, You may always continue to use it under the terms of that + version. You may also choose to use such Covered Code under the terms + of any subsequent version of the License published by Netscape. No one + other than Netscape has the right to modify the terms applicable to + Covered Code created under this License. + + 6.3. Derivative Works. + If You create or use a modified version of this License (which you may + only do in order to apply it to code which is not already Covered Code + governed by this License), You must (a) rename Your license so that + the phrases "Mozilla", "MOZILLAPL", "MOZPL", "Netscape", + "MPL", "NPL" or any confusingly similar phrase do not appear in your + license (except to note that your license differs from this License) + and (b) otherwise make it clear that Your version of the license + contains terms which differ from the Mozilla Public License and + Netscape Public License. (Filling in the name of the Initial + Developer, Original Code or Contributor in the notice described in + Exhibit A shall not of themselves be deemed to be modifications of + this License.) + +7. DISCLAIMER OF WARRANTY. + + COVERED CODE IS PROVIDED UNDER THIS LICENSE ON AN "AS IS" BASIS, + WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, + WITHOUT LIMITATION, WARRANTIES THAT THE COVERED CODE IS FREE OF + DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR NON-INFRINGING. + THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE COVERED CODE + IS WITH YOU. SHOULD ANY COVERED CODE PROVE DEFECTIVE IN ANY RESPECT, + YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER CONTRIBUTOR) ASSUME THE + COST OF ANY NECESSARY SERVICING, REPAIR OR CORRECTION. THIS DISCLAIMER + OF WARRANTY CONSTITUTES AN ESSENTIAL PART OF THIS LICENSE. NO USE OF + ANY COVERED CODE IS AUTHORIZED HEREUNDER EXCEPT UNDER THIS DISCLAIMER. + +8. TERMINATION. + + 8.1. This License and the rights granted hereunder will terminate + automatically if You fail to comply with terms herein and fail to cure + such breach within 30 days of becoming aware of the breach. All + sublicenses to the Covered Code which are properly granted shall + survive any termination of this License. Provisions which, by their + nature, must remain in effect beyond the termination of this License + shall survive. + + 8.2. If You initiate litigation by asserting a patent infringement + claim (excluding declatory judgment actions) against Initial Developer + or a Contributor (the Initial Developer or Contributor against whom + You file such action is referred to as "Participant") alleging that: + + (a) such Participant's Contributor Version directly or indirectly + infringes any patent, then any and all rights granted by such + Participant to You under Sections 2.1 and/or 2.2 of this License + shall, upon 60 days notice from Participant terminate prospectively, + unless if within 60 days after receipt of notice You either: (i) + agree in writing to pay Participant a mutually agreeable reasonable + royalty for Your past and future use of Modifications made by such + Participant, or (ii) withdraw Your litigation claim with respect to + the Contributor Version against such Participant. If within 60 days + of notice, a reasonable royalty and payment arrangement are not + mutually agreed upon in writing by the parties or the litigation claim + is not withdrawn, the rights granted by Participant to You under + Sections 2.1 and/or 2.2 automatically terminate at the expiration of + the 60 day notice period specified above. + + (b) any software, hardware, or device, other than such Participant's + Contributor Version, directly or indirectly infringes any patent, then + any rights granted to You by such Participant under Sections 2.1(b) + and 2.2(b) are revoked effective as of the date You first made, used, + sold, distributed, or had made, Modifications made by that + Participant. + + 8.3. If You assert a patent infringement claim against Participant + alleging that such Participant's Contributor Version directly or + indirectly infringes any patent where such claim is resolved (such as + by license or settlement) prior to the initiation of patent + infringement litigation, then the reasonable value of the licenses + granted by such Participant under Sections 2.1 or 2.2 shall be taken + into account in determining the amount or value of any payment or + license. + + 8.4. In the event of termination under Sections 8.1 or 8.2 above, + all end user license agreements (excluding distributors and resellers) + which have been validly granted by You or any distributor hereunder + prior to termination shall survive termination. + +9. LIMITATION OF LIABILITY. + + UNDER NO CIRCUMSTANCES AND UNDER NO LEGAL THEORY, WHETHER TORT + (INCLUDING NEGLIGENCE), CONTRACT, OR OTHERWISE, SHALL YOU, THE INITIAL + DEVELOPER, ANY OTHER CONTRIBUTOR, OR ANY DISTRIBUTOR OF COVERED CODE, + OR ANY SUPPLIER OF ANY OF SUCH PARTIES, BE LIABLE TO ANY PERSON FOR + ANY INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES OF ANY + CHARACTER INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF GOODWILL, + WORK STOPPAGE, COMPUTER FAILURE OR MALFUNCTION, OR ANY AND ALL OTHER + COMMERCIAL DAMAGES OR LOSSES, EVEN IF SUCH PARTY SHALL HAVE BEEN + INFORMED OF THE POSSIBILITY OF SUCH DAMAGES. THIS LIMITATION OF + LIABILITY SHALL NOT APPLY TO LIABILITY FOR DEATH OR PERSONAL INJURY + RESULTING FROM SUCH PARTY'S NEGLIGENCE TO THE EXTENT APPLICABLE LAW + PROHIBITS SUCH LIMITATION. SOME JURISDICTIONS DO NOT ALLOW THE + EXCLUSION OR LIMITATION OF INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO + THIS EXCLUSION AND LIMITATION MAY NOT APPLY TO YOU. + +10. U.S. GOVERNMENT END USERS. + + The Covered Code is a "commercial item," as that term is defined in + 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer + software" and "commercial computer software documentation," as such + terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent with 48 + C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4 (June 1995), + all U.S. Government End Users acquire Covered Code with only those + rights set forth herein. + +11. MISCELLANEOUS. + + This License represents the complete agreement concerning subject + matter hereof. If any provision of this License is held to be + unenforceable, such provision shall be reformed only to the extent + necessary to make it enforceable. This License shall be governed by + California law provisions (except to the extent applicable law, if + any, provides otherwise), excluding its conflict-of-law provisions. + With respect to disputes in which at least one party is a citizen of, + or an entity chartered or registered to do business in the United + States of America, any litigation relating to this License shall be + subject to the jurisdiction of the Federal Courts of the Northern + District of California, with venue lying in Santa Clara County, + California, with the losing party responsible for costs, including + without limitation, court costs and reasonable attorneys' fees and + expenses. The application of the United Nations Convention on + Contracts for the International Sale of Goods is expressly excluded. + Any law or regulation which provides that the language of a contract + shall be construed against the drafter shall not apply to this + License. + +12. RESPONSIBILITY FOR CLAIMS. + + As between Initial Developer and the Contributors, each party is + responsible for claims and damages arising, directly or indirectly, + out of its utilization of rights under this License and You agree to + work with Initial Developer and Contributors to distribute such + responsibility on an equitable basis. Nothing herein is intended or + shall be deemed to constitute any admission of liability. + +13. MULTIPLE-LICENSED CODE. + + Initial Developer may designate portions of the Covered Code as + "Multiple-Licensed". "Multiple-Licensed" means that the Initial + Developer permits you to utilize portions of the Covered Code under + Your choice of the NPL or the alternative licenses, if any, specified + by the Initial Developer in the file described in Exhibit A. + +EXHIBIT A -Mozilla Public License. + + ``The contents of this file are subject to the Mozilla Public License + Version 1.1 (the "License"); you may not use this file except in + compliance with the License. You may obtain a copy of the License at + http://www.mozilla.org/MPL/ + + Software distributed under the License is distributed on an "AS IS" + basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + License for the specific language governing rights and limitations + under the License. + + The Original Code is RabbitMQ. + + The Initial Developer of the Original Code is VMware, Inc. + Copyright (c) 2007-2013 VMware, Inc. All rights reserved.'' + + [NOTE: The text of this Exhibit A may differ slightly from the text of + the notices in the Source Code files of the Original Code. You should + use the text of this Exhibit A rather than the text found in the + Original Code Source Code for Your Modifications.] diff --git a/lib/commons-cli-1.1.jar b/lib/commons-cli-1.1.jar new file mode 100644 index 0000000000000000000000000000000000000000..e633afbe6842aa92b1a8f0ff3f5b8c0e3283961b Binary files /dev/null and b/lib/commons-cli-1.1.jar differ diff --git a/lib/commons-io-1.2.jar b/lib/commons-io-1.2.jar new file mode 100644 index 0000000000000000000000000000000000000000..b2867cdde4284228f2adc51e8a0358972bccaaf1 Binary files /dev/null and b/lib/commons-io-1.2.jar differ diff --git a/lib/junit.jar b/lib/junit.jar new file mode 100644 index 0000000000000000000000000000000000000000..674d71e89ea154dbe2e3cd032821c22b39e8fd68 Binary files /dev/null and b/lib/junit.jar differ diff --git a/lib/rabbitmq-client.jar b/lib/rabbitmq-client.jar new file mode 100644 index 0000000000000000000000000000000000000000..548a994cca4d17c719c480a289139411816a8d98 Binary files /dev/null and b/lib/rabbitmq-client.jar differ diff --git a/src/kieker/analysis/plugin/connector/mq/RabbitMQConnector.java b/src/kieker/analysis/plugin/connector/mq/RabbitMQConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..a70b5127a39538e5ae68d9dd5e493fdc7dced306 --- /dev/null +++ b/src/kieker/analysis/plugin/connector/mq/RabbitMQConnector.java @@ -0,0 +1,194 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.connector.mq; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.InputPort; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; + +/** + * A plugin used for kieker in the cloud. + * All incoming events are put into a RabbitMQ, but are also passed to an output port that can be used for + * testing purposes. + * + * @author Santje Finke + * + * @since 1.8 + * + */ +@Plugin( + description = "A filter that writes all incoming events into a specified queue on a specified RabbitMQServer", + outputPorts = { + @OutputPort( + name = RabbitMQConnector.OUTPUT_PORT_NAME, eventTypes = { Object.class }, + description = "Provides each incoming object") }, + configuration = { + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_PROVIDER, defaultValue = "localhost"), + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "master"), + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = RabbitMQConnector.CONFIG_PROPERTY_USER, defaultValue = "guest") + }) +public class RabbitMQConnector extends AbstractFilterPlugin { + + /** + * The name of the input port receiving the incoming events. + */ + public static final String INPUT_PORT_NAME_EVENTS = "inputEvents"; + /** + * The name of the output port passing the incoming events. + */ + public static final String OUTPUT_PORT_NAME = "relayedEvents"; + /** + * The name of the property determining the address of the used Server. + */ + public static final String CONFIG_PROPERTY_NAME_PROVIDER = "providerUrl"; + /** + * The name of the property determining the name of the Queue. + */ + public static final String CONFIG_PROPERTY_NAME_QUEUE = "queueName"; + + /** + * The username that is used to connect to a queue. + */ + public static final String CONFIG_PROPERTY_USER = "guest"; + /** + * The password that is used to connect to a queue. + */ + public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + + private static final Log LOG = LogFactory.getLog(RabbitMQConnector.class); + + private final String provider; + private final String queue; + private final String password; + private final String username; + private Connection connection; + private Channel channel; + private final ConnectionFactory factory; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this plugin + * + * @deprecated To be removed in Kieker 1.8. + */ + @Deprecated + public RabbitMQConnector(final Configuration configuration) { + this(configuration, null); + } + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public RabbitMQConnector(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + this.provider = configuration.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDER); + this.queue = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); + this.username = configuration.getStringProperty(CONFIG_PROPERTY_USER); + this.password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); + this.factory = new ConnectionFactory(); + this.factory.setHost(this.provider); + this.factory.setConnectionTimeout(0); + this.factory.setUsername(this.username); + this.factory.setPassword(this.password); + try { + this.connection = this.factory.newConnection(); + this.channel = this.connection.createChannel(); + this.channel.queueDeclare(this.queue, false, false, false, null); + } catch (final IOException e) { + LOG.info("Error establishing connection", e); + } + LOG.info("Sending to destination:" + this.queue + " at " + this.provider + " !\n***\n\n"); + + } + + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDER, this.provider); + configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, this.queue); + configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password); + configuration.setProperty(CONFIG_PROPERTY_USER, this.username); + return configuration; + } + + /** + * This method represents the input port of this filter. + * + * @param event + * The next event. + */ + @InputPort( + name = INPUT_PORT_NAME_EVENTS, + eventTypes = { Object.class }, + description = "Receives incoming objects to be forwarded to a queue") + public final void inputEvent(final Object event) { + super.deliver(OUTPUT_PORT_NAME, event); + final ByteArrayOutputStream boas = new ByteArrayOutputStream(); + ObjectOutputStream out; + try { + out = new ObjectOutputStream(boas); + out.writeObject(event); + out.close(); + final byte[] message = boas.toByteArray(); + + if (!this.connection.isOpen() || !this.channel.isOpen()) { + this.reconnect(); + } + + this.channel.basicPublish("", this.queue, null, message); + } catch (final IOException e) { + RabbitMQConnector.LOG.error("Error sending event", e); + } + + } + + /** + * Establishes a connection to a rabbitMQ channel with the current connection informationen. + */ + private void reconnect() { + try { + this.connection = this.factory.newConnection(); + this.channel = this.connection.createChannel(); + this.channel.queueDeclare(this.queue, false, false, false, null); + } catch (final IOException e) { + RabbitMQConnector.LOG.error("Error reestablishing connection", e); + } + } + +} diff --git a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordFinalTraceReconstructionFilter.java b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordFinalTraceReconstructionFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..139dc36bdbf2599eb502a9afdbc8b0547b7af6e1 --- /dev/null +++ b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordFinalTraceReconstructionFilter.java @@ -0,0 +1,452 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.filter.flow; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.InputPort; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.AbstractTraceEvent; +import kieker.common.record.flow.trace.WorkflowRecord; +import kieker.common.record.flow.trace.WorkflowTrace; + +/** + * This filter merges partial traces with the same trace id into one complete trace. + * Incomplete traces will be delivered after a specified timeout. + * + * @author Florian Biss, Soeren Mahmens, Bjoern Weissenfels + * + * @since 1.8 + */ +@Plugin(name = "Final Trace Reconstruction Filter (Workflow)", + description = "This filter merges partial WorkflowTraces into complete traces.", + outputPorts = { + @OutputPort(name = WorkflowRecordFinalTraceReconstructionFilter.OUTPUT_PORT_NAME_VALID_TRACES, + description = "Forwards valid traces", + eventTypes = { WorkflowTrace.class }), + @OutputPort(name = WorkflowRecordFinalTraceReconstructionFilter.OUTPUT_PORT_NAME_INVALID_TRACES, + description = "Forwards invalid traces", + eventTypes = { WorkflowTrace.class }) + }, + configuration = { + @Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, + defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), + @Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, + defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME), + @Property(name = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, + defaultValue = WorkflowRecordFinalTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TIME) + }) +public class WorkflowRecordFinalTraceReconstructionFilter extends AbstractFilterPlugin { + /** + * The name of the output port delivering the valid traces. + */ + public static final String OUTPUT_PORT_NAME_VALID_TRACES = "validTraces"; + /** + * The name of the output port delivering the valid traces. + */ + public static final String OUTPUT_PORT_NAME_INVALID_TRACES = "invalidTraces"; + /** + * The name of the input port receiving the trace records. + */ + public static final String INPUT_PORT_NAME_PARTIAL_TRACES = "partialTraces"; + + /** + * The name of the time trigger input port. + */ + public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; + + /** + * The name of the property determining the time unit. + */ + public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; + /** + * The name of the property determining the maximal trace duration, the time + * this filter waits for new partial traces of a trace. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION = "maxTraceDuration"; + /** + * The name of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT = "maxTraceTimeout"; + /** + * The default value of the properties for the maximal trace duration and timeout. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_TIME = "9223372036854775807"; // String.valueOf(Long.MAX_VALUE) + /** + * The default value of the time unit property (nanoseconds). + */ + public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() + + private static final Log LOG = LogFactory.getLog(WorkflowRecordFinalTraceReconstructionFilter.class); + + private final TimeUnit timeunit; + private final long maxTraceDuration; + private final long maxTraceTimeout; + + private final ConcurrentMap<Long, TraceBuffer> traceId2trace; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public WorkflowRecordFinalTraceReconstructionFilter(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + + if (null != projectContext) { // TODO #819 remove non-null check and else case in Kieker 1.8 //NOCS + final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); + TimeUnit recordTimeunit; + try { + recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); + } catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen + LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead."); + recordTimeunit = TimeUnit.NANOSECONDS; + } + this.timeunit = recordTimeunit; + } else { + this.timeunit = TimeUnit.NANOSECONDS; + } + + final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); + TimeUnit configTimeunit; + try { + configTimeunit = TimeUnit.valueOf(configTimeunitProperty); + } catch (final IllegalArgumentException ex) { + LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead."); + configTimeunit = this.timeunit; + } + + this.maxTraceDuration = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION), configTimeunit); + this.maxTraceTimeout = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT), configTimeunit); + this.traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>(); + } + + /** + * This method is the input port for the timeout. + * + * @param timestamp + * The actual nanotime + */ + @InputPort( + name = INPUT_PORT_NAME_TIME_EVENT, + description = "Input port for periodic a time signal", + eventTypes = { Long.class }) + public void newEvent(final Long timestamp) { + synchronized (this) { + for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { + final long timeSinceLastRecord = timestamp - traceBuffer.getMaxLoggingTimestamp(); + final long timeSinceFirstRecord = timestamp - traceBuffer.getMinLoggingTimestamp(); + if ((timeSinceLastRecord >= this.maxTraceTimeout) || (timeSinceFirstRecord >= this.maxTraceDuration)) { // max duration or timeout is gone + super.deliver(OUTPUT_PORT_NAME_INVALID_TRACES, traceBuffer.toWorkflowTrace()); + // Concurrent modification allowed by ConcurrentMap + this.traceId2trace.remove(traceBuffer.getTraceId()); + } + } + } + } + + /** + * This method is the input port for the new events for this filter. + * + * @param record + * The new record to handle. + */ + @InputPort( + name = INPUT_PORT_NAME_PARTIAL_TRACES, + description = "Input port for partial traces from WorkflowRecordPartialTraceReconstructionFilters", + eventTypes = { WorkflowTrace.class }) + public void newEvent(final IFlowRecord record) { + final Long traceId; + TraceBuffer traceBuffer; + final WorkflowTrace trace; + + if (record instanceof WorkflowTrace) { + trace = (WorkflowTrace) record; + + if (trace.isComplete()) { + super.deliver(OUTPUT_PORT_NAME_VALID_TRACES, trace); // Nothing to do here + return; + } else { + traceId = trace.getTraceId(); + synchronized (this) { + traceBuffer = this.getTraceBuffer(traceId); + traceBuffer.insertTrace(trace); + } + } + } else { + LOG.error("Invalid input type at " + INPUT_PORT_NAME_PARTIAL_TRACES + + " in WorkflowRecordFinalTraceReconstructionFilter"); + return; // invalid type + } + + synchronized (this) { + if (traceBuffer.isComplete()) { + this.traceId2trace.remove(traceId); + super.deliver(OUTPUT_PORT_NAME_VALID_TRACES, traceBuffer.toWorkflowTrace()); + } + } + } + + private TraceBuffer getTraceBuffer(final Long traceId) { + TraceBuffer traceBuffer; + traceBuffer = this.traceId2trace.get(traceId); + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = this.traceId2trace.get(traceId); + + if (traceBuffer == null) { // NOCS (DCL) + final TraceBuffer newTraceBuffer = new TraceBuffer(this.timeunit); + traceBuffer = this.traceId2trace.put(traceId, newTraceBuffer); + if (traceBuffer == null) { + traceBuffer = newTraceBuffer; + } + } + } + } + return traceBuffer; + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + synchronized (this) { + for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { + super.deliver(OUTPUT_PORT_NAME_INVALID_TRACES, traceBuffer.toWorkflowTrace()); + } + this.traceId2trace.clear(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name()); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION, String.valueOf(this.maxTraceDuration)); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT, String.valueOf(this.maxTraceTimeout)); + return configuration; + } + + /** + * Buffer for events from partial traces that will be turned into a single trace. + * + * @author Florian Biss + */ + private static final class TraceBuffer { + private static final Log LOG = LogFactory.getLog(TraceBuffer.class); + private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator(); + + private final SortedSet<AbstractTraceEvent> events = new TreeSet<AbstractTraceEvent>(COMPARATOR); + + private boolean damaged; + private int maxOrderIndex = -1; + + private final TimeUnit timeunit; + + private long minLoggingTimestamp = Long.MAX_VALUE; + private long maxLoggingTimestamp = -1; + + private long traceId = -1; + private boolean hasStart; + private boolean hasEnd; + + /** + * Creates a new buffer. + * + * @param timeunit + * TimetUnit used for logging timestamps. + */ + public TraceBuffer(final TimeUnit timeunit) { + this.timeunit = timeunit; + } + + /** + * Store all events of a partial trace in the buffer. + * + * @param paritalTrace + * A partial trace + */ + public void insertTrace(final WorkflowTrace paritalTrace) { + final long myTraceId = paritalTrace.getTraceId(); + // Time information in partial traces are old (partial traces timed out on a worker), + // use current system time instead. + final long loggingTimestamp = this.timeunit.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + + synchronized (this) { + + if (this.traceId == -1) { + this.traceId = myTraceId; + this.minLoggingTimestamp = loggingTimestamp; + } else if (this.traceId != myTraceId) { + LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in trace " + paritalTrace.toString()); + this.damaged = true; + } + + this.maxLoggingTimestamp = loggingTimestamp; + + final WorkflowRecord[] newEvents = paritalTrace.getTraceEvents(); + + for (final WorkflowRecord event : newEvents) { + this.insertEvent(event); + } + + if (paritalTrace.isDamaged()) { + this.damaged = true; + } + } + } + + private void insertEvent(final WorkflowRecord event) { + final int orderIndex = event.getOrderIndex(); + if (orderIndex > this.maxOrderIndex) { + this.maxOrderIndex = orderIndex; + } + + if (event.isStart()) { + if (this.hasStart) { + LOG.error("Duplicate start event! TraceId: " + this.traceId + " Event: " + event.toString()); + this.damaged = true; + } + this.hasStart = true; + } + + if (event.isEnd()) { + if (this.hasEnd) { + LOG.error("Duplicate end event! TraceId: " + this.traceId + " Event: " + event.toString()); + this.damaged = true; + } + this.hasEnd = true; + } + + if (!this.events.add(event)) { + LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + this.traceId); + this.damaged = true; + } + } + + /** + * @return + * <code>true</code> if all records are present and the trace has a start and an end record. + */ + public boolean isComplete() { + synchronized (this) { + return ((this.maxOrderIndex + 1) == this.events.size()) && !this.events.isEmpty() + && this.hasEnd && this.hasStart && !this.damaged; + } + } + + /** + * @return <code>true</code> if the trace in this buffer is damaged. + */ + public boolean isDamaged() { + synchronized (this) { + return this.damaged; + } + } + + /** + * @return The trace id + */ + public long getTraceId() { + synchronized (this) { + return this.traceId; + } + } + + /** + * Process this buffer into a WorkflowTrace containing all buffered events. + * + * @return A new WorkflowTrace + */ + public WorkflowTrace toWorkflowTrace() { + synchronized (this) { + return new WorkflowTrace(this.events.toArray(new WorkflowRecord[this.events.size()]), + this.isComplete(), this.isDamaged()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return this.toWorkflowTrace().toString(); + } + + /** + * @return Insertion timestamp of the latest event in this buffer + */ + public long getMaxLoggingTimestamp() { + synchronized (this) { + return this.maxLoggingTimestamp; + } + } + + /** + * @return Insertion timestamp of the first event in this buffer + */ + public long getMinLoggingTimestamp() { + synchronized (this) { + return this.minLoggingTimestamp; + } + } + + /** + * Compares two trace events by their order index. + * + * @author Jan Waller + */ + private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable { + private static final long serialVersionUID = 89207356648232517L; + + /** + * Creates a new instance of this class. + */ + public TraceEventComperator() { + // default empty constructor + } + + /** + * {@inheritDoc} + */ + public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) { + return o1.getOrderIndex() - o2.getOrderIndex(); + } + } + } +} diff --git a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordPartialTraceReconstructionFilter.java b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordPartialTraceReconstructionFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..ef15f0491c937ac2f25121d633762f0eaecfab1e --- /dev/null +++ b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordPartialTraceReconstructionFilter.java @@ -0,0 +1,388 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.filter.flow; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.InputPort; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.AbstractTraceEvent; +import kieker.common.record.flow.trace.WorkflowRecord; +import kieker.common.record.flow.trace.WorkflowTrace; + +/** + * This filter collects WorkflowRecords and constructs ordered traces of them. + * + * + * @author Florian Biss, Soeren Mahmens, Bjoern Weissenfels + * + * @since 1.8 + */ +@Plugin(name = "Partial Trace Reconstruction Filter (Workflow)", + description = "This filter bundles WorkflowRecords into a trace", + outputPorts = { + @OutputPort(name = WorkflowRecordPartialTraceReconstructionFilter.OUTPUT_PORT_NAME_PARTIAL_TRACES, + description = "Forwards the constructed partial and complete traces", + eventTypes = { WorkflowTrace.class }) + }, + configuration = { + @Property(name = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS, + defaultValue = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TRACE_TIMEOUT_NS), + @Property(name = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS, + defaultValue = WorkflowRecordPartialTraceReconstructionFilter.CONFIG_PROPERTY_VALUE_MAX_TRACE_DURATION_NS) + }) +public class WorkflowRecordPartialTraceReconstructionFilter extends AbstractFilterPlugin { + /** + * The name of the output port delivering the valid traces. + */ + public static final String OUTPUT_PORT_NAME_PARTIAL_TRACES = "partialTraces"; + /** + * The name of the input port receiving the trace records. + */ + public static final String INPUT_PORT_NAME_WORKFLOW_RECORDS = "workflowRecords"; + + /** + * The name of the input port receiving the time stamps. + */ + public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; + /** + * The name of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS = "maxTraceTimeout"; + + /** + * The default value of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_TRACE_TIMEOUT_NS = "1000000000"; // 1 seconds + + /** + * The name of the property determining the maximal trace duration. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS = "maxTraceDuration"; + + /** + * The default value of the property determining the maximal trace duration. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_TRACE_DURATION_NS = "5000000000"; // 5 seconds + + private static final Log LOG = LogFactory.getLog(WorkflowRecordPartialTraceReconstructionFilter.class); + + private final ConcurrentMap<Long, TraceBuffer> traceId2trace; + private final long maxTraceTimeout; + private final long maxTraceDuration; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public WorkflowRecordPartialTraceReconstructionFilter(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + + this.traceId2trace = new ConcurrentHashMap<Long, TraceBuffer>(); + + this.maxTraceTimeout = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS); + this.maxTraceDuration = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_DURATION_NS); + + } + + /** + * This method is the input port for the timeout. + * + * @param timestamp + * The actual nanotime + */ + @InputPort( + name = INPUT_PORT_NAME_TIME_EVENT, + description = "Input port for periodic a time signal", + eventTypes = { Long.class }) + public void newEvent(final Long timestamp) { + synchronized (this) { + for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { + final long timeSinceLastRecord = timestamp - traceBuffer.getMaxLoggingTimestamp(); + final long timeSinceFirstRecord = timestamp - traceBuffer.getMinLoggingTimestamp(); + if ((timeSinceLastRecord >= this.maxTraceTimeout) || (timeSinceFirstRecord >= this.maxTraceDuration)) { // max timeout or duration is gone + this.traceId2trace.remove(traceBuffer.getTraceID()); + super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace()); + } + } + } + } + + /** + * This method is the input port for the new events for this filter. + * + * @param record + * The new record to handle. + */ + @InputPort( + name = INPUT_PORT_NAME_WORKFLOW_RECORDS, + description = "Input port for WorkflowRecords", + eventTypes = { WorkflowRecord.class }) + public void newEvent(final IFlowRecord record) { + + final Long traceId; + TraceBuffer traceBuffer; + + if (record instanceof WorkflowRecord) { + final WorkflowRecord traceRecord = (WorkflowRecord) record; + traceId = traceRecord.getTraceId(); + traceBuffer = this.getTraceBuffer(traceId); + + synchronized (this) { + traceBuffer.insertEvent(traceRecord); + + if (traceBuffer.hasEnd()) { + this.traceId2trace.remove(traceId); + super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace()); + } + } + + } else { + LOG.error("Invalid event type in WorkflowRecordPartialTraceReconstructionFilter"); + return; // invalid type which should not happen due to the specified eventTypes + } + + } + + private TraceBuffer getTraceBuffer(final Long traceId) { + TraceBuffer traceBuffer; + traceBuffer = this.traceId2trace.get(traceId); + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = this.traceId2trace.get(traceId); + + if (traceBuffer == null) { // NOCS (DCL) + final TraceBuffer newTraceBuffer = new TraceBuffer(); + traceBuffer = this.traceId2trace.put(traceId, newTraceBuffer); + if (traceBuffer == null) { + traceBuffer = newTraceBuffer; + } + } + } + } + return traceBuffer; + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + this.deliverAllBuffer(); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_TRACE_TIMEOUT_NS, String.valueOf(this.maxTraceTimeout)); + return configuration; + } + + /** + * This method delivers all traceBuffer and clears the buffer. + */ + private void deliverAllBuffer() { + synchronized (this) { + for (final TraceBuffer traceBuffer : this.traceId2trace.values()) { + super.deliver(OUTPUT_PORT_NAME_PARTIAL_TRACES, traceBuffer.toWorkflowTrace()); + } + this.traceId2trace.clear(); + } + } + + /** + * Buffer for records that will be bundled into a single trace. + * + * @author Florian Biss + */ + private static final class TraceBuffer { + private static final Log LOG = LogFactory.getLog(TraceBuffer.class); + private static final Comparator<AbstractTraceEvent> COMPARATOR = new TraceEventComperator(); + + private final SortedSet<WorkflowRecord> events = new TreeSet<WorkflowRecord>(COMPARATOR); + + private boolean damaged; + private int maxOrderIndex = -1; + + private long minLoggingTimestamp = Long.MAX_VALUE; + private long maxLoggingTimestamp = -1; + + private long traceId = -1; + private boolean hasStart; + private boolean ended; + + /** + * Creates a new buffer. + */ + public TraceBuffer() { + // default empty constructor + } + + /** + * @return The trace id + */ + public Object getTraceID() { + synchronized (this) { + return this.traceId; + } + } + + /** + * Insert a new event into buffer. + * + * @param event + * New event + */ + public void insertEvent(final WorkflowRecord event) { + final long myTraceId = event.getTraceId(); + synchronized (this) { + + final long currentTime = System.nanoTime(); + if (this.traceId == -1) { + this.traceId = myTraceId; + this.minLoggingTimestamp = currentTime; + } else if (this.traceId != myTraceId) { + LOG.error("Invalid traceId! Expected: " + this.traceId + " but found: " + myTraceId + " in event " + event.toString()); + this.damaged = true; + } + + this.maxLoggingTimestamp = currentTime; + + final int orderIndex = event.getOrderIndex(); + if (orderIndex > this.maxOrderIndex) { + this.maxOrderIndex = orderIndex; + } + + if (event.isStart()) { + if (this.hasStart) { + LOG.error("Duplicate start event! TraceId: " + this.traceId + " Event: " + event.toString()); + this.damaged = true; + } + this.hasStart = true; + } + + if (event.isEnd()) { + if (this.ended) { + LOG.error("Duplicate end event! TraceId: " + this.traceId + " Event: " + event.toString()); + this.damaged = true; + } + this.ended = true; + } + + if (!this.events.add(event)) { + LOG.error("Duplicate entry for orderIndex " + orderIndex + " with traceId " + myTraceId); + this.damaged = true; + } + } + } + + /** + * @return <code>true</code> if this buffer contains a trace end record + */ + public boolean hasEnd() { + synchronized (this) { + return this.ended; + } + } + + /** + * @return <code>true</code> if this buffer contains all records of a trace, including start and end record + */ + public boolean isComplete() { + synchronized (this) { + return ((this.maxOrderIndex + 1) == this.events.size()) + && !this.events.isEmpty() && this.ended && this.hasStart; + } + } + + /** + * @return <code>true</code> if the trace in this buffer is damaged + */ + public boolean isDamaged() { + synchronized (this) { + return this.damaged; + } + } + + /** + * Process buffer into a trace. + * + * @return + * A new trace containing the buffered events + */ + public WorkflowTrace toWorkflowTrace() { + return new WorkflowTrace(this.events.toArray(new WorkflowRecord[this.events.size()]), + this.isComplete(), this.isDamaged()); + } + + /** + * @return Youngest time stamp in trace + */ + public long getMaxLoggingTimestamp() { + synchronized (this) { + return this.maxLoggingTimestamp; + } + } + + /** + * @return Oldest time stamp in trace + */ + public long getMinLoggingTimestamp() { + synchronized (this) { + return this.minLoggingTimestamp; + } + } + + /** + * @author Jan Waller + */ + private static final class TraceEventComperator implements Comparator<AbstractTraceEvent>, Serializable { + private static final long serialVersionUID = 89207356648232517L; + + /** + * Creates a new instance of this class. + */ + public TraceEventComperator() { + // default empty constructor + } + + public int compare(final AbstractTraceEvent o1, final AbstractTraceEvent o2) { + return o1.getOrderIndex() - o2.getOrderIndex(); + } + } + } +} diff --git a/src/kieker/analysis/plugin/filter/flow/WorkflowRecordTraceAgglomerationFilter.java b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordTraceAgglomerationFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..da5e9c7e2411545aaf4333f53be60776235eabe0 --- /dev/null +++ b/src/kieker/analysis/plugin/filter/flow/WorkflowRecordTraceAgglomerationFilter.java @@ -0,0 +1,496 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.filter.flow; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.InputPort; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.filter.AbstractFilterPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.flow.IFlowRecord; +import kieker.common.record.flow.trace.WorkflowRecord; +import kieker.common.record.flow.trace.WorkflowTrace; +import kieker.common.util.StatisticInformation; + +/** + * This filter collects incoming traces for a specified amount of time. + * Any traces representing the same series of events will be used to calculate + * statistic informations like the average runtime of this kind of trace. + * Only one specimen of these traces containing this information will be forwarded + * from this filter. + * + * Statistic outliers regarding the runtime of the trace will be treated special + * and therefore send out as they are and will not be mixed with others. + * + * + * @author Florian Biss + * + * @since 1.8 + */ + +@Plugin(description = "This filter tries to agglomerate similar WorkflowTraces into a single trace.", + outputPorts = { + @OutputPort(name = WorkflowRecordTraceAgglomerationFilter.OUTPUT_PORT_NAME_TRACES, + description = "Output port for the processed traces", + eventTypes = { WorkflowTrace.class }) + }, + configuration = { + @Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_TIMEUNIT, + defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_TIMEUNIT), + @Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, + defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION), + @Property(name = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_NAME_MAX_DEVIATION, + defaultValue = WorkflowRecordTraceAgglomerationFilter.CONFIG_PROPERTY_VALUE_MAX_DEVIATION) + }) +public class WorkflowRecordTraceAgglomerationFilter extends AbstractFilterPlugin { + /** + * The name of the output port delivering the valid traces. + */ + public static final String OUTPUT_PORT_NAME_TRACES = "tracesOut"; + + /** + * The name of the input port receiving the trace records. + */ + public static final String INPUT_PORT_NAME_TRACES = "tracesIn"; + + /** + * The name of the property determining the time unit. + */ + public static final String CONFIG_PROPERTY_NAME_TIMEUNIT = "timeunit"; + + /** + * Clock input for timeout handling. + */ + public static final String INPUT_PORT_NAME_TIME_EVENT = "timestamp"; + + /** + * The default value of the time unit property (nanoseconds). + */ + public static final String CONFIG_PROPERTY_VALUE_TIMEUNIT = "NANOSECONDS"; // TimeUnit.NANOSECONDS.name() + + /** + * The name of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION = "maxCollectionDuration"; + + /** + * The default value of the property determining the maximal trace timeout. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_COLLECTION_DURATION = "5000000000"; + + /** + * The name of the property determining the maximal runtime deviation factor. + * + * Outliers are indicated by <code>|runtime - averageRuntime| > deviationFactor * standardDeviation</code>. + * Use negative number to agglomerate all traces. + */ + public static final String CONFIG_PROPERTY_NAME_MAX_DEVIATION = "maxDeviation"; + + /** + * The default value of the property determining the maximal runtime deviation factor. + * Default is two standard deviations. + */ + public static final String CONFIG_PROPERTY_VALUE_MAX_DEVIATION = "2"; + + private static final Log LOG = LogFactory.getLog(WorkflowRecordTraceAgglomerationFilter.class); + + private final TimeUnit timeunit; + private final long maxCollectionDuration; + private final long maxDeviation; + + private final Map<WorkflowTrace, TraceAgglomerationBuffer> trace2buffer; + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration for this component. + * @param projectContext + * The project context for this component. + */ + public WorkflowRecordTraceAgglomerationFilter(final Configuration configuration, final IProjectContext projectContext) { + super(configuration, projectContext); + + if (null != projectContext) { // TODO #819 remove non-null check and else case in Kieker 1.8 //NOCS + final String recordTimeunitProperty = projectContext.getProperty(IProjectContext.CONFIG_PROPERTY_NAME_RECORDS_TIME_UNIT); + TimeUnit recordTimeunit; + try { + recordTimeunit = TimeUnit.valueOf(recordTimeunitProperty); + } catch (final IllegalArgumentException ex) { // already caught in AnalysisController, should never happen + LOG.warn(recordTimeunitProperty + " is no valid TimeUnit! Using NANOSECONDS instead."); + recordTimeunit = TimeUnit.NANOSECONDS; + } + this.timeunit = recordTimeunit; + } else { + this.timeunit = TimeUnit.NANOSECONDS; + } + this.maxDeviation = configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION); + final String configTimeunitProperty = configuration.getStringProperty(CONFIG_PROPERTY_NAME_TIMEUNIT); + TimeUnit configTimeunit; + try { + configTimeunit = TimeUnit.valueOf(configTimeunitProperty); + } catch (final IllegalArgumentException ex) { + LOG.warn(configTimeunitProperty + " is no valid TimeUnit! Using inherited value of " + this.timeunit.name() + " instead."); + configTimeunit = this.timeunit; + } + this.maxCollectionDuration = this.timeunit.convert(configuration.getLongProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION), configTimeunit); + this.trace2buffer = new TreeMap<WorkflowTrace, TraceAgglomerationBuffer>(new TraceComperator()); + } + + /** + * This method is the input port for incoming traces. + * + * @param record + * A WorkflowTrace + */ + @InputPort( + name = INPUT_PORT_NAME_TRACES, + description = "Collect identical traces and agglomerate them.", + eventTypes = { WorkflowTrace.class }) + public void newEvent(final IFlowRecord event) { + final WorkflowTrace trace; + synchronized (this) { + if (event instanceof WorkflowTrace) { + trace = (WorkflowTrace) event; + + if (!trace.isComplete() || trace.isDamaged()) { + super.deliver(OUTPUT_PORT_NAME_TRACES, trace); // Incomplete or damaged? Nothing to do here. + return; + } else { + this.insertIntoBuffer(trace); + } + } else { + LOG.error("Invalid input type at " + OUTPUT_PORT_NAME_TRACES + + " in WorkflowRecordTraceAgglomerationFilter"); + return; // invalid type + } + } + } + + /** + * Inserts a WorkflowTrace into the buffer. + * + * @param trace + * The WorkflowTrace that will be inserted + */ + private void insertIntoBuffer(final WorkflowTrace trace) { + + TraceAgglomerationBuffer traceBuffer; + final long timestamp; + timestamp = this.timeunit.convert(System.nanoTime(), TimeUnit.NANOSECONDS); + traceBuffer = this.trace2buffer.get(trace); + + if (traceBuffer == null) { // first record for this id! + synchronized (this) { + traceBuffer = this.trace2buffer.get(trace); + + if (traceBuffer == null) { // NOCS (DCL) + traceBuffer = new TraceAgglomerationBuffer(timestamp, this.maxDeviation); + this.trace2buffer.put(trace, traceBuffer); + } + + } + } + + traceBuffer.insertTrace(trace); + } + + /** + * This method is the input port for the timeout. + * + * @param timestamp + * The current nanotime + */ + @InputPort( + name = INPUT_PORT_NAME_TIME_EVENT, + description = "Time signal for timeouts", + eventTypes = { Long.class }) + public void newEvent(final Long timestamp) { + synchronized (this) { + this.processTimeoutQueue(timestamp); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void terminate(final boolean error) { + synchronized (this) { + // Avoid ConcurrentModificationException, deliverBuffer will remove the buffer from trace2buffer. + final List<TraceAgglomerationBuffer> buffers = new LinkedList<TraceAgglomerationBuffer>(this.trace2buffer.values()); + for (final TraceAgglomerationBuffer traceBuffer : buffers) { + this.deliverBuffer(traceBuffer); + } + this.trace2buffer.clear(); + } + } + + private void processTimeoutQueue(final long timestamp) { + final long bufferTimeout = timestamp - this.maxCollectionDuration; + // Avoid ConcurrentModificationException, deliverBuffer will remove the buffer from trace2buffer. + final List<TraceAgglomerationBuffer> buffers = new LinkedList<TraceAgglomerationBuffer>(this.trace2buffer.values()); + for (final TraceAgglomerationBuffer traceBuffer : buffers) { + if (traceBuffer.getBufferCreatedTimestamp() <= bufferTimeout) { + this.deliverBuffer(traceBuffer); + } + } + } + + /** + * Deliver and remove a buffer. + * + * @param traceBuffer + */ + private void deliverBuffer(final TraceAgglomerationBuffer traceBuffer) { + final List<WorkflowTrace> traces = traceBuffer.processBuffer(); + for (final WorkflowTrace trace : traces) { + super.deliver(OUTPUT_PORT_NAME_TRACES, trace); + } + // Kill the buffer + if (this.trace2buffer.remove(traces.get(0)) == null) { + LOG.warn("Removal of buffer failed."); + } + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + configuration.setProperty(CONFIG_PROPERTY_NAME_TIMEUNIT, this.timeunit.name()); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_COLLECTION_DURATION, String.valueOf(this.maxCollectionDuration)); + configuration.setProperty(CONFIG_PROPERTY_NAME_MAX_DEVIATION, String.valueOf(this.maxDeviation)); + return configuration; + } + + /** + * Buffer for similar traces that are to be agglomerated into a single trace. + * + * @author Florian Biß + */ + private static final class TraceAgglomerationBuffer { + private static final Log LOG = LogFactory.getLog(TraceAgglomerationBuffer.class); + + /** Contains all buffered traces for statistical purposes. */ + private final List<WorkflowTrace> traces = new ArrayList<WorkflowTrace>(); + + /** Contains each unique trace at most once. */ + private WorkflowTrace agglomeratedTraces; + + private final long bufferCreatedTimestamp; + + /** + * Maximal runtime deviation factor. Use negative number to agglomerate all traces. + * + * Outliers are indicated by <code>averageRuntime - deviationFactor * standardDeviation > runtime</code> or + * <code>runtime > averageRuntime + deviationFactor * standardDeviation</code> + * + */ + private final long deviationFactor; + + /** + * Creates a new instance of this class. + */ + public TraceAgglomerationBuffer(final long bufferCreatedTimestamp, final long maxDeviation) { + this.bufferCreatedTimestamp = bufferCreatedTimestamp; + this.deviationFactor = maxDeviation; + } + + /** + * Insert a trace into this buffer. + * + * @param Trace + * to insert + */ + public void insertTrace(final WorkflowTrace trace) { + if (LOG.isDebugEnabled()) { + LOG.debug("Inserting into AgglomerationBuffer: " + trace.toString()); + } + synchronized (this) { + this.traces.add(trace); + } + } + + /** + * Agglomerate all traces that can and should be agglomerated. + * + * @return List of agglomerated traces and statistic outliers. + */ + public List<WorkflowTrace> processBuffer() { + final List<WorkflowTrace> processed = new ArrayList<WorkflowTrace>(); + final StatisticInformation tmpRuntime; + + synchronized (this) { + tmpRuntime = this.getTempBufferStatistic(); + for (final WorkflowTrace trace : this.traces) { + + // If deviationFactor is negative do not care about outliers. + if ((this.deviationFactor > 0) && this.isOutlier(tmpRuntime, trace.getRuntime())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Statistical outlier: " + trace.toString()); + } + // Outlier. Do not agglomerate. + processed.add(trace); + } else { + // Add to agglomeratedTraces. + this.agglomerate(trace); + } + + } + + processed.add(this.agglomeratedTraces); + } + + return processed; + } + + /** + * Merge a trace with the agglomerated traces. + * + * @param trace + * Trace to agglomerate + */ + private void agglomerate(final WorkflowTrace trace) { + if (this.agglomeratedTraces == null) { + // Trace is new, add to map. + this.agglomeratedTraces = trace; + } else { + // Trace exists, merge statistics for every record. + final WorkflowRecord[] agglomeratedRecords = this.agglomeratedTraces.getTraceEvents(); + final WorkflowRecord[] records = trace.getTraceEvents(); + for (int i = 0; i < agglomeratedRecords.length; i++) { + agglomeratedRecords[i].getRuntime().merge(records[i].getRuntime()); + } + + this.agglomeratedTraces.getRuntime().merge(trace.getRuntime()); + } + } + + /** + * Detect outliers. + * + * @param averagedRuntimes + * Statistics to check against + * @param recordRuntime + * Data to check + * @return + * <code>true</code> if both are too different + */ + private boolean isOutlier(final StatisticInformation averagedRuntimes, final StatisticInformation recordRuntime) { + final long maxDeviation = averagedRuntimes.getStandardDeviation() * this.deviationFactor; + if (maxDeviation < 0) { + return true; + } + return Math.abs(recordRuntime.getAvg() - averagedRuntimes.getAvg()) > maxDeviation; + } + + /** + * @return Creation time of this buffer. + */ + public long getBufferCreatedTimestamp() { + synchronized (this) { + return this.bufferCreatedTimestamp; + } + } + + /** + * The the statistical runtime informations about a buffer so far. + * + * @return Runtime informations. + */ + private StatisticInformation getTempBufferStatistic() { + final StatisticInformation tmpStatistic = new StatisticInformation(); + + for (final WorkflowTrace trace : this.traces) { + tmpStatistic.merge(trace.getRuntime()); + + } + return tmpStatistic; + } + } + + /** + * Compares traces based on their content instead of traceIds. + * Order of comparisons: Process name -> Trace lenght -> Id of each node in trace. + * If it all matches both traces are similar and may be agglomerated, albeit TraceId and runtimes might differ. + * + * @author Florian Biß + * + */ + private static final class TraceComperator implements Comparator<WorkflowTrace>, Serializable { + private static final long serialVersionUID = 8920766818232517L; + private static final Log LOG = LogFactory.getLog(TraceComperator.class); + + /** + * Creates a new instance of this class. + */ + public TraceComperator() { + // default empty constructor + } + + /** + * {@inheritDoc} + */ + public int compare(final WorkflowTrace t1, final WorkflowTrace t2) { + final int compProcesses = t1.getProcessName().compareTo(t2.getProcessName()); + if (compProcesses != 0) { + return compProcesses; + } + + final WorkflowRecord[] recordsT1 = t1.getTraceEvents(); + final WorkflowRecord[] recordsT2 = t2.getTraceEvents(); + + if ((recordsT1.length - recordsT2.length) != 0) { + return recordsT1.length - recordsT2.length; + } + + // Records in traces are already sorted by orderIndex, only compare nodeIds. + for (int i = 0; i < recordsT1.length; i++) { + final WorkflowRecord r1 = recordsT1[i]; + final WorkflowRecord r2 = recordsT2[i]; + final long idDiff = r1.getNodeId() - r2.getNodeId(); + + if ((idDiff > Integer.MAX_VALUE) || (idDiff < Integer.MIN_VALUE)) { + LOG.warn("Overflow during thread comparison!"); + } + + if (idDiff != 0) { + return (int) (r1.getNodeId() - r2.getNodeId()); + } + } + + // All records match. + return 0; + } + } + +} diff --git a/src/kieker/analysis/plugin/reader/mq/Bits.java b/src/kieker/analysis/plugin/reader/mq/Bits.java new file mode 100644 index 0000000000000000000000000000000000000000..1f4de169dfcd293803d5d749f1069a2c0b00a175 --- /dev/null +++ b/src/kieker/analysis/plugin/reader/mq/Bits.java @@ -0,0 +1,92 @@ +package kieker.analysis.plugin.reader.mq; + +public class Bits { + + static boolean getBoolean(byte[] b, int off) { + return b[off] != 0; + } + + static char getChar(byte[] b, int off) { + return (char) ((b[off + 1] & 0xFF) + + (b[off] << 8)); + } + + static short getShort(byte[] b, int off) { + return (short) ((b[off + 1] & 0xFF) + + (b[off] << 8)); + } + + static int getInt(byte[] b, int off) { + return ((b[off + 3] & 0xFF) ) + + ((b[off + 2] & 0xFF) << 8) + + ((b[off + 1] & 0xFF) << 16) + + ((b[off ] ) << 24); + } + + static float getFloat(byte[] b, int off) { + return Float.intBitsToFloat(getInt(b, off)); + } + + static long getLong(byte[] b, int off) { + return ((b[off + 7] & 0xFFL) ) + + ((b[off + 6] & 0xFFL) << 8) + + ((b[off + 5] & 0xFFL) << 16) + + ((b[off + 4] & 0xFFL) << 24) + + ((b[off + 3] & 0xFFL) << 32) + + ((b[off + 2] & 0xFFL) << 40) + + ((b[off + 1] & 0xFFL) << 48) + + (((long) b[off]) << 56); + } + + static double getDouble(byte[] b, int off) { + return Double.longBitsToDouble(getLong(b, off)); + } + + public static byte getByte(byte[] b, int off) { + return b[off]; + } + + static void putBoolean(byte[] b, int off, boolean val) { + b[off] = (byte) (val ? 1 : 0); + } + + static void putChar(byte[] b, int off, char val) { + b[off + 1] = (byte) (val ); + b[off ] = (byte) (val >>> 8); + } + + static void putShort(byte[] b, int off, short val) { + b[off + 1] = (byte) (val ); + b[off ] = (byte) (val >>> 8); + } + + static void putInt(byte[] b, int off, int val) { + b[off + 3] = (byte) (val ); + b[off + 2] = (byte) (val >>> 8); + b[off + 1] = (byte) (val >>> 16); + b[off ] = (byte) (val >>> 24); + } + + static void putFloat(byte[] b, int off, float val) { + putInt(b, off, Float.floatToIntBits(val)); + } + + static void putLong(byte[] b, int off, long val) { + b[off + 7] = (byte) (val ); + b[off + 6] = (byte) (val >>> 8); + b[off + 5] = (byte) (val >>> 16); + b[off + 4] = (byte) (val >>> 24); + b[off + 3] = (byte) (val >>> 32); + b[off + 2] = (byte) (val >>> 40); + b[off + 1] = (byte) (val >>> 48); + b[off ] = (byte) (val >>> 56); + } + + static void putDouble(byte[] b, int off, double val) { + putLong(b, off, Double.doubleToLongBits(val)); + } + + public static void putByte(byte[] b, int off, byte val) { + b[off] = val; + } +} diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java new file mode 100644 index 0000000000000000000000000000000000000000..0b7af3c98b444a597c5ee2a71909a2461081b084 --- /dev/null +++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQReader.java @@ -0,0 +1,394 @@ +/*************************************************************************** + * Copyright 2013 Kieker Project (http://kieker-monitoring.net) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ***************************************************************************/ + +package kieker.analysis.plugin.reader.mq; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ConsumerCancelledException; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.ShutdownSignalException; + +import kieker.analysis.IProjectContext; +import kieker.analysis.plugin.annotation.OutputPort; +import kieker.analysis.plugin.annotation.Plugin; +import kieker.analysis.plugin.annotation.Property; +import kieker.analysis.plugin.reader.AbstractReaderPlugin; +import kieker.common.configuration.Configuration; +import kieker.common.logging.Log; +import kieker.common.logging.LogFactory; +import kieker.common.record.IMonitoringRecord; +import kieker.analysis.plugin.reader.mq.Bits; + +/** + * Reads monitoring records from the queue of an established RabbitMQ connection. + * + * @author Santje Finke + * + * @since 1.8 + */ +@Plugin(description = "A reader which reads records from a RabbitMQ queue", + dependencies = "This plugin needs the file 'rabbitmq.client-*.jar'.", + outputPorts = { + @OutputPort(name = RabbitMQReader.OUTPUT_PORT_NAME_RECORDS, eventTypes = { IMonitoringRecord.class }, description = "Output Port of the JMSReader") + }, + configuration = { + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_PROVIDERURL, defaultValue = "localhost"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_NAME_QUEUE, defaultValue = "analysis"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PASSWORD, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_USER, defaultValue = "guest"), + @Property(name = RabbitMQReader.CONFIG_PROPERTY_PORT, defaultValue = "5672") + + }) +public final class RabbitMQReader extends AbstractReaderPlugin { + + /** The name of the output port delivering the received records. */ + public static final String OUTPUT_PORT_NAME_RECORDS = "monitoringRecords"; + /** The name of the configuration determining the RabbitMQ provider URL. */ + public static final String CONFIG_PROPERTY_NAME_PROVIDERURL = "mqProviderUrl"; + /** The name of the configuration determining the RabbitMQ Queue (e.g. queue1). */ + public static final String CONFIG_PROPERTY_NAME_QUEUE = "mqDestination"; + /** The username that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_USER = "guest"; + /** The password that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_PASSWORD = "guest"; + /** The port that is used to connect to a queue. */ + public static final String CONFIG_PROPERTY_PORT = "5672"; + + static final Log LOG = LogFactory.getLog(RabbitMQReader.class); // NOPMD package for inner class + + private final String providerUrl; + private final String queueName; + private final String password; + private final String username; + private final int port; + private Connection connection; + private Channel channel; + private ConnectionFactory factory; + private QueueingConsumer normalConsumer; + private QueueingConsumer registryConsumer; + private final CountDownLatch cdLatch = new CountDownLatch(1); + + private final Map<Integer, String> stringRegistry = new HashMap<Integer, String>(); + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration used to initialize the whole reader. Keep in mind that the configuration should contain the following properties: + * <ul> + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. {@code username} + * <li>The property {@link #CONFIG_PROPERTY_PORT}, e.g. {@code port} + * </ul> + * @param projectContext + * The project context for this component. + * + * @throws IllegalArgumentException + * If one of the properties is empty. + */ + public RabbitMQReader(final Configuration configuration, final IProjectContext projectContext) throws IllegalArgumentException { + super(configuration, projectContext); + + // Initialize the reader bases on the given configuration. + this.providerUrl = configuration.getStringProperty(CONFIG_PROPERTY_NAME_PROVIDERURL); + this.queueName = configuration.getStringProperty(CONFIG_PROPERTY_NAME_QUEUE); + this.username = configuration.getStringProperty(CONFIG_PROPERTY_USER); + this.password = configuration.getStringProperty(CONFIG_PROPERTY_PASSWORD); + this.port = configuration.getIntProperty(CONFIG_PROPERTY_PORT); + // simple sanity check + if ((this.providerUrl.length() == 0) || (this.queueName.length() == 0)) { + throw new IllegalArgumentException("RabbitMQReader has not sufficient parameters. providerUrl ('" + this.providerUrl + "') or destination ('" + + this.queueName + "'), is null"); + } + + RabbitMQRegistryConsumer registryConsumer = new RabbitMQRegistryConsumer(); + registryConsumer.start(); + } + + /** + * Creates a new instance of this class using the given parameters. + * + * @param configuration + * The configuration used to initialize the whole reader. Keep in mind that the configuration should contain the following properties: + * <ul> + * <li>The property {@link #CONFIG_PROPERTY_NAME_PROVIDERURL}, e.g. {@code localhost} + * <li>The property {@link #CONFIG_PROPERTY_NAME_QUEUE}, e.g. {@code queue1} + * <li>The property {@link #CONFIG_PROPERTY_PASSWORD}, e.g. {@code password} + * <li>The property {@link #CONFIG_PROPERTY_USER}, e.g. {@code username} + * </ul> + * + * @throws IllegalArgumentException + * If one of the properties is empty. + * + * @deprecated To be removed in Kieker 1.8. + */ + @Deprecated + public RabbitMQReader(final Configuration configuration) throws IllegalArgumentException { + this(configuration, null); + } + + /** + * A call to this method is a blocking call. + * + * @return true if the method succeeds, false otherwise. + */ + public boolean read() { + boolean retVal = true; + try { + this.factory = new ConnectionFactory(); + this.factory.setHost(this.providerUrl); + this.factory.setPort(this.port); + this.factory.setConnectionTimeout(0); + this.factory.setUsername(this.username); + this.factory.setPassword(this.password); + + this.connection = this.factory.newConnection(); + this.channel = this.connection.createChannel(); + + this.channel.queueDeclare(this.queueName, false, false, false, null); + this.channel.queueDeclare("registryRecords", false, false, false, null); + + LOG.info("Listening to destination:" + this.queueName + " at " + this.providerUrl + " !\n***\n\n"); + + this.normalConsumer = new QueueingConsumer(this.channel); + this.channel.basicConsume(this.queueName, true, this.normalConsumer); + + this.registryConsumer = new QueueingConsumer(this.channel); + this.channel.basicConsume("registryRecords", true, this.registryConsumer); + + while (!Thread.interrupted()) { + + if (!this.connection.isOpen() || !this.channel.isOpen()) { + this.reconnect(); + } + + final QueueingConsumer.Delivery delivery = this.normalConsumer.nextDelivery(); + + final ByteArrayInputStream bain = new ByteArrayInputStream(delivery.getBody()); + final ObjectInputStream in = new ObjectInputStream(bain); + final Object message = in.readObject(); + + if ((message instanceof IMonitoringRecord) && (!this.deliverIndirect(OUTPUT_PORT_NAME_RECORDS, message))) { + LOG.error("deliverRecord returned false"); + } + + } + + } catch (final IOException ex) { // NOPMD NOCS (IllegalCatchCheck) + LOG.error("Error in read()", ex); + retVal = false; + } catch (final ClassNotFoundException e) { + LOG.error("Error in read(): ClassNotFound Exception", e); + retVal = false; + } catch (final ShutdownSignalException e) { + LOG.error("Error in read(): ShutdownSignal Exception", e); + retVal = false; + } catch (final ConsumerCancelledException e) { + LOG.error("Error in read(): ConsumerCancelled Exception", e); + retVal = false; + } catch (final InterruptedException e) { + LOG.error("Error in read(): Interrupted Exception", e); + retVal = false; + } finally { + try { + if (this.connection != null) { + this.connection.close(); + } + } catch (final IOException e) { + LOG.error("Error in read()", e); + } + } + return retVal; + } + + private IMonitoringRecord fromByteArray(final byte[] b) { + final Class<?>[] recordTypes = monitoringRecord.getValueTypes(); + + int arraySize = 4 + 8; + + for (int i = 0; i < recordTypes.length; i++) { + if (recordTypes[i] == String.class) { + arraySize += 4; + } else if (recordTypes[i] == Integer.class + || recordTypes[i] == int.class) { + arraySize += 4; + } else if (recordTypes[i] == Long.class + || recordTypes[i] == long.class) { + arraySize += 8; + } else if (recordTypes[i] == Float.class + || recordTypes[i] == float.class) { + arraySize += 4; + } else if (recordTypes[i] == Double.class + || recordTypes[i] == double.class) { + arraySize += 8; + } else if (recordTypes[i] == Byte.class + || recordTypes[i] == byte.class) { + arraySize += 1; + } else if (recordTypes[i] == Short.class + || recordTypes[i] == short.class) { + arraySize += 2; + } else if (recordTypes[i] == Boolean.class + || recordTypes[i] == boolean.class) { + arraySize += 1; + } else { + arraySize += 1; + } + } + + byte[] result = new byte[arraySize]; + int offset = 0; + + Bits.putInt(result, offset, this.monitoringController + .getIdForString(monitoringRecord.getClass().getName())); + offset += 4; + + Bits.putLong(result, offset, monitoringRecord.getLoggingTimestamp()); + offset += 8; + final Object[] recordFields = monitoringRecord.toArray(); + + for (int i = 0; i < recordFields.length; i++) { + if (recordFields[i] == null) { + if (recordTypes[i] == String.class) { + Bits.putInt(result, offset, + this.monitoringController.getIdForString("")); + offset += 4; + } else if (recordTypes[i] == Integer.class + || recordTypes[i] == int.class) { + Bits.putInt(result, offset, 0); + offset += 4; + } else if (recordTypes[i] == Long.class + || recordTypes[i] == long.class) { + Bits.putLong(result, offset, 0L); + offset += 8; + } else if (recordTypes[i] == Float.class + || recordTypes[i] == float.class) { + Bits.putFloat(result, offset, 0); + offset += 4; + } else if (recordTypes[i] == Double.class + || recordTypes[i] == double.class) { + Bits.putDouble(result, offset, 0); + offset += 8; + } else if (recordTypes[i] == Byte.class + || recordTypes[i] == byte.class) { + Bits.putByte(result, offset, (byte) 0); + offset += 1; + } else if (recordTypes[i] == Short.class + || recordTypes[i] == short.class) { + Bits.putShort(result, offset, (short) 0); + offset += 2; + } else if (recordTypes[i] == Boolean.class + || recordTypes[i] == boolean.class) { + Bits.putBoolean(result, offset, false); + offset += 1; + } else { + LOG.warn("Record with unsupported recordField of type " + + recordFields[i].getClass()); + Bits.putByte(result, offset, (byte) 0); + offset += 1; + } + } else if (recordFields[i] instanceof String) { + Bits.putInt(result, offset, this.monitoringController + .getIdForString((String) recordFields[i])); + offset += 4; + } else if (recordFields[i] instanceof Integer) { + Bits.putInt(result, offset, (Integer) recordFields[i]); + offset += 4; + } else if (recordFields[i] instanceof Long) { + Bits.putLong(result, offset, (Long) recordFields[i]); + offset += 8; + } else if (recordFields[i] instanceof Float) { + Bits.putFloat(result, offset, (Float) recordFields[i]); + offset += 4; + } else if (recordFields[i] instanceof Double) { + Bits.putDouble(result, offset, (Double) recordFields[i]); + offset += 8; + } else if (recordFields[i] instanceof Byte) { + Bits.putByte(result, offset, (Byte) recordFields[i]); + offset += 1; + } else if (recordFields[i] instanceof Short) { + Bits.putShort(result, offset, (Short) recordFields[i]); + offset += 2; + } else if (recordFields[i] instanceof Boolean) { + Bits.putBoolean(result, offset, (Boolean) recordFields[i]); + offset += 1; + } else { + LOG.warn("Record with unsupported recordField of type " + + recordFields[i].getClass()); + Bits.putByte(result, offset, (byte) 0); + offset += 1; + } + } + + return result; + } + + /** + * Establishes a connection to a rabbitMQ channel with the current connection informationen. + */ + private void reconnect() { + try { + this.connection = this.factory.newConnection(); + this.channel = this.connection.createChannel(); + this.channel.queueDeclare(this.queueName, false, false, false, null); + this.normalConsumer = new QueueingConsumer(this.channel); + this.channel.basicConsume(this.queueName, true, this.normalConsumer); + } catch (final IOException e) { + RabbitMQReader.LOG.error("Error reestablishing connection", e); + } + } + + final void unblock() { // NOPMD (package visible for inner class) + this.cdLatch.countDown(); + } + + final boolean deliverIndirect(final String outputPortName, final Object data) { // NOPMD (package visible for inner class) + return super.deliver(outputPortName, data); + } + + /** + * {@inheritDoc} + */ + public void terminate(final boolean error) { + LOG.info("Shutdown of RabbitMQReader requested."); + this.unblock(); + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getCurrentConfiguration() { + final Configuration configuration = new Configuration(); + + configuration.setProperty(CONFIG_PROPERTY_NAME_PROVIDERURL, this.providerUrl); + configuration.setProperty(CONFIG_PROPERTY_NAME_QUEUE, this.queueName); + configuration.setProperty(CONFIG_PROPERTY_PASSWORD, this.password); + configuration.setProperty(CONFIG_PROPERTY_USER, this.username); + + return configuration; + } +} diff --git a/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java b/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..e90c6fceb34cd4b368d8101aa44bd82ecaa60c31 --- /dev/null +++ b/src/kieker/analysis/plugin/reader/mq/RabbitMQRegistryConsumer.java @@ -0,0 +1,9 @@ +package kieker.analysis.plugin.reader.mq; + +public class RabbitMQRegistryConsumer extends Thread { + + @Override + public void run() { + + } +}