azure-sdk-for-java

[BUG] Timeout on consuming message from azure service bus through Application Gateway

Open fanemama opened this issue 1 year ago • 12 comments

Context: We are using Azure Service Bus via an application gateway (custom endpoint) with the transport type AmqpTransportType.AMQP_WEB_SOCKETS. Our consumer regularly encounters a connection timeout, after which the application stops consuming messages. We are repeatedly forced to restart the application to resume consuming messages.

Do you have a solution for our issue, and an explanation of this behavior?

Stack trace:

{"az.sdk.message":"Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.","exception":"Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)","scopes":["https://*******"],"audience":"amqp://"}

{"az.sdk.message":"Timeout waiting for RemoteClose. Manually terminating EndpointStates and completing close.","connectionId":"MF_a2709f_1714114657307","entityPath":"","linkName":""}

{"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_6ffaaf_1714548329200","errorCondition":"amqp:link:detach-forced","errorDescription":"The link 'G14:5461966: ' is force detached. Code: publisher(link162650). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00.","linkName":"","entityPath":"***********"}

fanemama, May 03 '24 08:05

@anuchandy @conniey @lmolkova

github-actions[bot], May 03 '24 08:05

Thank you for your feedback. Tagging and routing to the team member best able to assist.

github-actions[bot], May 03 '24 08:05

Hello @fanemama, the first error message indicates that the background scheduled task responsible for renewing the auth token at a regular interval failed to do so at some point. If this happens often, it may indicate an unstable network between the application running the consumer and the AD endpoint or the broker. I wonder if there is a restricted network, a proxy, or some kind of firewall rule that results in the connection being dropped often.
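The SDK's renewal loop is internal, so purely to illustrate what such a background task has to get right, here is a hypothetical sketch (not the SDK's ActiveClientTokenManager logic). It plans the next refresh a safety margin before the token expires and, unlike the "Not scheduling refresh task" behavior in the log, keeps retrying with a capped exponential backoff after failures. The class name, margin, minimum delay, and backoff cap are all assumptions.

```java
import java.time.Duration;

/** Hypothetical helper: decides when a background token refresh should run next. */
final class TokenRefreshPlanner {
    private final Duration refreshMargin;  // refresh this long before expiry (assumed value)
    private final Duration minDelay;       // never schedule sooner than this
    private final Duration maxRetryDelay;  // cap for the failure backoff

    TokenRefreshPlanner(Duration refreshMargin, Duration minDelay, Duration maxRetryDelay) {
        this.refreshMargin = refreshMargin;
        this.minDelay = minDelay;
        this.maxRetryDelay = maxRetryDelay;
    }

    /** Delay after a successful refresh: time to expiry minus the margin, but at least minDelay. */
    Duration afterSuccess(Duration timeToExpiry) {
        Duration delay = timeToExpiry.minus(refreshMargin);
        return delay.compareTo(minDelay) < 0 ? minDelay : delay;
    }

    /** Delay after the n-th consecutive failure: 1s, 2s, 4s, ... capped at maxRetryDelay. */
    Duration afterFailure(int consecutiveFailures) {
        int n = Math.max(consecutiveFailures, 1);          // treat 0 or negative as the first failure
        long seconds = 1L << Math.min(n - 1, 20);          // bounded shift avoids overflow
        Duration delay = Duration.ofSeconds(seconds);
        return delay.compareTo(maxRetryDelay) > 0 ? maxRetryDelay : delay;
    }
}
```

The key design point is that a failure still produces a next delay, so the refresh loop never silently stops.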

anuchandy, May 03 '24 22:05

Hi @anuchandy, to give you more context: none of our applications developed in .NET or Python have this issue. We only get this error with applications developed in Java and running in Docker containers. In addition, before creating this issue, we had already contacted Azure support and had many meetings to check our network; everything seemed fine and they didn't detect anything unusual. The Azure support ticket information is below. - Timeout when using the application gateway ... - TrackingID#2402210050003918

Following the various meetings we had with support, the conclusion was that it is not a network issue.

fanemama, May 04 '24 05:05

@fanemama, let me prepare a setup (SB resource behind app-gateway and SDK consumer running in a Docker instance) to see if this can be reproduced.

anuchandy, May 06 '24 20:05

FYI @anuchandy, I will now take the lead on this, helping @fanemama.

Thank you for preparing this setup.

lazhar, May 07 '24 07:05

Hi @anuchandy, any news on this ? Thanks.

lazhar, May 13 '24 09:05

Hi @lazhar, I’ve been looking into this. One thing I noticed is that if the gateway front end sends a TCP FIN+ACK followed by a TCP RST, the underlying proton-j library does not signal connection termination to the application. I’ve created an issue in that project’s JIRA: [PROTON-2823] Proton-J does not raise transport closed when TCP FIN+ACK arrives followed by TCP RST - ASF JIRA (apache.org). I’m not sure whether your environment is impacted by this.
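For context, at the plain-socket level a peer's FIN is observable: a blocking read() returns -1 (end of stream), while an RST typically surfaces as an IOException such as "Connection reset". The minimal localhost sketch below uses only java.net (not proton-j or the SDK) and shows the FIN case: the server side closes the accepted socket without writing anything, and the client's next read returns -1. That is the point at which a transport layer would be expected to report the connection as closed; the Proton-J issue above is about that signal not reaching the application. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Minimal localhost demo: how a peer-initiated close (FIN) looks to a blocking reader. */
final class PeerCloseDemo {

    /** Returns the client's first read result after the server closes its side. */
    static int readAfterPeerClose() throws IOException, InterruptedException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread closer = new Thread(() -> {
                try (Socket accepted = server.accept()) {
                    // Closing without writing sends a FIN to the client.
                } catch (IOException ignored) {
                    // demo only
                }
            });
            closer.start();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort())) {
                client.setSoTimeout(5_000); // fail instead of hanging if no FIN arrives
                InputStream in = client.getInputStream();
                int result = in.read();     // -1 means end of stream: the peer closed (FIN)
                closer.join();
                return result;
            }
        }
    }
}
```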

Here are a few observations that may be helpful:

  1. Application Gateway appears to have a request time-out setting (under Backend settings for port 443); setting it to a higher value (30-60 seconds) should reduce the chance of disconnection.
  2. There is an AMQP system property that enables clients to send heartbeat signals to the broker (via the gateway), which can help keep the connection from idling. The property can be passed as a VM option, e.g., -Dqpid.heartbeat=10; I can see that this enables the underlying proton-j library to send empty frames to the broker (doc). I read in the references that one should not set this to a very low value; I was trying 10-20 seconds.
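One way to reason about the two settings together is in terms of idle gaps: if the silence between any two frames on the connection (including the gap from connection open to the first frame) reaches the gateway timeout, the gateway may drop the connection. The sketch below is a simplified model under that assumption only; it does not describe how Application Gateway actually measures idleness, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Simplified model: does any silent gap on a connection reach an idle timeout? */
final class IdleGapModel {

    /** Frame send times (offsets from connection open) for a fixed heartbeat interval. */
    static List<Duration> heartbeatSchedule(Duration firstDelay, Duration interval, Duration horizon) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("interval must be positive");
        }
        List<Duration> times = new ArrayList<>();
        for (Duration t = firstDelay; t.compareTo(horizon) <= 0; t = t.plus(interval)) {
            times.add(t);
        }
        return times;
    }

    /** True if every gap, starting at connection open (offset zero), stays below the timeout. */
    static boolean survives(List<Duration> frameTimes, Duration idleTimeout, Duration horizon) {
        Duration last = Duration.ZERO;
        for (Duration t : frameTimes) {
            if (t.minus(last).compareTo(idleTimeout) >= 0) {
                return false; // silent for too long before this frame
            }
            last = t;
        }
        // Also check the trailing gap up to the end of the observed window.
        return horizon.minus(last).compareTo(idleTimeout) < 0;
    }
}
```

In this model a 10-second heartbeat keeps every gap below a 20-second timeout, but a first frame that only goes out after 25 seconds is already too late, which is why a larger gateway timeout and a heartbeat complement each other.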

Also, could you share the Service Bus SDK version, the mode of authentication (e.g., connection string), and what the receive code looks like?

anuchandy, May 14 '24 22:05

Hello @anuchandy,

Thank you very much for your help and feedback. I'm also working with @lazhar and @fanemama on this issue.

On the infrastructure and network side, we have observed the same FIN+ACK and TCP RST, and you confirmed my assumption that the application does not get the signal and therefore doesn't try to re-establish the connection. Do you have an idea which service initiates this FIN+ACK + TCP RST, and why? Do you think we should also focus on this?

We have already set the timeout on Application Gateway to 120s (following this note: https://learn.microsoft.com/en-us/azure/application-gateway/application-gateway-websocket#backendaddresspool-backendhttpsetting-and-routing-rule-configuration). 120s is double the default ASB message lock duration, chosen to be on the safe side even if it is not optimal. And indeed, we noticed a slight improvement on some applications.

I'll let @lazhar answer regarding the code details.

Thank you again for your help.

glmva, May 15 '24 06:05

Hi @anuchandy, thank you for your help and for opening the ticket about Proton-J.

We will try to set the heartbeat option in our java applications and see if it helps.

We are also trying to see if this heartbeat option can be set directly in the code somewhere.
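If the property is only read when the AMQP connection is created, setting it programmatically before any Service Bus client is built should behave like passing -Dqpid.heartbeat=10 on the command line. That is an assumption: the property name and its effect come from the observation above, not from a documented SDK option. A minimal hypothetical startup helper that sets it only when no VM option already supplied it:

```java
/** Hypothetical startup helper: set qpid.heartbeat from code unless a -D option already set it. */
final class HeartbeatProperty {
    static final String KEY = "qpid.heartbeat"; // property name taken from this thread, not verified here

    /** Sets the heartbeat (in seconds) if absent; returns the value in effect afterwards. */
    static String ensure(int seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("heartbeat must be positive: " + seconds);
        }
        String existing = System.getProperty(KEY);
        if (existing != null) {
            return existing; // an explicit -Dqpid.heartbeat=... takes precedence
        }
        System.setProperty(KEY, Integer.toString(seconds));
        return System.getProperty(KEY);
    }
}
```

In a Spring Boot application this would be called at the top of main, before SpringApplication.run(...), so that it runs before any ServiceBusClientBuilder bean is created.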

lazhar, May 15 '24 06:05

@anuchandy I saw this option in the .NET SDK. Is this the heartbeat we are talking about?

Do you know the equivalent in the Java SDK?

lazhar, May 15 '24 07:05

@anuchandy and to answer your questions:

  • Service Bus SDK version:
    • for the first project: spring-cloud-azure-starter --> 5.5.0 and azure-messaging-servicebus --> 7.14.3
    • for the second project: spring-cloud-azure-starter --> 5.11.0 and azure-messaging-servicebus --> 7.15.2
  • mode of authentication: managed identity
  • what the receive code looks like (second project):
// In a @Configuration class
@Bean
public ServiceBusClientBuilder serviceBusClient() {
   return new ServiceBusClientBuilder()
         .fullyQualifiedNamespace(fullyQualifiedName)
         .customEndpointAddress(customEndpoint)
         .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
         .credential(new ClientSecretCredentialBuilder()
               .clientId(clientId)
               .tenantId(tenantId)
               .clientSecret(secretId)
               .build());
}

// In a @Service class
@PostConstruct
public void receiveMessages() {
   ServiceBusProcessorClient mainService = AzureBusUtility.createServiceBusProcessorClient(serviceBusClientBuilder, topicName, subscriptionName, this::processMessage, this::processError).buildProcessorClient();

   try {
      mainService.start();
      log.info("Starting the processor");
      // Every 30 seconds, log the processor state and restart it if it is no longer running
      Schedulers.boundedElastic().schedulePeriodically(() -> {
         log.info("Is Main service still running: " + mainService.isRunning());
         AzureBusUtility.isServiceRunning(mainService);
      }, 30L, 30L, TimeUnit.SECONDS);
   } catch (ServiceBusException serviceBusException) {
      AzureBusUtility.isServiceRunning(mainService);
   }
}

// In AzureBusUtility: returns the processor builder; the caller invokes buildProcessorClient()
public static ServiceBusClientBuilder.ServiceBusProcessorClientBuilder createServiceBusProcessorClient(ServiceBusClientBuilder serviceBusClientBuilder, String topicName, String subscriptionName,
      Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError) {
   return serviceBusClientBuilder.processor()
         .topicName(topicName)
         .subscriptionName(subscriptionName)
         .maxConcurrentCalls(1)
         .maxAutoLockRenewDuration(Duration.ofMinutes(2L))
         .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
         .disableAutoComplete() // messages are settled explicitly in processMessage
         .processMessage(processMessage)
         .processError(processError);
}

// In AzureBusUtility: stops, closes, and restarts the processor if it is no longer running
protected static void isServiceRunning(ServiceBusProcessorClient service) {
   if (!service.isRunning()) {
      service.stop();
      log.info("Stopping the service");
      service.close();
      log.info("Closing the service");
      service.start();
      log.info("Restart the service");
   }
}

// Back in the @Service class
void processMessage(ServiceBusReceivedMessageContext context) {
   final ServiceBusReceivedMessage message = context.getMessage();

   try {
      log.info("Processing message. Id: {}, Sequence #: {}. Contents: {}", message.getMessageId(), message.getSequenceNumber(), message.getBody());
      context.complete();
   } catch (Exception processingException) {
      log.error("{}{} : Processing exception", message.getMessageId(), ERROR_MESSAGE, processingException);
      context.deadLetter(AzureBusUtility.prepareDeadLetterOptions(processingException, message, message.getMessageId() + ":Processing exception"));
      log.info(message.getMessageId() + DLQ_MESSAGE);
   }
}

void processError(ServiceBusErrorContext context) {
   if (!(context.getException() instanceof ServiceBusException exception)) {
      log.error("Non-ServiceBusException occurred", context.getException());
      return;
   }
   log.error("ServiceBusException source: {}. Reason: {}. Is transient? {}.", context.getErrorSource(), exception.getReason(), exception.isTransient(), context.getException());
}
  • and here is what the sending code looks like (first project):
@Configuration
@Slf4j
public class AzureServiceBusConfiguration {

   @Value("${spring.cloud.azure.profile.tenant-id}")
   private String tenantId;
   @Value("${spring.cloud.azure.credential.client-id}")
   private String clientId;
   @Value("${spring.cloud.azure.credential.client-secret}")
   private String secretId;
   @Value("${spring.cloud.azure.servicebus.entity-name}")
   private String topicName;
   @Value("${spring.cloud.azure.servicebus.processor.subscription-name}")
   private String subscriptionName;
   @Value("${fully.qualified.name}")
   private String fullyQualifiedName;
   @Value("${custom.endpoint}")
   private String customEndpoint;


   @Primary
   @Bean(name = "SendBusMessage")
   ServiceBusSenderClient sendBusMessage() {
      ServiceBusClientBuilder builder = new ServiceBusClientBuilder().fullyQualifiedNamespace(fullyQualifiedName).customEndpointAddress(customEndpoint).transportType(AmqpTransportType.AMQP_WEB_SOCKETS).credential(getAuth().build());

      return builder.sender().topicName(topicName).buildClient();
   }

   private ClientSecretCredentialBuilder getAuth() {
      return new ClientSecretCredentialBuilder().clientId(clientId).tenantId(tenantId).clientSecret(secretId);
   }

}
@Slf4j
@Service("AzureBusService")
public class AzureBusServiceImpl implements AzureBusService {

   private static final String          BUS_PROPERTIES_CLAIM_KEY = "claimNumber";
   private final ServiceBusSenderClient senderClient;

   public AzureBusServiceImpl(@Qualifier("SendBusMessage") ServiceBusSenderClient senderClient) {
      this.senderClient = senderClient;
   }

   @Override
   public void sendMessage(String subject, String message, String messageSessionId, String claimNumber) {
      log.info("Writing message \"{}\" in the Azure Bus topic \"{}\"", message, senderClient.getEntityPath());

      ServiceBusMessage busMessage = new ServiceBusMessage(message).setSessionId(messageSessionId).setSubject(subject);
      busMessage.getApplicationProperties().put(BUS_PROPERTIES_CLAIM_KEY, claimNumber);

      senderClient.sendMessage(busMessage);
   }

   @PreDestroy
   public void preDestroy() {
      senderClient.close();
   }

}
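One caveat with the periodic isServiceRunning check in the receive code: if a transport closure is never signaled to the SDK (as in the Proton-J issue mentioned earlier), isRunning() may keep returning true, so a check based only on it might never trigger. A hedged alternative is to track the last time the processor showed any activity (a message or error callback) and, after a chosen period of silence, replace the processor with a newly built instance. In the sketch below, Processor is a hypothetical stand-in for ServiceBusProcessorClient's start(), isRunning(), and close(), the factory stands in for builder.processor()...buildProcessorClient(), and the silence threshold is an assumption; on a quiet subscription, silence alone is not necessarily a fault.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for the subset of ServiceBusProcessorClient used here. */
interface Processor extends AutoCloseable {
    void start();
    boolean isRunning();
    @Override void close();
}

/** Sketch: replace the processor when it stops running or shows no activity for too long. */
final class ProcessorWatchdog {
    private final Supplier<Processor> factory;   // builds a fresh processor each time
    private final Duration maxSilence;           // assumed threshold
    private final AtomicReference<Processor> current = new AtomicReference<>();
    private final AtomicReference<Instant> lastActivity = new AtomicReference<>(Instant.now());

    ProcessorWatchdog(Supplier<Processor> factory, Duration maxSilence) {
        this.factory = factory;
        this.maxSilence = maxSilence;
    }

    /** Builds and starts a brand-new processor, resetting the activity clock. */
    void start() {
        Processor p = factory.get();
        current.set(p);
        lastActivity.set(Instant.now());
        p.start();
    }

    /** Call from the message and error callbacks. */
    void recordActivity() {
        lastActivity.set(Instant.now());
    }

    /** Call periodically (e.g. every 30 s); returns true if the processor was replaced. */
    boolean check(Instant now) {
        Processor p = current.get();
        boolean silent = Duration.between(lastActivity.get(), now).compareTo(maxSilence) > 0;
        if (p != null && p.isRunning() && !silent) {
            return false;
        }
        if (p != null) {
            p.close(); // discard the old instance instead of restarting it
        }
        start();
        return true;
    }
}
```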

lazhar, May 15 '24 09:05


Hello @anuchandy, we are attempting to replicate the FIN-ACK-RST in a controlled environment to ensure that the recent updates to the application's code are resilient under all conditions, including network failures.

I suspect that the WebSocket is being closed on the backend side of the Application Gateway, between the App Gateway and Service Bus, since we do not observe an AMQP session end in the logs. This suggests that the FIN-ACK-RST may be initiated at a lower protocol layer, likely the WebSocket. We plan to decrease the backend timeout to a very low value to test this hypothesis, and I will keep you informed of the outcome.

Regards,

glmva, May 22 '24 08:05

Hello @glmva, yes, my observation is also that Application Gateway is initiating the FIN-ACK-RST, based on two points: (1) this never happens when the client connects directly to the Service Bus endpoint via WebSocket (or AMQP), and (2) the App Gateway timeout setting affects the FIN-ACK-RST.

In my case, I can see log entries indicating that the TCP connection between Service Bus and App Gateway is indeed dropped, so it seems App Gateway is notifying Service Bus.

I was able to reproduce this with a very low timeout value (less than 20 sec), even with the heartbeat enabled. With 20 sec, the disconnect happened before the client was even able to send the first heartbeat or receive one from the service. However, a higher App Gateway timeout (35-40 sec) combined with the heartbeat seems to avoid the FIN-ACK-RST. I think this may vary depending on where Service Bus, App Gateway, and the client are located (network distance); for me, all were in the same Azure region.

anuchandy, May 22 '24 15:05

Hello @anuchandy, thank you for your feedback. We have completed another troubleshooting session in a controlled environment.

  • By setting the timeout to 5 seconds (to be very aggressive), we managed to replicate the FIN-ACK-RST pattern. The applications tested withstood the timeout by reestablishing the connection, confirming that the development team's changes are effective. It appears they are now utilizing the "processor," which performed well in this test.

  • Regrettably, the same issue resurfaced in production today, after 13 days without incidents, causing a simultaneous loss of connection for both internal and external applications that consume or produce service bus messages. This occurred twice today at 11:15 am and 1:20 pm Switzerland time (Service Bus in Switzerland North), accompanied by a significant reduction in active connections in the Service Bus (down by 95%) each time. It is important to note that this is not related to the SDK or its implementation but is still noteworthy. We continue to investigate whether this is linked to our environment or is exogenous.

glmva, May 22 '24 16:05

Hello @glmva, this discussion might be relevant to you: https://github.com/Azure/azure-sdk-for-java/issues/40496. The key point there is that TLSv1.0 in App Gateway can cause this problem. If you’re using v1.0, updating the version is recommended, so you can rule out one variable in the investigation.
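To check this variable from the client side, one option is to look at which protocol version a TLS handshake with the gateway front end actually negotiates. The sketch below uses only javax.net.ssl; the class name is hypothetical, and treating anything older than TLSv1.2 as legacy is an assumption in line with the recommendation above. JSSE reports TLS 1.0 as "TLSv1".

```java
import java.io.IOException;
import java.util.Set;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

/** Sketch: report the TLS version negotiated with an HTTPS endpoint such as a gateway front end. */
final class TlsProbe {
    private static final Set<String> LEGACY = Set.of("SSLv3", "TLSv1", "TLSv1.1");

    /** Performs a handshake and returns the negotiated protocol name, e.g. "TLSv1.2". */
    static String negotiatedProtocol(String host, int port) throws IOException {
        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
        try (SSLSocket socket = (SSLSocket) factory.createSocket(host, port)) {
            socket.setSoTimeout(10_000); // bound the handshake read time
            socket.startHandshake();
            return socket.getSession().getProtocol();
        }
    }

    /** True for protocol names older than TLS 1.2. */
    static boolean isLegacy(String protocol) {
        return LEGACY.contains(protocol);
    }
}
```

For example, TlsProbe.isLegacy(TlsProbe.negotiatedProtocol("<gateway-host>", 443)), where <gateway-host> is a placeholder for the custom endpoint's host name. The result also depends on the client JVM, since recent JDKs disable TLS 1.0 and 1.1 by default.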

anuchandy, Jun 11 '24 18:06

This issue was created when the customer dev team initially encountered random timeouts. With continued analysis, the dev team identified that this was happening specifically in one environment out of 6; based on that and additional context gathered over time, the customer created issue #40496 with more specific details. I’m closing this issue now that we know why this happens, based on the discussion in #40496.

anuchandy, Jun 11 '24 20:06