ruby-kafka

MSK IAM: add support for AssumeRole auto refresh

Open hnaoto opened this issue 3 years ago • 11 comments

Hi, MSK IAM auth support was added recently https://github.com/zendesk/ruby-kafka/pull/937, but it doesn't support temporary credentials / AssumeRole refreshes. This feature was initially mentioned by @garrett528 in a thread https://github.com/zendesk/ruby-kafka/pull/907#issuecomment-978146627

I am interested in adding support for auto refreshing AssumeRole.

Ideally, the client will retrieve temporary credentials from STS and refresh those credentials in the background.

      require "aws-sdk-core"

      sts = Aws::STS::Client.new
      role_credentials = Aws::AssumeRoleCredentials.new(
        client: sts,
        role_arn: "arn:aws:iam::......",
        role_session_name: "example_session"
      )

      # The current keys are read through the provider, e.g.
      # role_credentials.credentials.access_key_id

The sasl_authenticator is initialized when the Kafka client is created https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl_authenticator.rb#L42. I am wondering what the appropriate way is to refresh the credentials used by sasl_authenticator in the background. In theory, I could recreate the sasl_authenticator after the credentials are updated, but that feels a little hacky to me.

Any suggestions are much appreciated.

hnaoto, Jun 06 '22 16:06

@hnaoto the way that i've seen this done, specifically in librdkafka which is the standard C client for Kafka, is that the credentials are stored in an object. that object is refreshed on a certain cadence (about 80% of the way through the credential lifetime set by duration_sec). however, this requires that the refresh mechanism is 1. scheduled and 2. running on a background thread. the credential object also has to be locked while it is updated, since multiple threads may be trying to access it concurrently.
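A minimal Ruby sketch of the pattern described above, for illustration only: a hypothetical `RefreshingCredentialHolder` (not ruby-kafka or AWS SDK code) that stores credentials behind a `Mutex` and refreshes them on a background thread at 80% of each lifetime. It assumes the caller supplies a block returning `[credentials, lifetime_in_seconds]` (in practice this would call STS); error handling in the refresh thread is deliberately left out.

```ruby
# Hypothetical class: mutex-guarded credentials refreshed by a background
# thread at ~80% of each credential's lifetime (librdkafka-style cadence).
class RefreshingCredentialHolder
  REFRESH_FRACTION = 0.8

  # The block must return [credentials, lifetime_in_seconds].
  def initialize(&fetcher)
    @fetcher = fetcher
    @mutex = Mutex.new
    refresh! # fetch once synchronously so `current` is never nil
    @thread = Thread.new { refresh_loop }
  end

  # Readers take the lock so they never observe a half-finished update.
  def current
    @mutex.synchronize { @credentials }
  end

  def stop
    @thread.kill
    @thread.join
  end

  private

  def refresh!
    credentials, lifetime = @fetcher.call
    @mutex.synchronize do
      @credentials = credentials
      @lifetime = lifetime
    end
  end

  # Sleep for 80% of the current lifetime, then refresh; repeat forever.
  # An exception raised by the fetcher ends this thread (not handled here).
  def refresh_loop
    loop do
      sleep(@mutex.synchronize { @lifetime } * REFRESH_FRACTION)
      refresh!
    end
  end
end
```

Usage with a stub fetcher and a very short lifetime, so the refresh is observable quickly:

```ruby
counter = 0
holder = RefreshingCredentialHolder.new { counter += 1; ["creds-#{counter}", 0.1] }
holder.current # => "creds-1"
```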

here's what i got to in late 2021. https://github.com/garrett528/ruby-kafka/pull/2/files. it's not complete and i don't think i ever got the thread to be scheduled properly (i'm no rubyist so i may be heading in the wrong direction trying to port C to Ruby).

here's the librdkafka C implementation that i wrote to do this if it helps. https://github.com/UrbanCompass/librdkafka/blob/master/src/rdkafka_sasl_aws_msk_iam.c

garrett528, Jun 07 '22 02:06

@garrett528 Thank you so much for sharing all the details. The solution that you tried sounds promising. Let me see whether I can figure out the scheduling part.

hnaoto, Jun 08 '22 03:06

Hi @garrett528, I went through the branch that you shared. May I ask some questions? You mentioned the "thread was not scheduled properly". What kind of errors did you get? (For example, did the credentials get updated after expiration?)

hnaoto, Jun 08 '22 23:06

trying to remember where i stopped... right now, that code doesn't actually call the sts endpoint so that definitely needs to be added. the refresh thread works but it is fixed to sleep(60) instead of using the refresh duration. additionally, it's using an infinite loop + sleep instead of an actual scheduling mechanism. that may not be the best way to handle this and i don't think i ever tested whether the creds pick up new values after the refresh is executed.
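Instead of a fixed `sleep(60)`, the delay can be derived from the credential lifetime. A minimal sketch, assuming a hypothetical `refresh_delay` helper and that both the issue time and the expiration are known (the STS AssumeRole response does include an expiration timestamp); the 0.8 fraction mirrors the librdkafka cadence mentioned earlier.

```ruby
# Hypothetical helper: seconds to sleep before the next refresh, aiming for
# `fraction` of the way through the credential lifetime. Never negative.
def refresh_delay(issued_at:, expiration:, now: Time.now, fraction: 0.8)
  refresh_at = issued_at + (expiration - issued_at) * fraction
  [refresh_at - now, 0.0].max
end

# A 15-minute session issued at t=0, checked at t=100 s:
puts refresh_delay(issued_at: Time.at(0), expiration: Time.at(900), now: Time.at(100)) # => 620.0
```

Clamping at zero means a refresh loop that wakes up late refreshes immediately rather than sleeping for a negative duration.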

garrett528, Jun 09 '22 21:06

Thanks for sharing all the information @garrett528 😃

I did some digging into the Ruby AWS SDK, and it looks like the temporary credentials (AssumeRoleCredentials) are refreshed in the background automatically (source code: https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-core/lib/aws-sdk-core/refreshing_credentials.rb)

If I pass the role credentials object to the Kafka client, the object used by the Ruby Kafka client is updated in the background as well, since both refer to the same object. (From some people's perspectives, Ruby is "pass by object reference"...)
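A small sketch of why sharing the object matters, using hypothetical stand-in classes (not AWS SDK or ruby-kafka code): an authenticator that copies the key string when it is built keeps the old value, while one that keeps a reference to the provider and reads it at handshake time sees the refreshed value. The SDK's refreshing providers expose the current set through `credentials`, which the stand-in mimics.

```ruby
# Hypothetical stand-ins for a refreshing credentials provider.
FakeCredentials = Struct.new(:access_key_id)

class FakeProvider
  attr_reader :credentials

  def initialize
    @version = 0
    rotate!
  end

  # Simulates a background refresh swapping in a new credential set.
  def rotate!
    @version += 1
    @credentials = FakeCredentials.new("AKID-#{@version}")
  end
end

# Copies the key string once, when the authenticator is built.
class SnapshotAuthenticator
  def initialize(provider)
    @access_key_id = provider.credentials.access_key_id
  end

  def key_for_handshake
    @access_key_id
  end
end

# Keeps a reference to the provider and reads it at every handshake.
class ReferenceAuthenticator
  def initialize(provider)
    @provider = provider
  end

  def key_for_handshake
    @provider.credentials.access_key_id
  end
end

provider = FakeProvider.new
snapshot = SnapshotAuthenticator.new(provider)
by_reference = ReferenceAuthenticator.new(provider)
provider.rotate!
puts snapshot.key_for_handshake     # => AKID-1
puts by_reference.key_for_handshake # => AKID-2
```

So the "pass by object reference" argument holds only as long as the Kafka side keeps the object and does not copy the strings out of it up front.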

Do you think it is still necessary to refresh the credentials in the Ruby Kafka client if the credentials object is updated in the background?

As far as I know, once a connection is established, the client can keep talking to Kafka; Kafka won't proactively disconnect a client whose credentials have expired. The code change in this PR https://github.com/zendesk/ruby-kafka/pull/951 works, but I haven't figured out a way to verify the client's behavior when it needs to re-establish a connection.

hnaoto, Jun 10 '22 19:06

oh that's a good find! i'm not sure how the ruby-kafka client itself handles things once the connection needs to be reset. the way i tested this on the C client was to set up an IAM-enabled test MSK cluster and write a script that instantiates a producer and consumer on a fixed interval (say 1 minute) and runs for longer than the refresh duration. that helped me pinpoint that the C client actually requires a background thread to refresh the credential, and then see what happens when the broker thread disconnects from MSK because the credential expired.
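The test procedure above can be sketched as a small harness. This is an assumption-laden outline, not the script used in the thread: `soak_test` is a hypothetical helper, and the probe passed to it is where a real run would build a fresh producer and consumer against the IAM-enabled MSK cluster. Here the probe is a stub so the harness itself can be exercised.

```ruby
# Hypothetical harness: run the probe every `interval` seconds until `total`
# seconds have passed, recording [attempt_number, error] for each failure.
def soak_test(interval:, total:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + total
  failures = []
  attempt = 0
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    attempt += 1
    begin
      yield attempt
    rescue StandardError => e
      failures << [attempt, e]
    end
    sleep interval
  end
  [attempt, failures]
end

# A real run would use something like interval: 60 and a total longer than
# the credential lifetime; here a stub probe fails once on purpose.
attempts, failures = soak_test(interval: 0.01, total: 0.05) do |n|
  raise "simulated auth failure" if n == 2
end
```

Recording failures instead of aborting lets one run show whether errors cluster around the credential expiry time.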

the shortest credential duration you can set is 15 minutes, so it's a bit annoying to test.

garrett528, Jun 12 '22 12:06

Thanks so much for sharing the testing method @garrett528! I tried that with an MSK cluster and let the program run for around 8 hours. It worked fine.

I also took a closer look at the source code of the Ruby Kafka client. At a high level, the authentication process appears to work as follows:

  • client: init sasl_authenticator https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L110
  • pass sasl_authenticator to a method to init @connection_builder https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L110
  • pass @connection_builder to a method to init broker_pool https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L820
    • broker_pool: use @connection_builder to create broker https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker_pool.rb#L13
      • broker: all of the broker's core methods rely on send_request https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker.rb#L199
      • send_request relies on @connection. (@connection is set to nil if an IdleConnection or ConnectionError error occurs.)
      • Depending on the scenario, the upstream code either raises ConnectionError or retries. A retry should eventually trigger @connection_builder.build_connection https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/broker.rb#L214
        • connection_builder: build_connection calls @sasl_authenticator.authenticate!(connection)
        • sasl_authenticator: sasl_authenticator calls the method of the configured SASL mechanism to authenticate the client https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl_authenticator.rb#L71
        • In this particular scenario, it calls authenticate! of awsmskiam https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl/awsmskiam.rb#L30. authenticate! creates a payload that wraps the instance variables @access_key_id and @session_token.
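The chain above can be modeled in a few lines to show when credentials are read. The classes below are hypothetical and heavily simplified (they are not the real ruby-kafka `Broker`, `ConnectionBuilder`, or `SaslAuthenticator`); the model assumes the authenticator reads credentials through a provider at handshake time, which is the condition under which a reconnect picks up refreshed keys.

```ruby
# Hypothetical, simplified model of Broker -> ConnectionBuilder -> Authenticator.
class ConnectionError < StandardError; end

Connection = Struct.new(:authenticated_with, :healthy)

class Authenticator
  # provider: any callable returning the current access key id (assumption).
  def initialize(provider)
    @provider = provider
  end

  # Credentials are read here, at handshake time.
  def authenticate!(connection)
    connection.authenticated_with = @provider.call
  end
end

class ConnectionBuilder
  def initialize(authenticator)
    @authenticator = authenticator
  end

  def build_connection
    connection = Connection.new(nil, true)
    @authenticator.authenticate!(connection)
    connection
  end
end

class Broker
  attr_reader :connection

  def initialize(builder)
    @builder = builder
    @connection = nil
  end

  # Connects lazily; on ConnectionError drops the connection so the next
  # call goes through build_connection (and authentication) again.
  def send_request(request)
    @connection ||= @builder.build_connection
    raise ConnectionError, "connection lost" unless @connection.healthy
    "#{request} via #{@connection.authenticated_with}"
  rescue ConnectionError
    @connection = nil
    raise
  end
end

current_key = "AKID-1"
broker = Broker.new(ConnectionBuilder.new(Authenticator.new(-> { current_key })))
puts broker.send_request("produce") # => produce via AKID-1

current_key = "AKID-2"            # credentials refreshed in the background
broker.connection.healthy = false # simulate a dropped connection
begin
  broker.send_request("produce")
rescue ConnectionError
  # an upstream retry would land here and call send_request again
end
puts broker.send_request("produce") # => produce via AKID-2
```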

To sum up, if there is no connection error, the client keeps working even after the credentials have expired. If there is a connection error, then depending on the scenario the Kafka client may simply raise the connection error, which crashes the client. At that point the application code is responsible for re-creating the client. Because the AssumeRole credentials have been refreshed in the background, the newly created client will use valid AssumeRole credentials.
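One way an application could take on that responsibility, sketched under assumptions: `with_recreated_client` is a hypothetical helper (not part of ruby-kafka) that builds a brand-new client from a factory whenever the work raises one of the given errors, up to a limit. Because the factory runs again on every attempt, each new client is built from whatever credentials are current at that moment. In practice `errors:` would probably be narrowed to something like `Kafka::ConnectionError`.

```ruby
# Hypothetical application-level helper: rebuild the client and rerun the
# work when it raises one of `errors`, at most max_recreations times.
def with_recreated_client(factory, errors: [StandardError], max_recreations: 3)
  recreations = 0
  begin
    client = factory.call # a new client on every attempt
    yield client
  rescue *errors
    recreations += 1
    retry if recreations <= max_recreations
    raise
  end
end

# Stub factory standing in for Kafka.new(...): the first client has stale
# credentials, the second one has fresh ones.
keys = ["AKID-expired", "AKID-fresh"]
factory = -> { keys.shift }
result = with_recreated_client(factory) do |client|
  raise "auth failed" if client == "AKID-expired"
  "delivered with #{client}"
end
puts result # => delivered with AKID-fresh
```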

If the Kafka client attempts to recreate the connection, it will eventually invoke build_connection and authenticate! https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/sasl/awsmskiam.rb#L30. The credentials used by awsmskiam.rb are encapsulated in an object that has been updated in the background automatically, so the newly created connection should work as expected.

Sorry for the long text. I might be missing some nuances of how the Ruby Kafka client handles connection errors. I think the authentication process should be similar across clients written in different languages, and I think the current implementation should be sufficient. Does the analysis sound reasonable to you?

hnaoto, Jun 14 '22 19:06

@hnaoto that makes sense to me. do you know which scenarios cause a crash vs a retry? if those scenarios are well-defined, it will help devs understand the circumstances that cause failure and allow them to make an educated decision on whether to catch and restart the client or to let it fail and investigate.

outside of that, this is great work!

garrett528, Jun 15 '22 12:06

I checked how ConnectionError exceptions are handled. ConnectionError can be raised in a number of situations https://github.com/zendesk/ruby-kafka/search?q=ConnectionError

From the producer's perspective, the pattern looks like this:

  1. the ConnectionError is caught upstream https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L405
  2. the upstream code retries until the counter hits @max_retries https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L422 (max_retries is a client/producer config)
  3. If a connection is re-established during a retry, delivery continues. Otherwise, the client crashes once all the retries are exhausted.
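The three steps can be modeled as a bounded retry loop. This is a hypothetical sketch of the pattern, not the actual `Kafka::Producer` code; the local `ConnectionError` and `DeliveryFailed` classes and the `deliver_with_retries` name are assumptions made for illustration. With `max_retries: 2`, the block gets up to three attempts before the error is surfaced.

```ruby
# Hypothetical model of the producer retry pattern described above.
class ConnectionError < StandardError; end
class DeliveryFailed < StandardError; end

def deliver_with_retries(max_retries:, retry_backoff: 0)
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue ConnectionError => e
    if attempt <= max_retries
      sleep retry_backoff
      retry # a new attempt may re-establish the connection
    end
    raise DeliveryFailed, "gave up after #{attempt} attempts: #{e.message}"
  end
end

# The connection comes back on the third attempt, within the retry budget.
result = deliver_with_retries(max_retries: 2) do |n|
  raise ConnectionError, "broker unreachable" if n < 3
  "delivered on attempt #{n}"
end
puts result # => delivered on attempt 3
```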

hnaoto, Jun 16 '22 02:06

ok so it does have a retry mechanism that it hits. sounds like it's less of an issue with this client anyways since the ruby aws sdk handles the credential refresh automatically (the C client does not!).

garrett528, Jun 16 '22 13:06

Yes, there is a retry mechanism, but once the client crashes, the messages in the local buffer are lost as well. (The buffer's behavior is briefly described in this section: https://github.com/zendesk/ruby-kafka#buffering-and-error-handling). Ideally, if there is an intermittent network issue, the client should be able to re-establish the connection before it reaches the retry limit.

😄 Implementing credential refresh manually would be much more complicated, and I think the people who worked on the Ruby AWS SDK had to overcome some hurdles as well. (Related threads: https://github.com/aws/aws-sdk-ruby/issues/2641 and https://github.com/aws/aws-sdk-ruby/pull/2642)

hnaoto, Jun 16 '22 18:06

Issue has been marked as stale due to a lack of activity.

github-actions[bot], Jun 16 '23 00:06