RabbitMQ Stream provider
Hello, hello. So, I've been working on a RabbitMQ stream provider implementation and today I think that I have something ready to demonstrate and ask for feedback.
The existing RabbitMQ implementations
I know that we have these 2 RabbitMQ implementations for Orleans:
- https://github.com/OrleansContrib/Orleans.Providers.RabbitMQ
- https://github.com/zitmen/Orleans.Streams.RabbitMqStreamProvider
These are quite old and are not maintained anymore. I did base myself on these libraries of how to write the IQueueAdapter, IQueueAdapterReceiver, and others on this PR. But the main difference between this implementation and these other 2 libraries is that I'm not using the classic RabbitMQ queues. Instead, it will use the "new" RabbitMQ Streams implementation, so allowing us to use this provider as a Rewindable stream.
There are still a few things to do here, and again, this is a WIP but I really wanted some feedback here either to back off or to change something.
Questions I have:
-
A custom Queue Cache Cursor. I spent some time trying to use
SimpleQueueCachehere like it is being used bySQSandAzureQueuebut no success. I was either loosing messages or getting duplicated messages. I saw this issue regarding theMessagesDeliveredAsyncso I assumed there was no priority to fixSimpleQueueCacheand I was very afraid to play with it for now since it is used by other providers. Let me know what you guys think. I might need help to fix it tho, not sure. One thing is for sure though, this RabbitMqStreamProvider needs theMessagesDeliveredAsyncto be called only when all messages are delivered to all consumers successfully as per documentation. -
When refreshing a consumer cursor (here), there is a scenario that the current cursor is already running (it is
Active) but it is on the last message. Since the consumer is running on a different thread, the following race condition could happen:
Thread A -----------Refresh Cursor-----Check if it is Inactive----Cursor was still Active here;
Thread B ----MoveNext from last message at the time-------------------Set cursor to inactive;
- Not sure if this is clear enough. I prob failed to explain, but basically a race condition is happening and letting cursors that have messages to process inactive. I fixed it with a
lockon theconsumerDatafor now, but I think I could use theInterlocked.Exchangeas well, will try very soon. ~I tried to find issues with similar problems but couldn't find, so this is prob an issue with just my cursor~ @benjaminpetit is this related to this issue: https://github.com/dotnet/orleans/issues/7686#issuecomment-1102799629 ?
Things left to do:
-
[ ] Tests. I never wrote any tests with Orleans (and tbh, this is my first time ever playing with Orleans) so I will be taking a look at how you guys did the tests for the EventHub provider. This is my priority for now, since I had such a bad time writing the
RabbitMqQueueCacheCursorbecause it was hard to fix something, like making sure messages don't get duplicated without breaking something else -
[ ] Comments, a lot of comments. I tried to focus a bit on commenting on some stuff but at some point, I was just interested in solving issues regarding the implementation. There is still a lot to comment and a lot to refactor as well.
-
[ ] Make
RabbitMqQueueCacheCursorconsumer independent, so that every consumer can process messages at it is own pace. -
[ ] Consume previous failed messages that are too old. If I have a message that failed and no more messages are published to that same stream in a while, that message won't be retried (except for the times the pulling agent already did it) at all. For now, the pulling agent will only retry to process this failed message once a new message for that same stream arrives, making the consumer active again. I don't really know how, or if, other stream providers handle this but I'm gonna take a look.
-
[x] rabbitmq-stream-dotnet-client has almost no
.ConfigureAwait(false). I was facing a lot of deadlocks because of that, since they also do.Resultor.GetAwaiter().GetResult()internally. I'll also open PR's there to fix this. I don't think we could ship this provider without these changes tho. See: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/discussions/222
Microsoft Reviewers: Open in CodeFlow
@iuribrindeiro This is awesome! Thank you for putting this together!
@iuribrindeiro This is awesome! Thank you for putting this together!
I'm just so glad to be helping! I'm still fixing some issues with concurrency or some messages that are getting lost or duplicated. Should have some decent version very soon.
I tried to use the SimpleQueueCache here but I saw some issues regarding calling the Receiver.MessagesDeliveredAsync prematurely and I did replicate this my self locally while doing some wild message consuming (millions of messages). I just pushed a few changes to the new RabbitMqQueueCache and I think that I have solved most, or all, of the issues I was seeing before. Compared to SimpleQueueCache, at least I'm not loosing messages anymore or duplicating those :)
Thanks for sharing this with the community, @iuribrindeiro. Curious if you've socialized this on the Orleans Discord channel to get some feedback from folks. Exciting stuff, considering how many .NET users are already RabbitMQ users.
Thanks for sharing this with the community, @iuribrindeiro. Curious if you've socialized this on the Orleans Discord channel to get some feedback from folks. Exciting stuff, considering how many .NET users are already RabbitMQ users.
I hadn't tried that yet! But great idea, just joined there and seems like the PR is already known over there xD
hi does this stream provider need to connect to RabbitMQ instances in cluster (each silo having one or more instance of RabbitMQ) . can it connect to one instance of RabbitMQ in Orleans cluster? if it can connect to multiple instance , how load can be managed ? Thank you
hi does this stream provider need to connect to RabbitMQ instances in cluster (each silo having one or more instance of RabbitMQ) . can it connect to one instance of RabbitMQ in Orleans cluster? if it can connect to multiple instance , how load can be managed ? Thank you
This is a really good question. I'm using the RabbitMQ Stream client, so yes, it can. You would just have to follow their instructions to connect to a Multi Host environment.
Here is how you would configure the stream provider with a custom RabbitMQ Stream configuration: https://github.com/dotnet/orleans/pull/8273/files#diff-871fbbd5e1d7a137910d48854bf3970bc069efa7f1cd9fceb79cf4e58b2c7dc3R71
@iuribrindeiro please read the following Contributor License Agreement(CLA). If you agree with the CLA, please reply with the following information.
@dotnet-policy-service agree [company="{your company}"]
Options:
- (default - no company specified) I have sole ownership of intellectual property rights to my Submissions and I am not making Submissions in the course of work for my employer.
@dotnet-policy-service agree
- (when company given) I am making Submissions in the course of work for my employer (or my employer has intellectual property rights in my Submissions by contract or applicable law). I have permission from my employer to make Submissions and enter into this Agreement on behalf of my employer. By signing below, the defined term “You” includes me and my employer.
@dotnet-policy-service agree company="Microsoft"
Contributor License Agreement
Contribution License Agreement
This Contribution License Agreement ( “Agreement” ) is agreed to by the party signing below ( “You” ), and conveys certain license rights to the .NET Foundation ( “.NET Foundation” ) for Your contributions to .NET Foundation open source projects. This Agreement is effective as of the latest signature date below.
1. Definitions.
“Code” means the computer software code, whether in human-readable or machine-executable form, that is delivered by You to .NET Foundation under this Agreement.
“Project” means any of the projects owned or managed by .NET Foundation and offered under a license approved by the Open Source Initiative (www.opensource.org).
“Submit” is the act of uploading, submitting, transmitting, or distributing code or other content to any Project, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Project for the purpose of discussing and improving that Project, but excluding communication that is conspicuously marked or otherwise designated in writing by You as “Not a Submission.”
“Submission” means the Code and any other copyrightable material Submitted by You, including any associated comments and documentation.
2. Your Submission. You must agree to the terms of this Agreement before making a Submission to any Project. This Agreement covers any and all Submissions that You, now or in the future (except as described in Section 4 below), Submit to any Project.
3. Originality of Work. You represent that each of Your Submissions is entirely Your original work.
Should You wish to Submit materials that are not Your original work, You may Submit them separately to the Project if You (a) retain all copyright and license information that was in the materials as You received them, (b) in the description accompanying Your Submission, include the phrase “Submission containing materials of a third party:” followed by the names of the third party and any licenses or other restrictions of which You are aware, and (c) follow any other instructions in the Project’s written
4. Your Employer. References to “employer” in this Agreement include Your employer or anyone else for whom You are acting in making Your Submission, e.g. as a contractor, vendor, or agent. If Your Submission is made in the course of Your work for an employer or Your employer has intellectual property rights in Your Submission by contract or applicable law, You must secure permission from Your employer to make the Submission before signing this Agreement. In that case, the term “You” in this Agreement will refer to You and the employer collectively. If You change employers in the future and desire to Submit additional Submissions for the new employer, then You agree to sign a new Agreement and secure permission from the new employer before Submitting those Submissions.
5. Licenses.
a. Copyright License. You grant .NET Foundation, and those who receive the Submission directly or indirectly from .NET Foundation, a perpetual, worldwide, non-exclusive, royalty-free, irrevocable
license in the Submission to reproduce, prepare derivative works of, publicly display, publicly perform, and distribute the Submission and such derivative works, and to sublicense any or all of the foregoing rights to third parties.
b. Patent License. You grant .NET Foundation, and those who receive the Submission directly or indirectly from .NET Foundation, a perpetual, worldwide, non-exclusive, royalty-free, irrevocable license under Your patent claims that are necessarily infringed by the Submission or the combination of the Submission with the Project to which it was Submitted to make, have made, use, offer to sell, sell and import or otherwise dispose of the Submission alone or with the Project.
c. Other Rights Reserved. Each party reserves all rights not expressly granted in this Agreement. No additional licenses or rights whatsoever (including, without limitation, any implied licenses) are granted by implication, exhaustion, estoppel or otherwise.
6. Representations and Warranties. You represent that You are legally entitled to grant the above licenses. You represent that each of Your Submissions is entirely Your original work (except as You may have disclosed under Section 3 ). You represent that You have secured permission from Your employer to make the Submission in cases where Your Submission is made in the course of Your work for Your employer or Your employer has intellectual property rights in Your Submission by contract or applicable law. If You are signing this Agreement on behalf of Your employer, You represent and warrant that You have the necessary authority to bind the listed employer to the obligations contained in this Agreement. You are not expected to provide support for Your Submission, unless You choose to do so. UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, AND EXCEPT FOR THE WARRANTIES EXPRESSLY STATED IN SECTIONS 3, 4, AND 6 , THE SUBMISSION PROVIDED UNDER THIS AGREEMENT IS PROVIDED WITHOUT WARRANTY OF ANY KIND, INCLUDING, BUT NOT LIMITED TO, ANY WARRANTY OF NONINFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
7. Notice to .NET Foundation. You agree to notify .NET Foundation in writing of any facts or circumstances of which You later become aware that would make Your representations in this Agreement inaccurate in any respect.
8. Information about Submissions. You agree that contributions to Projects and information about contributions may be maintained indefinitely and disclosed publicly, including Your name and other information that You submit with Your Submission.
9. Governing Law/Jurisdiction. This Agreement is governed by the laws of the State of Washington, and the parties consent to exclusive jurisdiction and venue in the federal courts sitting in King County, Washington, unless no federal subject matter jurisdiction exists, in which case the parties consent to exclusive jurisdiction and venue in the Superior Court of King County, Washington. The parties waive all defenses of lack of personal jurisdiction and forum non-conveniens.
10. Entire Agreement/Assignment. This Agreement is the entire agreement between the parties, and supersedes any and all prior agreements, understandings or communications, written or oral, between the parties relating to the subject matter hereof. This Agreement may be assigned by .NET Foundation.
.NET Foundation dedicates this Contribution License Agreement to the public domain according to the Creative Commons CC0 1.
Hi, I just wanted to ask if there is any plan to realize this approach ? I would really love to use a Rabbit MQ Broker because it would simplify hosting, local testing etc IMHO.
I really appreciate the efforts making the Pull Request so this is not intended to sound rude, but it would give me clarity if I need to make one on my own or not.
Hi, I just wanted to ask if there is any plan to realize this approach ? I would really love to use a Rabbit MQ Broker because it would simplify hosting, local testing etc IMHO.
I really appreciate the efforts making the Pull Request so this is not intended to sound rude, but it would give me clarity if I need to make one on my own or not.
Unfortunately I ran out of time for this side project of mine (which is really sad for me because I was having a lot of fun). So, I don't see my self working on this anymore in near future :(