amqp icon indicating copy to clipboard operation
amqp copied to clipboard

AMQP session timeout

Open lawrencegripper opened this issue 6 years ago • 14 comments

Hi,

I'm seeing the following when using the library with Azure ServiceBus. The program is making a session and sender and then holding it open, sometime for >10Mins, without sending anything (it's a web app which sends an message on receiving a request). After 10mins idle sending a message fails because the link is detached due to being idle.

I want to add a check which allows me to reconnect when a link is detached but I can't find a channel I can watch for this event on either the sender or session. @devigned I wondered if you saw something similar in your work?

Would this be a useful thing to add as I'd also like to reconnect any linkdetached events? I could take a look in making a PR which adds a channel to the session, sound like a good idea?

ps. Still relatively new to AMQP so hopefully got terminology right be apologies if I've misunderstood something and this can be worked around with the existing library.

Error message:

time="2018-07-02T18:19:09Z" level=error msg="link detached, reason: *Error{Condition: amqp:link:detach-forced, Description: The link 'G25:1219031:9qnlIgUj4ClqjdQfzAfLSgmOXRsGQLJyhVerotCDtCO5g-0pXxBHGg' is force detached by the broker due to errors occurred in publisher(link12380570). Detach origin: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00. TrackingId:97c6d7740000002700bce99a5b3a55f0_G25_B17, SystemTracker:ionsb-ccbbjpvf:Topic:frontapi.new_link, Timestamp:7/2/2018 4:52:24 PM, Info: map[]}"

lawrencegripper avatar Jul 02 '18 18:07 lawrencegripper

Here is the list of behaviors enforced in the service side related to connection management for Azure Service Bus (afaik).

  • Connection is created, but there are no senders/receivers created on the connection:
    • connection will be closed by the service after 1 minute.
  • Connection is created and a sender or a receiver is created on the connection:
    • connection will be maintained as long as there is at least one sender or receiver.
    • If there are no senders/receivers on the connection, connection will be closed after 5 minutes.
  • Senders/receivers will be closed by the service if there are no activities for 30 minutes (i.e. no send or pending receive calls)

To handle this, in https://github.com/Azure/azure-service-bus-go we recover when sending and receiving if we run into an error we'd classify as recoverable.

There are some event types which cause linkdetatch in Service Bus and Event Hubs, which are not recoverable. I believe this will be specific to the broker implementation and probably why it has not been implemented here.

Just a thought, but if there were to be a "recovery policy" that could be provided to a connection or some other entity, it might be generic enough to be useful across broker implementations.

@amarzavery I know you've been digging into recovery details lately. Please correct me if the details I've provided above are incorrect.

devigned avatar Jul 02 '18 20:07 devigned

The details you provided are correct. It would be nice if the protocol library can provide a hook/policy that can help users do some custom things for recovery.

For example in Azure EventHubs if a receiver link goes down for some reason and if we decide to recover. We would like to re-establish the link by setting the offset of the last received message as a filter on the link. This enables us to not receive messages from the start or from whatever offset/time that was provided when the link was initially created.

Having a callback/hook to do something custom like above during recovery would be extremely beneficial.

amarzavery avatar Jul 02 '18 21:07 amarzavery

Providing a way to be notified on link detach/allow recovery seems like a good idea.

Does someone want to propose an API? I would prefer it to use a callback approach rather than channels.

vcabbage avatar Jul 03 '18 02:07 vcabbage

Sure I'll take a shot at an API, very rough thinking at the moment but as you have the LinkOption already in the API my preference would be to use this as it will prevent a breaking change and keep things simple.

Here are some very rough (not compiled) thoughts as a first draft. Do you think this is along the right lines? Not set on this approach just getting the ball rolling.

// LinkRecoveryFunc is invoked when a link error occurs and 
// allows you to create a new link using the newLink func or return an error 
// which will be propogated to the sender/receiver next time they are used
type LinkRecoveryFunc func(linkError error, newLink func() (*link, error)) (*link, error)

func LinkRecoveryOption(recoveryFunc LinkRecoveryFunc) LinkOption {
	return func(l *link) error {
		l.recoveryFunc = func(linkError error) (*link, error) {
			return recoveryFunc(linkError, func() (*link, error) { newLink(l.session, l.receiver, l.options) })
		}
		return nil
	}
}

When the link experiences an error then it can invoke l.recoveryFunc with that error and either start using the new link returned or propagate the error.

