amqp091-go

Add Contexts where appropriate

Open • lukebakken opened this issue 2 years ago • 10 comments

https://www.digitalocean.com/community/tutorials/how-to-use-contexts-in-go

https://github.com/rabbitmq/amqp091-go/discussions/121#discussioncomment-3788391

See the following:

  • #96
  • #122
  • #103

cc @mgdotson

lukebakken, Oct 03 '22

An example from the Go codebase on how to use a context in a function that makes an external call:

https://github.com/golang/go/blob/6d8ec893039a39f495c8139012e47754e4518b70/src/database/sql/ctxutil.go#L46-L61

Zerpet, Oct 04 '22

Would be great to have this!

mrkagelui, Nov 17 '22

@mrkagelui this project is open-source. We would welcome a contribution that adds this feature. @mgdotson also suggested he would be willing to work on it.

lukebakken, Nov 17 '22

The Consume method doesn't take a context for cancellation.

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
  ...
}

I know Channel has a Cancel method, and developers can implement their own cancellation logic on top of it. However, context is Go's standard way to cancel an operation, so it would be useful if passing a context to Consume could stop deliveries without every caller writing its own cancellation logic. What do you think?

func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
  ...
}

t2y, May 15 '23

I implemented #192 as a basis for discussion.

t2y, May 15 '23

I'm not sure I quite follow the implementation (and I haven't traced the calling code all the way through to see whether it blocks waiting for server confirmation).

Wouldn't ch.call(req, res) be the call that needs the context, since it is the one that may take a long time or block? Once we have the delivery channel, we can time out waiting for messages in a select statement and cancel the consumer manually.

If ch.call(...) is waiting on the server because of network delay or a blocked connection (I would have to test this, but it's late), Consume won't return within the provided context until the server is unblocked. That means we'd still have to wrap Channel.Consume ourselves to honor the context.

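Example wrapper:

	import (
		"context"
		"fmt"

		amqp "github.com/rabbitmq/amqp091-go"
	)

	// consumeWithContext stops waiting for Consume once ctx is done.
	func consumeWithContext(ctx context.Context, ch *amqp.Channel, queue, consumer string,
		autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
		type result struct {
			msgs <-chan amqp.Delivery
			err  error
		}
		// Buffered so the goroutine can always deliver its result, even after we give up.
		finished := make(chan result, 1)
		go func() {
			msgs, err := ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
			finished <- result{msgs, err}
		}()

		select {
		case r := <-finished:
			return r.msgs, r.err
		case <-ctx.Done():
			// Cleanup: Consume may still succeed after we return,
			// so cancel the consumer once the call finishes.
			go func() {
				if r := <-finished; r.err == nil {
					_ = ch.Cancel(consumer, true)
				}
			}()
			return nil, fmt.Errorf("consume timed out: %w", ctx.Err())
		}
	}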

In the case of ConsumeWithContext, the ch.call(...) would be wrapped in the go func, and additional cleanup, such as ch.Cancel, could be done in the ctx.Done() branch. That would keep ConsumeWithContext from "hanging" the calling function while it waits for server confirmation.

If I've misunderstood Consume and it's not a potentially blocking call with noWait=false, please ignore.

mgdotson, Jun 06 '23

First of all, I also think ch.call(...) should eventually take a context so that it can be canceled.

In your example, Consume() may still block with the current implementation, since ch.call(...) may block; the context cannot cancel a Consume() call that is already blocked. I don't think #192 and your example differ much.

t2y, Jun 06 '23

I'm not sure about canceling ch.call(...), because it may corrupt the state machine in Channel.recv. The state machine should recover, but I'm not sure it would if ch.call(...) is interrupted after ch.dispatch(message) has already been called and is trying to send the response back to the RPC caller. See:

https://github.com/rabbitmq/amqp091-go/blob/1a875e1491157b1f77339794171c38406418538e/channel.go#L305

https://github.com/rabbitmq/amqp091-go/blob/1a875e1491157b1f77339794171c38406418538e/channel.go#L365-L373

Even though case *basicDeliver: does not block (it discards the message when the consumer is not found, i.e. closed), discarding is not correct: RabbitMQ will wait for an ack for a very long time (until the consumer timeout), and the message won't be redelivered to other consumers.

If we want to cancel ch.call(...), I think the only reliable way would be to close the Channel with noWait=true. Alternatively, I would feel slightly better if we checked context cancellation before calling ch.call(...).

Zerpet, Jun 06 '23

@Zerpet Do you mean moving the go func in #192 from line 188 up to line 182, or just a default context check such as:

select {
    case <-ctx.Done():
        return
    default:
}

ch.call(...)

A default check before the call is only slightly better. If the context is still valid before the call but times out during it, the caller is still stuck waiting for the call to return and then has to clean up state from the outside.

If the context handling in #192 is moved above the "call", could there be a race between "ch.[Cc]ancel(consumer, false)" and a pending message that needs a nack?

Feels like the entire state engine needs to be reworked with contexts in mind which is a pretty big lift.

mgdotson, Jun 06 '23

I meant a default context check as in your snippet.

Feels like the entire state engine needs to be reworked with contexts in mind which is a pretty big lift.

The more I think about this, the more I'm inclined to keep the state engine as-is and implement a timeout mechanism that closes the AMQP channel. Anything else simply leaves the library in an unknown state. For example, Consume() might be waiting on ch.call(...) and give up, but the basic.consume frame may or may not have already arrived at the server. What should the library do? If we send a basic.cancel as part of the cleanup and the server did not register the consumer, we may cause a channel exception, which closes the channel. If we don't send a basic.cancel and the server did register the consumer, the server is left with a "leaked" consumer that never consumes, leaving messages in flight, unacked and unavailable to other consumers. The conclusion is that the system is left in an unknown state, and it's not trivial to determine what to do next. At the same time, we don't want to start inferring the state (we may not be able to) by making subsequent calls to the server.

All in all, I think that closing the AMQP channel is the only reliable solution. It is intrusive, but it leaves the system in a known state, and it makes clear to the caller what happened and what state things are in.

Zerpet, Jun 07 '23