amqp091-go
Add Contexts where appropriate
https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go
https://github.com/rabbitmq/amqp091-go/discussions/121#discussioncomment-3788391
See the following:
- #96
- #122
- #103
cc @mgdotson
An example from the Go codebase on how to use a context in a function that makes an external call:
https://github.com/golang/go/blob/6d8ec893039a39f495c8139012e47754e4518b70/src/database/sql/ctxutil.go#L46-L61
would be great to have this!
@mrkagelui this project is open-source. We would welcome a contribution that adds this feature. @mgdotson also suggested he would be willing to work on it.
The Consume method doesn't take context for cancellation.
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
...
}
I know the Channel has a Cancel method, so a developer can implement their own cancellation. However, Go's context is the standard way to cancel an operation, so it would be useful if passing a context to Consume could cancel delivery without the caller implementing a cancel process of their own. What do you think?
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
...
}
I implemented #192 as a basis for discussion.
I'm not sure I quite follow the implementation (and I haven't followed the calling code all the way through to see if it is a blocking operation waiting for server confirmation).
Wouldn't ch.call(req, res) be the call that needs the context, since it can block or run for a long time? Once we have the message queue, we can then time out those messages in a select statement and manually cancel the consume.
If ch.call(..) is waiting on the server due to network delay or blocked status (would have to test this but it's late), this command won't return within the context provided until the server is unblocked, meaning we'd still have to wrap Channel.Consume in a context.
example wrapper:
finished := make(chan struct{})
go func() {
	// msgs and err are declared by the enclosing function
	msgs, err = Channel.Consume(
		queue,
		consumer,
		autoAck,
		exclusive,
		noLocal,
		noWait,
		args,
	)
	close(finished)
}()
select {
case <-finished:
	return msgs, err
case <-ctx.Done():
	// cleanup: once Consume returns, cancel the consumer it registered
	go func() {
		<-finished
		_ = Channel.Cancel(consumer, true)
	}()
	return nil, fmt.Errorf("consume timed out")
}
In the case of ConsumeWithContext, the ch.call(...) would be wrapped in the go func and additional cleanup could be done in the ctx.Done() code, such as ch.cancel, etc., allowing ConsumeWithContext to not "hang" the calling function while waiting for server confirmation.
If I've misunderstood Consume and it's not a potentially blocking call with noWait=false, please ignore.
First of all, I also think ch.call(...) should take a context and be cancellable in the future.
In your example, Consume() may block with the current implementation since ch.call(...) may block. For example, the context won't cancel Consume() if it is blocked. I think #192 and your example are not much different.
I'm not sure about cancelling ch.call(...) because it may mess up the state machine in Channel.recv. The state machine should recover, but I'm not sure it would recover when ch.call(...) is interrupted after ch.dispatch(message) has already been called and is trying to send the response back to the RPC. See:
https://github.com/rabbitmq/amqp091-go/blob/1a875e1491157b1f77339794171c38406418538e/channel.go#L305
https://github.com/rabbitmq/amqp091-go/blob/1a875e1491157b1f77339794171c38406418538e/channel.go#L365-L373
Even if case *basicDeliver: does not block (because it discards a message when the consumer is not found, i.e. closed), it is not correct to do so, because RabbitMQ will be waiting for an ack for a very long time (until the consumer timeout), and the message won't be redelivered to other consumers.
If we want to cancel ch.call(...), I think the only reliable way would be to close the Channel with noWait=true. Alternatively, I would feel slightly better if we checked context cancellation before calling ch.call(...).
@Zerpet Do you mean moving the go func from #192 line 188 up to 182, or just a default context check such as:
select {
case <-ctx.Done():
	return
default:
}
ch.call(...)
A default check before the call is only slightly better: if the context is valid before "call" but times out during it, the caller is still left waiting for the call to return and has to clean up state from outside.
If the context handling in #192 moves above "call", could there be a race between "ch.[Cc]ancel(consumer, false)" and a pending message that needs a nack?
Feels like the entire state engine needs to be reworked with contexts in mind which is a pretty big lift.
I meant a default context check as in your snippet.
> Feels like the entire state engine needs to be reworked with contexts in mind which is a pretty big lift.
The more I think about this, the more I'm inclined to keep the state engine as-is, and implement a timeout mechanism that closes the AMQP channel. Anything else simply leaves the library in an unknown state. For example, Consume() might be waiting on ch.call(...) and give up, but the basic.consume frame may or may not have already arrived at the server. What should the library do? If we send a basic.cancel as part of the cleanup, we may cause a channel exception if the server did not register the consumer, closing the channel. If we don't send a basic.cancel and the server registered the consumer, it will have a "leaked" consumer that will never consume, leaving messages in-flight, unacked, and unconsumable by other consumers. The conclusion is that the system is left in an unknown state, and it's not trivial to determine what to do next. At the same time, we don't want to start inferring the state (we may not be able to) by making subsequent calls to the server.
All in all, I think that closing the AMQP channel is the only reliable solution. It is intrusive, but it leaves the system in a known state, and it makes clear to the caller what happened and what the state is.