This could then be used like this when creating a sender:


// CreateAmqpSender makes a sender which reconnects when a link detaches
func (l *AmqpConnection) CreateAmqpSender(topic string) (*amqp.Sender, error) {
	if l.Session == nil {
		log.WithField("currentListener", l).Panic("Cannot create amqp listener without a session already configured")
	}

	return l.Session.NewSender(
		amqp.LinkTargetAddress("/" + topic),
		amqp.LinkRecoveryOption(func(linkError error, newLink func() (*link, error){
			if isDetachError, _ := err.(amqp.DetachError); isDetachError {
				return newLink(), nil
			}

			return nil, linkError
		}),
	)

lawrencegripper avatar Jul 03 '18 08:07 lawrencegripper

The electron API (an alternative Go API for AMQP) provides a channel: https://github.com/alanconway/qpid-proton/blob/cpp-null/go/src/qpid.apache.org/electron/endpoint.go#L58 and an Error() method to get error details - this is consistent for all endpoints (Connection, Link, Session)

Having written Go APIs that do both, I'd recommend a channel over a callback. There's some detailed thoughts about why here: https://github.com/alanconway/qpid-proton/blob/cpp-null/go/examples/README.md#a-tale-of-two-brokers

The short story: callbacks are a return to inversion-of-control, a style of programming that Go finally allows us to escape from. A single function that loops and selects over multiple channels is easier to read and maintain than a scattering of itty-bitty callback functions, each handling a fraction of the problem, each lacking the overall context of the loop, and ultimately each needing to communicate results and problems to some other code via a mechanism such as ... a channel.

On Tue, Jul 3, 2018 at 4:12 AM, Lawrence Gripper [email protected] wrote:

Sure I'll take a shot at an API, very rough thinking at the moment but as you have the LinkOption already in the API my preference would be to use this as it will prevent a breaking change and keep things simple.

Here are some very rough (not compiled) thoughts as a first draft. Do you think this is along the right lines? Not set on this approach just getting the ball rolling.

// LinkRecoveryFunc is invoked when a link error occurs and // allows you to create a new link using the newLink func or return an error // which will be propogated to the sender/receiver next time they are usedtype LinkRecoveryFunc func(linkError error, newLink func() (*link, error)) (*link, error)

func LinkRecoveryOption(recoveryFunc LinkRecoveryFunc) LinkOption { return func(l *link) error { l.recoveryFunc = func(linkError error) (*link, error) { return recoveryFunc(linkError, func() (*link, error) { newLink(l.session, l.receiver, l.options) }) } return nil } }

When the link experiences an error then it can invoke l.recoveryFunc with that error and either start using the new link returned or propagate the error.

This could then be used like this when creating a sender:

// CreateAmqpSender makes a sender which reconnects when a link detachesfunc (l *AmqpConnection) CreateAmqpSender(topic string) (*amqp.Sender, error) { if l.Session == nil { log.WithField("currentListener", l).Panic("Cannot create amqp listener without a session already configured") }

return l.Session.NewSender( amqp.LinkTargetAddress("/" + topic), amqp.LinkRecoveryOption(func(linkError error, newLink func() (*link, error){ if isDetachError, _ := err.(amqp.DetachError); isDetachError { return newLink(), nil }

  	return nil, linkError
  }),

)

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/vcabbage/amqp/issues/109#issuecomment-402051843, or mute the thread https://github.com/notifications/unsubscribe-auth/AHa6XgjACqlNRjCrloEe-jQDuNTlQL7Sks5uCyeKgaJpZM4U_xMz .

alanconway avatar Jul 03 '18 14:07 alanconway

Gahh, sorry - sent links to my devel branch! Here are some more stable links:

https://github.com/apache/qpid-proton/blob/master/go/examples/README.md#a-tale-of-two-brokers https://godoc.org/qpid.apache.org/electron#Endpoint

On Tue, Jul 3, 2018 at 10:44 AM, Alan Conway [email protected] wrote:

The electron API (an alternative Go API for AMQP) provides a channel: https://github.com/alanconway/qpid-proton/blob/cpp-null/go/ src/qpid.apache.org/electron/endpoint.go#L58 and an Error() method to get error details - this is consistent for all endpoints (Connection, Link, Session)

Having written Go APIs that do both, I'd recommend a channel over a callback. There's some detailed thoughts about why here: https://github.com/alanconway/qpid-proton/blob/cpp-null/go/ examples/README.md#a-tale-of-two-brokers

The short story: callbacks are a return to inversion-of-control, a style of programming that Go finally allows us to escape from. A single function that loops and selects over multiple channels is easier to read and maintain than a scattering of itty-bitty callback functions, each handling a fraction of the problem, each lacking the overall context of the loop, and ultimately each needing to communicate results and problems to some other code via a mechanism such as ... a channel.

On Tue, Jul 3, 2018 at 4:12 AM, Lawrence Gripper <[email protected]

wrote:

Sure I'll take a shot at an API, very rough thinking at the moment but as you have the LinkOption already in the API my preference would be to use this as it will prevent a breaking change and keep things simple.

Here are some very rough (not compiled) thoughts as a first draft. Do you think this is along the right lines? Not set on this approach just getting the ball rolling.

// LinkRecoveryFunc is invoked when a link error occurs and // allows you to create a new link using the newLink func or return an error // which will be propogated to the sender/receiver next time they are usedtype LinkRecoveryFunc func(linkError error, newLink func() (*link, error)) (*link, error)

func LinkRecoveryOption(recoveryFunc LinkRecoveryFunc) LinkOption { return func(l *link) error { l.recoveryFunc = func(linkError error) (*link, error) { return recoveryFunc(linkError, func() (*link, error) { newLink(l.session, l.receiver, l.options) }) } return nil } }

When the link experiences an error then it can invoke l.recoveryFunc with that error and either start using the new link returned or propagate the error.

This could then be used like this when creating a sender:

// CreateAmqpSender makes a sender which reconnects when a link detachesfunc (l *AmqpConnection) CreateAmqpSender(topic string) (*amqp.Sender, error) { if l.Session == nil { log.WithField("currentListener", l).Panic("Cannot create amqp listener without a session already configured") }

return l.Session.NewSender( amqp.LinkTargetAddress("/" + topic), amqp.LinkRecoveryOption(func(linkError error, newLink func() (*link, error){ if isDetachError, _ := err.(amqp.DetachError); isDetachError { return newLink(), nil }

 	return nil, linkError
 }),

)

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/vcabbage/amqp/issues/109#issuecomment-402051843, or mute the thread https://github.com/notifications/unsubscribe-auth/AHa6XgjACqlNRjCrloEe-jQDuNTlQL7Sks5uCyeKgaJpZM4U_xMz .

alanconway avatar Jul 03 '18 14:07 alanconway

What @alanconway suggested is eerily similar to what we've implemented in the Azure Service Bus and Event Hubs libraries for message receivers.

// ListenerHandle provides the ability to close or listen to the close of a Receiver
type ListenerHandle struct {
	r   *receiver
	ctx context.Context
}

// Close will close the listener
func (lc *ListenerHandle) Close(ctx context.Context) error {
	return lc.r.Close(ctx)
}

// Done will close the channel when the listener has stopped
func (lc *ListenerHandle) Done() <-chan struct{} {
	return lc.ctx.Done()
}

// Err will return the last error encountered
func (lc *ListenerHandle) Err() error {
	if lc.r.lastError != nil {
		return lc.r.lastError
	}
	return lc.ctx.Err()
}

https://github.com/Azure/azure-service-bus-go/blob/607999369044f648929f37d3c925c086b21a5a06/receiver.go#L339-L355

This pattern is helpful for consumers to understand when something has failed, but I don't know that this pattern deals with recovery. This would still leave recovery in the hands of the consumer of the API.

If recovery is to be handled effectively, I'd imagine the consumer of the API would need to provide at least two things.

  1. a filter criteria to identify the errors that are recoverable
  2. a backoff policy for recovery (linear, exponential...)

devigned avatar Jul 03 '18 15:07 devigned

On Tue, Jul 3, 2018 at 11:46 AM, David Justice [email protected] wrote:

What @alanconway https://github.com/alanconway suggested is eerily similar to what we've implemented in the Azure Service Bus and Event Hubs libraries for message receivers.

I've seen this pattern in the standard libraries but I'm not long enough at go to say confidently that it's a "well known pattern". It works nicely to tie related "waiting" together, e.g. suppose you're reading a channel and sending messages using the data from the channel. You want to know immediately if the link closes or fails, even if you're not actively sending when it fails, and you have a timeout to respect on top of it all:

// NB: This is not vcabbage code! https://godoc.org/qpid.apache.org
select {
case data, ok := <-channelWithData:
    if ok {
        err := sender.Send(MessageMadeOf(data));
        if err != nil { /* handle err */ }
    } else {
        // no more data
    }
case <- sender.Done():
    if sender.Error() == nil {
        // orderly close
    } else {
        // link failed, handle sender.Error()
    }
    case <- time.After(n * time.Millisecond) {
        // timeout expired
    }
}

Very neat!

// ListenerHandle provides the ability to close or listen to the close of a Receivertype ListenerHandle struct {

r *receiver ctx context.Context } // Close will close the listenerfunc (lc *ListenerHandle) Close(ctx context.Context) error { return lc.r.Close(ctx) } // Done will close the channel when the listener has stoppedfunc (lc *ListenerHandle) Done() <-chan struct{} { return lc.ctx.Done() } // Err will return the last error encounteredfunc (lc *ListenerHandle) Err() error { if lc.r.lastError != nil { return lc.r.lastError } return lc.ctx.Err() }

https://github.com/Azure/azure-service-bus-go/blob/ 607999369044f648929f37d3c925c086b21a5a06/receiver.go#L339-L355

This pattern is helpful for consumers to understand when something has failed, but I don't know that this pattern deals with recovery. This would still leave recovery in the hands of the consumer of the API.

If recovery is to be handled effectively, I'd imagine the consumer of the API would need to provide at least two things.

  1. a filter criteria to identify the errors that are recoverable
  2. a backoff policy for recovery (linear, exponential...)

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/vcabbage/amqp/issues/109#issuecomment-402203557, or mute the thread https://github.com/notifications/unsubscribe-auth/AHa6XibUdlf5HFkH97QO8l2fL20RQ_95ks5uC5HtgaJpZM4U_xMz .

alanconway avatar Jul 03 '18 16:07 alanconway

Exposing channels in a public API can get rather complex. Not so much for a simple "done" channel, since it can be closed and that's it. For other uses it's not so straightforward. (There's a post detailing some examples and guidelines.)

Referring to @alanconway's example, what is expected of the consumer when both channelWithData and sender.Done() can be received from? When multiple select cases can proceed one is chosen at random (uniformly pseudo-random, to be precise). If channelWithData is buffered, the client may still want to process any pending messages, making correct usage more involved. In fact, the internal implementation of Receiver.Receive has exactly this problem, which needs to be dealt with.

In summary, I prefer to keep these details internal to the package, exposing a synchronous API where possible. In this case, a synchronous API doesn't make much sense, a callback is the next best option in my opinion. Neither of these approaches prevent the user from creating and using channels themselves, if that's more convenient for them. I'm not saying that this lib can never expose channels, but I'd want to see some reasons as to why a channel is significantly better than the alternatives.

vcabbage avatar Jul 04 '18 16:07 vcabbage

@lawrencegripper I think creating a new LinkOption makes sense. To @devigned's point, we probably want it to allow more customization.

Perhaps modifying the newLink function to accept LinkOptions would be sufficient.

type LinkRecoveryFunc func(linkError error, newLink func(opts ...LinkOption) (*link, error)) (*link, error)

When newLink is called, it would use the original options by default. Most, if not all, of the existing LinkOptions will overwrite previous values if applied multiple times.

vcabbage avatar Jul 04 '18 16:07 vcabbage

On Wed, Jul 4, 2018 at 12:20 PM, Kale Blankenship [email protected] wrote:

Exposing channels in a public API can get rather complex. Not so much for a simple "done" channel, since it can be closed and that's it. For other uses it's not so straightforward. (There's a post https://inconshreveable.com/07-08-2014/principles-of-designing-go-apis-with-channels/ detailing some examples and guidelines.)

Referring to @alanconway https://github.com/alanconway's example, what is expected of the consumer when both channelWithData and sender.Done() can be received from? When multiple select cases can proceed one is chosen at random (uniformly pseudo-random, to be precise). If channelWithData is buffered, the client may still want to process any pending messages, making correct usage more involved. In fact, the internal implementation of Receiver.Receive has exactly this problem, which needs to be dealt with.

There is sufficient state to deal with that in the Sender. Regardless of why you wake up you'll get an error from Sender.Send() if it has been closed. Indeed multiple wake-ups isn't the only way this can happen - it's perfectly possible for another goroutine to close the Sender after you wake from select and before you try to Send(). Whether you use select or not you always need to handle errors in the final Send() call.

Agreed with risks of channels for general use but I think the Done case is not a bad idea. Of course you can use a callback to close a channel so a callback approach doesn't preclude waiting concurrently but it makes it more complex.

In summary, I prefer to keep these details internal to the package, exposing a synchronous API where possible. In this case, a synchronous API doesn't make much sense, a callback is the next best option in my opinion. Neither of these approaches prevent the user from creating and using channels themselves, if that's more convenient for them. I'm not saying that this lib can never expose channels, but I'd want to see some reasons as to why a channel is significantly better than the alternatives.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/vcabbage/amqp/issues/109#issuecomment-402520662, or mute the thread https://github.com/notifications/unsubscribe-auth/AHa6XszRMY2QnZWsTcjSvODKS0-HR4gzks5uDOtSgaJpZM4U_xMz .

alanconway avatar Jul 04 '18 16:07 alanconway

@vcabbage Ok makes sense to me, I'll take a stab at building a PR out of the proposal, hopefully get some time tomorrow if things go well.

lawrencegripper avatar Jul 04 '18 17:07 lawrencegripper

So it's taken me longer than I'd have liked to wrap my head around the changes needed. Before I dive any further into this I wanted to come up for air and sanity check some stuff @vcabbage.

Here are the cases where I think the library should handle a failure gracefully using this method:

  1. TCP Connection lost: https://github.com/vcabbage/amqp/blob/master/conn.go#L518 (other thinks to consider MaxSizeError line 497, multiple parsing errors but I'm unsure if these should be handled in this change)
  2. Link Error returned when invoking Send on a Sender: https://github.com/vcabbage/amqp/blob/master/client.go#L350
  3. Link Error returned when calling Receive on a Receiver: https://github.com/vcabbage/amqp/blob/master/client.go#L1569

The first feels like it should be handled with a separate ConnectionRecoveryOptions added to the connection details.

For the rest I think the existing LinkRecoveryOptions makes sense still.

In terms of where the recoveryFunc gets called I think that inside the body of send is the best fit as this is already locking so I can ensure it's safe. I'm thinking something like this:


// send is separated from Send so that the mutex unlock can be deferred without
// locking the transfer confirmation that happens in Send.
func (s *Sender) send(ctx context.Context, msg *Message) (chan deliveryState, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.link.err != nil {
		err := s.link.Close(ctx)
		if err != nil {
			return nil, errorErrorf("link in error state: %+v, attempted to close and received error: %+v", s.link.err, err)
		}

		newLink, err := s.link.recoveryFunc(s.link.err)
		if err != nil {
			return nil, errorErrorf("link in error state: %+v, attempted to recovery but received error: %+v", s.link.err, err)
		}
		s.link = newLink
	}

      ... existing code here

I'm a bit unsure how to handle the link change within the Receive function as it doesn't use locking, I need to write a bit of a test harness to prove this all out as currently without a reasonable way of testing all the scenarios.

Sorry if I'm way off track with this, initially looked at adding the recovery into the mux functions but it looks like sitting them on the higher level session object will be cleaner - any advice thoughts very welcome

lawrencegripper avatar Jul 30 '18 17:07 lawrencegripper

@lawrencegripper There are at least a few places this lib has grown to be a bit unwieldy, sorry about that. I need to do another round of cleanup at some point.

I'm not sure I'm willing to introduce a connection recovery mechanism. That seems much more involved than recovering individual links. Though I'm always willing to be convinced that it's the right thing to do if there's a real need.


That looks roughly correct for the Sender. A couple notes:

  • Only the mux can access link.err before link.done is closed. You'll need to do a non-blocking receive to check if link.done has been closed.
  • By that point the link should already be closed, so there's no need to call s.link.Close(ctx).
  • There will need to be a way to determine if the error originated at the link level since recovery can't be performed if it came from the session or conn.
  • Some thought will need to be put into errors that happen in the middle of transferring a message or while waiting for a disposition. Off the top of my head, I'm not certain of the correct way to handle them.

For the Receiver, I think the recovery should happen if r.link.done is closed. Since Receiver.Receive can be called concurrently, a new mutex will be needed. The first to acquire the mutex will do the recovery and there will need to be a check to determine if recovery has already been attempted.

vcabbage avatar Aug 01 '18 01:08 vcabbage