improved `Queue` shutdown functionality
Hey, I've recently proposed something on Discord, and since feedback has been rather positive, I'm making a ticket to track the idea.
I've been working with Queues recently, and I've been having some issues around shutdown that I would like to address.
Specifically, I find it a common pattern that I send some kind of request object through a queue because I want another fiber to perform some action on my behalf. Along with the request, I send a Promise to have that fiber communicate the outcome of that action to me. By and large this works fine. The issue arises when the fiber that I'm sending requests to fails. In that case, I would like it to communicate the cause of the failure back to the other fibers. This is easy enough for the requests that I've already pulled out of the queue: I simply fail those promises.
But I also need to deal with other cases: fibers currently blocked in an offer call, future attempts to offer to the queue, and I also need to deal with requests that have been submitted to the queue but not yet retrieved.
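For illustration, here is a minimal sketch of the request/promise pattern described above, using only the existing ZIO 2 `Queue` and `Promise` API. The names `Request`, `handle`, `failPending`, `worker` and `call` are hypothetical and not taken from any real code base. It shows roughly what can be done today when the worker fails, and where the gaps are.

```scala
import zio._

// Hypothetical request type: a payload plus a Promise for the outcome.
final case class Request(payload: String, reply: Promise[Throwable, Int])

object RequestReplyExample extends ZIOAppDefault {

  // Hypothetical action performed on behalf of the caller; fails on non-numeric input.
  def handle(payload: String): Task[Int] = ZIO.attempt(payload.toInt)

  // Best effort with today's API: drain the buffer, fail those promises, shut down.
  // This is NOT atomic: a request offered between takeAll and shutdown is lost,
  // and fibers offering after shutdown are interrupted instead of seeing `error`.
  def failPending(queue: Queue[Request], error: Throwable): UIO[Unit] =
    for {
      pending <- queue.takeAll
      _       <- ZIO.foreachDiscard(pending)(_.reply.fail(error))
      _       <- queue.shutdown
    } yield ()

  // Worker: serves requests until the first failure, then disposes of the queue.
  def worker(queue: Queue[Request]): UIO[Unit] =
    queue.take.flatMap { req =>
      handle(req.payload)
        .tapError(req.reply.fail(_))      // fail the request already pulled out
        .flatMap(req.reply.succeed(_))
    }.forever.catchAll(err => failPending(queue, err))

  // Client: offers a request and waits for the outcome.
  def call(queue: Queue[Request], payload: String): Task[Int] =
    for {
      reply  <- Promise.make[Throwable, Int]
      _      <- queue.offer(Request(payload, reply))
      result <- reply.await
    } yield result

  def run =
    for {
      queue <- Queue.bounded[Request](16)
      _     <- worker(queue).fork
      ok    <- call(queue, "42")
      bad   <- call(queue, "oops").either
      _     <- Console.printLine(s"ok = $ok, bad = $bad")
    } yield ()
}
```

The request that was already taken gets the real error, but anything that races with `failPending` does not, which is the case the proposal is trying to cover.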
So my idea is as follows:
- add an `E` type parameter to `Queue`
- add a `shutdownCause` method that takes a parameter of type `Cause[E]`
- `shutdownCause` would also return the items currently buffered in the queue in order to dispose of them
- after `shutdownCause` has been called, any attempt to interact with the queue will fail with the cause
- methods like `take`, `offer` etc. should indicate errors of type `E`
- streams created with `ZStream.fromQueue` would also fail with this cause
- `shutdownCause` should be atomic: when multiple fibers call it at the same time, one of them wins and the others fail with the cause supplied by the winner
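To make the intended semantics concrete, here is a toy model of the list above. Everything in it (`ToyQueue`, `poll`, `make`) is hypothetical and exists nowhere in ZIO; it is unbounded, has no blocking `take` and no back-pressure, and only tries to show the atomic "one caller wins" behaviour of `shutdownCause` by keeping the state in a single `Ref`.

```scala
import zio._

// Hypothetical toy model; the state is either the shutdown cause or the buffered items.
final class ToyQueue[E, A](state: Ref[Either[Cause[E], Chunk[A]]]) {

  // Fails with the shutdown cause once the queue has been shut down.
  def offer(a: A): IO[E, Boolean] =
    state.modify[IO[E, Boolean]] {
      case Left(cause)  => (ZIO.failCause(cause), Left(cause))
      case Right(items) => (ZIO.succeed(true), Right(items :+ a))
    }.flatten

  // Non-blocking stand-in for `take`: returns None when the buffer is empty.
  def poll: IO[E, Option[A]] =
    state.modify[IO[E, Option[A]]] {
      case Left(cause)  => (ZIO.failCause(cause), Left(cause))
      case Right(items) => (ZIO.succeed(items.headOption), Right(items.drop(1)))
    }.flatten

  // Atomic: the first caller gets the buffered items back,
  // every later caller fails with the cause supplied by the winner.
  def shutdownCause(cause: Cause[E]): IO[E, Chunk[A]] =
    state.modify[IO[E, Chunk[A]]] {
      case Left(winner) => (ZIO.failCause(winner), Left(winner))
      case Right(items) => (ZIO.succeed(items), Left(cause))
    }.flatten
}

object ToyQueue {
  def make[E, A]: UIO[ToyQueue[E, A]] =
    Ref.make[Either[Cause[E], Chunk[A]]](Right(Chunk.empty)).map(new ToyQueue(_))
}

object ToyQueueDemo extends ZIOAppDefault {
  def run =
    for {
      q       <- ToyQueue.make[String, Int]
      _       <- q.offer(1) *> q.offer(2)
      drained <- q.shutdownCause(Cause.fail("boom"))
      later   <- q.offer(3).either
      second  <- q.shutdownCause(Cause.fail("other")).either
      _       <- Console.printLine(s"drained = $drained, later = $later, second = $second")
    } yield ()
}
```

A real implementation would additionally have to wake up fibers suspended in `offer` or `take` and propagate the cause to `ZStream.fromQueue`; none of that is modelled here.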
Afaik, adding a new method is a binary compatible change, as is adding a new type parameter. Hence I think this is a source incompatible but binary compatible change. @ghostdogpr therefore suggested it could be added in a ZIO 2.2 release.
I think that while this would indeed make sense in some use cases, it would break source compatibility quite extensively. I'm also worried that it'll end up making code considerably more verbose, as 99.99% of usages would simply be doing this:
`def foo(queue: Queue[Nothing, A]) = ???`
Just thinking out loud, but what if we introduced `shutdownCause(cause: Cause[Nothing])`? I know that only `Cause.die` and `Cause.interrupt` could be used in this case, but since shutting down a queue with an error does technically make it "die" (and everything that depends on it), I think it's a decent middle ground.
This also somewhat aligns with ZStream: if we exposed this as an error, it could mislead users into thinking that they can recover from it, but that wouldn't be possible since the queue is now shut down.
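As a small illustration of what a `Cause[Nothing]` parameter would and would not accept, the sketch below uses only existing `zio.Cause` constructors; the object and value names are made up for the example.

```scala
import zio._

object CauseNothingDemo extends ZIOAppDefault {

  // Defects and interruptions carry no typed error, so they fit Cause[Nothing].
  val dying: Cause[Nothing]       = Cause.die(new IllegalStateException("worker crashed"))
  val interrupted: Cause[Nothing] = Cause.interrupt(FiberId.None)

  // A typed failure is a Cause[String] and would be rejected by the compiler:
  // val rejected: Cause[Nothing] = Cause.fail("boom")
  val typed: Cause[String] = Cause.fail("boom")

  def run =
    Console.printLine(
      s"dying: failure = ${dying.failureOption}, defect = ${dying.dieOption.isDefined}; " +
        s"interrupted = ${interrupted.isInterrupted}; typed: failure = ${typed.failureOption}"
    )
}
```

Because the error channel stays `Nothing`, callers get no typed error to recover from, which matches the point that a shut-down queue cannot be recovered.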
/bounty $100 (maintainers only)
💎 $100 bounty • ZIO
Steps to solve:
- Start working: Comment `/attempt #9844` with your implementation plan
- Submit work: Create a pull request including `/claim #9844` in the PR body to claim the bounty
- Receive payment: 100% of the bounty is received 2-5 days post-reward. Make sure you are eligible for payouts
❗ Important guidelines:
- To claim a bounty, you need to provide a short demo video of your changes in your pull request
- If anything is unclear, ask for clarification before starting as this will help avoid potential rework
- Low quality AI PRs will not receive review and will be closed
- Do not ask to be assigned unless you've contributed before
Thank you for contributing to zio/zio!
| Attempt | Started (UTC) | Solution | Actions |
|---|---|---|---|
| 🟢 @hearnadam | Jun 03, 2025, 12:13:31 AM | #9928 | Reward |
| 🟢 @Abhinanda-Shetty | Jul 12, 2025, 11:58:05 AM | #10020 | Reward |
| 🟢 @ahana27 | Jul 13, 2025, 06:36:49 AM | #10022 | Reward |
/attempt #9844
I have to say that I find the idea of merely typing it as Cause[Nothing] somewhat unfortunate. Typed errors are a staple of ZIO and merely extending the set of possible errors to die feels incomplete.
I get that this has quite severe compatibility implications, but maybe we can find a solution? We could implement this in a new class (ZQueue or something) that does the general error thing, and then reimplement Queue as a ZQueue wrapper for compatibility (ideally we could even just define Queue as a type alias for ZQueue, but I don't think that's possible in a binary compatible way).
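A rough sketch of the two shapes mentioned above, with hypothetical names throughout (`ZQueueSketch`, `QueueAlias`, `QueueWrapper`, `fromQueue`); it is reduced to `offer` and `take` and says nothing about whether either variant can actually be made binary compatible.

```scala
import zio._

object CompatSketch {

  // Hypothetical general queue with a typed error channel.
  trait ZQueueSketch[+E, A] {
    def offer(a: A): IO[E, Boolean]
    def take: IO[E, A]
  }

  // Alias variant: existing signatures written against `QueueAlias[A]` keep compiling.
  type QueueAlias[A] = ZQueueSketch[Nothing, A]

  // Wrapper variant: a separate class that delegates to the general queue.
  final class QueueWrapper[A](underlying: ZQueueSketch[Nothing, A]) {
    def offer(a: A): UIO[Boolean] = underlying.offer(a)
    def take: UIO[A]              = underlying.take
  }

  // Adapter from today's zio.Queue, only so the sketch can be exercised.
  def fromQueue[A](q: Queue[A]): ZQueueSketch[Nothing, A] =
    new ZQueueSketch[Nothing, A] {
      def offer(a: A): UIO[Boolean] = q.offer(a)
      def take: UIO[A]              = q.take
    }
}
```

With either shape, code that never shuts a queue down with a typed error would keep seeing `UIO` results, so the extra type parameter only shows up where it is used.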