rabbitmq-c

Consuming hangs thread after network error

Open PversusNP opened this issue 3 years ago • 2 comments

I am using version 0.10.0. This is my source code for consuming:

amqp_envelope_t envelope;
amqp_rpc_reply_t ret = amqp_consume_message(Connection->State, &envelope, timeout, 0);

if (AMQP_RESPONSE_NORMAL != ret.reply_type)
{
   if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && AMQP_STATUS_UNEXPECTED_STATE == ret.library_error)
   {
      timeval Timeout = {}; // = 0 non blocking
      Timeout.tv_sec = 0;
      Timeout.tv_usec = 0;
      amqp_frame_t frame;
      if (AMQP_STATUS_OK != amqp_simple_wait_frame_noblock(Connection->State, &frame, &Timeout))
      {
         return false;
      }

      if (AMQP_FRAME_METHOD == frame.frame_type)
      {
	      switch (frame.payload.method.id)
	      {
	      case AMQP_BASIC_ACK_METHOD:
		      /* if we've turned publisher confirms on, and we've published a
		      * message here is a message being confirmed.
		      */
		      break;
	      case AMQP_BASIC_RETURN_METHOD:
		      /* if a published message couldn't be routed and the mandatory
		      * flag was set this is what would be returned. The message then
		      * needs to be read.
		      */
	      {
		      amqp_message_t message;
		      ret = amqp_read_message(Connection->State, frame.channel, &message, 0);
		      if (AMQP_RESPONSE_NORMAL != ret.reply_type) 
		      {
			      return false;
		      }
  
		      amqp_destroy_message(&message);
	      }
	      break;

...

I replaced amqp_simple_wait_frame() with amqp_simple_wait_frame_noblock() because my thread must not block.

However, after some kinds of network error, my application sometimes closes the connection object and tries to reopen it. Once it manages to open a new connection after a few attempts, it hangs somewhere in the code above.

I suspect that amqp_read_message is blocking. Has anyone experienced the same issue? Is there a non-blocking version of the functions above? Thanks.
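For reference, the difference between a zero-timeout poll and an unbounded wait can be illustrated outside rabbitmq-c with POSIX poll() on a plain file descriptor. This is only a sketch of the general behaviour: `read_with_timeout` and `READ_TIMED_OUT` are hypothetical names, not rabbitmq-c functions, and the code makes no claim about how the library reads from its socket.

```c
#include <poll.h>
#include <unistd.h>

/* Hypothetical return code used by this sketch only. */
#define READ_TIMED_OUT (-2)

/* Hypothetical helper, not part of rabbitmq-c.
 * Waits up to timeout_ms for data on fd, then reads at most len bytes.
 *   timeout_ms == 0  polls once and returns immediately
 *   timeout_ms <  0  waits indefinitely (the case that can hang a thread)
 * Returns the number of bytes read, 0 on end of stream, -1 on error,
 * or READ_TIMED_OUT if nothing arrived in time. */
ssize_t read_with_timeout(int fd, void *buf, size_t len, int timeout_ms) {
  struct pollfd pfd;
  int rc;

  pfd.fd = fd;
  pfd.events = POLLIN;
  pfd.revents = 0;

  rc = poll(&pfd, 1, timeout_ms);
  if (rc < 0) {
    return -1; /* poll itself failed (this includes EINTR) */
  }
  if (rc == 0) {
    return READ_TIMED_OUT; /* no data before the timeout expired */
  }
  /* Data, hang-up, or a socket error is pending, so read() will not wait. */
  return read(fd, buf, len);
}
```

With a timeout of 0 the helper never waits, which is the behaviour the thread above needs. With a negative timeout the same call waits until the peer sends something or the descriptor is closed.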

PversusNP, Jul 29 '21 15:07

As a consumer, I would like to know under what circumstances AMQP_BASIC_ACK_METHOD or AMQP_BASIC_RETURN_METHOD will be received.

kongxa, Nov 15 '23 07:11

The call chain is amqp_read_message -> amqp_simple_wait_frame_on_channel -> wait_frame_inner(amqp_time_infinite()). In wait_frame_inner, if amqp_connection_state_t_.heartbeat <= 0, heartbeats are not enabled, and next_recv_heartbeat and next_send_heartbeat are set to infinite. The timeout deadline is infinite as well, so the deadline computed below is infinite and the call may block your program.

    deadline = amqp_time_first(timeout_deadline,
                               amqp_time_first(state->next_recv_heartbeat,
                                               state->next_send_heartbeat));
    /* TODO this needs to wait for a _frame_ and not anything written from the
     * socket */
    res = recv_with_timeout(state, deadline);
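
To make the reasoning concrete, here is a minimal, self-contained model of that deadline selection. The `demo_*` names and the nanosecond representation are assumptions made for illustration, not rabbitmq-c's actual definitions. Only the shape of the nested amqp_time_first expression is taken from the snippet above.

```c
#include <stdint.h>

/* Simplified stand-in for the library's time type. This is an assumption
 * for illustration, not the real rabbitmq-c definition. */
typedef struct {
  uint64_t ns; /* absolute deadline in nanoseconds; UINT64_MAX means "never" */
} demo_time_t;

/* Returns a deadline that never expires. */
demo_time_t demo_time_infinite(void) {
  demo_time_t t;
  t.ns = UINT64_MAX;
  return t;
}

/* Returns nonzero if the deadline never expires. */
int demo_time_is_infinite(demo_time_t t) {
  return t.ns == UINT64_MAX;
}

/* Returns the earlier of two deadlines. */
demo_time_t demo_time_first(demo_time_t l, demo_time_t r) {
  return (l.ns < r.ns) ? l : r;
}

/* Same nesting as the quoted expression: the earliest of the caller's
 * timeout and the two heartbeat deadlines. */
demo_time_t demo_deadline(demo_time_t timeout_deadline,
                          demo_time_t next_recv_heartbeat,
                          demo_time_t next_send_heartbeat) {
  return demo_time_first(
      timeout_deadline,
      demo_time_first(next_recv_heartbeat, next_send_heartbeat));
}
```

In this model, when all three inputs are infinite the result is infinite too, which corresponds to the case described above: no caller timeout and no heartbeats. As soon as any one input is finite, the wait is bounded by it.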

kongxa, Nov 15 '23 07:11