rdkafka-ruby

Oauthbearer token refresh callback

Open bruce-szalwinski-he opened this issue 2 years ago • 15 comments

Took a stab at adding support for the OAUTHBEARER token refresh callback. For a system test, I built a small Ruby script, put it in a container, and ran the container in ECS. I can see that the callback is being called, but it is not clear what actions the callback is supposed to perform. For Python, the MSKAuthTokenProvider.generate_auth_token function creates a base64-encoded signed URL and the kafka-python library uses that to make an authentication request. This means that for Python, using the OAUTHBEARER SASL mechanism looks like:

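import socket

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka import KafkaProducer


class MSKTokenProvider():
    def token(self):
        # generate_auth_token returns the token and its expiry; only the token is used here
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-east-1')
        return token


tp = MSKTokenProvider()

# get_settings() is the application's own settings helper
producer = KafkaProducer(
    bootstrap_servers=list(get_settings().KAFKA_BOOTSTRAP_SERVERS.split(',')),
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

# send a payload to a topic
producer.send(topic, payload)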

For Ruby, so far this looks like:

    def self.start!(kafka_config)
      Rdkafka::Config.oauthbearer_token_refresh_callback = method(:token_callback)
      @producer = Rdkafka::Config.new(kafka_config).producer
    end

The token_callback currently creates a base64 URL-encoded SigV4 signed URL, but I can't tell whether anything in rdkafka uses it to make an authentication request. Is the callback supposed to implement the authentication request as well?

Closes https://github.com/karafka/rdkafka-ruby/issues/406 Mentioned in: https://github.com/karafka/karafka/issues/1767

bruce-szalwinski-he avatar Feb 07 '24 02:02 bruce-szalwinski-he

This PR will trigger the callback but it lacks rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure according to the flow.

ref https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2191

That said, nice start :)

mensfeld avatar Feb 07 '24 10:02 mensfeld

> This PR will trigger the callback but it lacks rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure according to the flow.
>
> ref https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2191

Ok, added bindings for set_token, set_token_failure.

Checking for understanding:

  • the method referenced by oauthbearer_token_refresh_callback should create a token
  • if successful, it should call rd_kafka_oauthbearer_set_token
  • if unsuccessful, it should call rd_kafka_oauthbearer_set_token_failure
  • the oauthbearer_token_refresh_callback will be invoked prior to the token expiring, and the dance begins again.
  • something needs to call the oauthbearer_token_refresh_callback function the first time
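
The flow in the list above can be sketched in plain Ruby. This is only an illustration under stated assumptions: FakeClient, set_token, set_token_failure and build_refresh_callback are hypothetical names standing in for whatever wraps rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure, and the token generator is any callable that returns a hash or raises.

```ruby
# Hypothetical stand-in for a client handle. A real client would forward
# these calls to rd_kafka_oauthbearer_set_token and
# rd_kafka_oauthbearer_set_token_failure.
class FakeClient
  attr_reader :token, :failure

  def initialize
    @token = nil
    @failure = nil
  end

  def set_token(token:, lifetime_ms:, principal_name:)
    @token = { token: token, lifetime_ms: lifetime_ms, principal_name: principal_name }
  end

  def set_token_failure(reason)
    @failure = reason
  end
end

# Wraps a token generator in a refresh callback: report the token on
# success, report the failure reason if the generator raises.
def build_refresh_callback(generator)
  lambda do |client|
    begin
      client.set_token(**generator.call)
    rescue StandardError => e
      client.set_token_failure("token refresh failed: #{e.message}")
    end
  end
end
```

Calling the returned lambda with a client covers both branches of the checklist; the re-invocation before expiry is left to the library.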

bruce-szalwinski-he avatar Feb 07 '24 21:02 bruce-szalwinski-he

You got it right.

> something needs to call the oauthbearer_token_refresh_callback function the first time

This is already implemented in waterdrop and karafka, so no changes needed (aside from a high-level API but that I will do).

mensfeld avatar Feb 08 '24 09:02 mensfeld

@kdhfred I've added the bindings and am working on testing this feature against AWS MSK. I wrote some code to generate the tokens, modeled after how Python creates credentials. Haven't had any luck yet.

  require "aws-sdk-core" # Aws::ECSCredentials
  require "aws-sigv4"    # Aws::Sigv4::Signer
  require "base64"
  require "uri"

  # ENDPOINT_URL_TEMPLATE is defined elsewhere in the script.
  def generate_token(region: "us-east-1")
    aws_credentials = load_credentials_from_ecs
    construct_auth_token(aws_credentials, region)
  end

  # Loads the task role credentials from the ECS container endpoint.
  def load_credentials_from_ecs
    ecs_credentials = Aws::ECSCredentials.new(retries: 3)
    ecs_credentials.credentials
  end

  # Builds a SigV4 pre-signed URL and returns it base64url-encoded.
  def construct_auth_token(aws_credentials, region)
    endpoint_url = ENDPOINT_URL_TEMPLATE.gsub("{}", region)
    query_params = { "Action" => "kafka-cluster:Connect" }
    url = URI::HTTPS.build(host: endpoint_url, path: "/", query: URI.encode_www_form(query_params))

    signer = Aws::Sigv4::Signer.new(
      service: 'kafka-cluster',
      region: region, # was hard-coded to 'us-east-1', ignoring the argument
      credentials_provider: aws_credentials
    )

    url = signer.presign_url(
      http_method: 'GET',
      url: url
    ).to_s

    url_utf_8 = url.encode("UTF-8")
    puts "URL: #{url_utf_8}"
    Base64.urlsafe_encode64(url_utf_8)
  end

bruce-szalwinski-he avatar Feb 08 '24 21:02 bruce-szalwinski-he

@bruce-szalwinski-he do you need my assistance with this?

mensfeld avatar Feb 13 '24 08:02 mensfeld

> @bruce-szalwinski-he do you need my assistance with this?

I've been trying to get a system test working. I need Ruby to create a valid pre-signed URL and I'm stuck. I've been able to create producers in both Python and Go, and they have no trouble creating pre-signed URLs. I've fired up an EC2 box where a Ruby script creates a pre-signed URL; when I feed that URL to the Python and Go programs, they both fail with some version of

client/metadata got error from broker -1 while fetching metadata: kafka server: SASL Authentication failed: [0bd12d32-5dd8-4602-83d9-b5214a579ea2]: Invalid authentication payload

I've had Go create a pre-signed URL and I feed it to Python and that works fine.
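
When one generator's token works and another's does not, it can help to decode both and compare what the signed URLs actually contain. The helper below is a minimal sketch using only the Ruby standard library; the function names are made up for illustration, and it assumes the token is nothing more than a base64url-encoded URL, as described earlier in this thread.

```ruby
require "base64"
require "uri"

# Decodes a base64url token (with or without "=" padding) back into the
# signed URL it wraps.
def decode_token(token)
  Base64.urlsafe_decode64(token).force_encoding(Encoding::UTF_8)
end

# Returns the query parameters of the wrapped URL as a Hash.
def token_params(token)
  URI.decode_www_form(URI.parse(decode_token(token)).query.to_s).to_h
end

# Compares parameter names only (signature values always differ):
# which names does the working token have that the failing one lacks,
# and which extra names does the failing one carry?
def compare_param_names(working, failing)
  w = token_params(working).keys
  f = token_params(failing).keys
  { missing: (w - f).sort, extra: (f - w).sort }
end
```

Feeding it a token from the working client and one from the Ruby script shows at a glance whether a query parameter is absent or unexpected, before looking at encoding details such as padding.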

I posted the script and a question over at https://github.com/aws/aws-sdk-ruby/discussions/2985.

bruce-szalwinski-he avatar Feb 13 '24 14:02 bruce-szalwinski-he

Ok, got my pre-signed url working. Now to plug that logic into system test and see if rdkafka-ruby in ecs will publish to kafka.

bruce-szalwinski-he avatar Feb 13 '24 19:02 bruce-szalwinski-he

Ok, got my system tests working and can publish messages to AWS MSK using IAM AUTH!!! I think it's ready for a review @mensfeld

bruce-szalwinski-he avatar Feb 15 '24 16:02 bruce-szalwinski-he

Awesome. I will pick it up early next week :clap:

mensfeld avatar Feb 15 '24 19:02 mensfeld

@bruce-szalwinski-he I did not forget. I just need to wrap my head around whether we should pass client_ptr or use opaque to give the token refresh callback awareness of its context.

mensfeld avatar Feb 20 '24 10:02 mensfeld

@bruce-szalwinski-he I did not forget about it. I'm still thinking about how to effectively pass the client reference back for multi-connection processes. The conceptual layer is absolutely valid on your side and I do not have anything to ask you to do. I'm just not merging it yet because I am thinking.

mensfeld avatar Feb 23 '24 09:02 mensfeld

Ok, @bruce-szalwinski-he, I think I got my head around that.

rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure methods are missing on producer and consumer (admin also?). IMHO, this could be modularized, and it will have to be exposed to provide an experience that aligns with current rdkafka behavior (this is why I'm requesting changes in this PR now).

Now for the harder part. We can do two things here, and I will ping @thijsc to help you decide what to do.

Let me describe the issue here and potential solutions:

Currently, all callbacks are global and contextless. The same callback object is assigned for statistics, errors, and potentially OAuth regardless of the instance to which it is associated.

This is not a problem for stats because stats include the name of the Kafka client for reference. For errors it is, let's say, "less of an issue": users are usually interested in the error and its origin, not in the underlying native Kafka reference.

That said, this will NOT be the same for this scenario where you need to take appropriate actions in the context of a given refresh, mainly for the rd_kafka_oauthbearer_set_token operation.

There are a few approaches we can take here:

  1. We expose the raw underlying client instance and expect users to use it via the bindings API. IMHO, this is a bad idea because, ideally, we want to hide things like rd_kafka_oauthbearer_set_token direct usage from users behind a ruby abstraction.
  2. We can use opaque and build a table of references like we do for configs, fetch it, and reference. This could work but is extensive work IMHO beyond the scope of this task.
  3. We could redesign this callback API to make it instance-based and assign the callback together with a reference to the object itself. This would, however, still require opaque management.
  4. We can "just" keep it as it is with a small change, to make this API aligned in behaviour with other callbacks until "total redesign" (recommended by me). The small change would be just providing in the arguments to the callback the instance name (producer/consumer/admin) allowing users that would need to reference their rdkafka instances that way.

I plan to redesign this API to make all callbacks instance-based, with the global ones kept only as an API fallback, because the current shared approach is not sufficient for this and other cases. However, I will not be able to work on this before Q3 at the earliest, and at the same time I do not want to withhold this from being in use.
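
To make "instance-based with a global fallback" concrete, here is a rough pure-Ruby sketch. It is not rdkafka-ruby code: CallbackHost and trigger_token_refresh are invented for illustration, and the step where a native callback resolves which instance it belongs to (the opaque management mentioned above) is deliberately left out.

```ruby
# Hypothetical sketch; CallbackHost and its methods are not rdkafka-ruby API.
class CallbackHost
  class << self
    # Process-wide fallback, analogous to the current global callbacks.
    attr_accessor :oauthbearer_token_refresh_callback
  end

  attr_reader :name
  attr_writer :oauthbearer_token_refresh_callback

  def initialize(name)
    @name = name
    @oauthbearer_token_refresh_callback = nil
  end

  # Prefer the per-instance callback; fall back to the global one.
  def oauthbearer_token_refresh_callback
    @oauthbearer_token_refresh_callback || self.class.oauthbearer_token_refresh_callback
  end

  # What a native trampoline could call once it has resolved the instance.
  # The callback receives the instance itself, so it has full context.
  def trigger_token_refresh(config)
    callback = oauthbearer_token_refresh_callback
    callback&.call(self, config)
  end
end
```

With this shape, existing users who set only the global callback keep working, while multi-connection processes can attach a callback to each producer, consumer, or admin instance.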

mensfeld avatar Feb 29 '24 10:02 mensfeld

Ok, I had previously implemented option 1 in my sample app that was using this branch of the library. That ended up looking something like below. The OAuthTokenRefresher.refresh_token is using the raw instance and directly using the bindings. Not the look we are going for.

class OAuthTokenRefresher
  def refresh_token(client, config)
    token = OAuthTokenGenerator.new.generate_token
    error_buffer = FFI::MemoryPointer.from_string(" " * 256)
    response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
      client, token.token, token.lifetime_ms, token.principal_name,
      token.extensions, token.extension_size, error_buffer, 256
    )
    if response != 0
      Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(
        client,
        "Failed to set token: #{error_buffer.read_string}"
      )
    end
  end
end

def refresh_token(client, config)
  OAuthTokenRefresher.new.refresh_token(client, config)
end

module KafkaProducer
  def self.start!(kafka_config)
    Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
  end
end

With the oauthbearer_set_token being added to producer, consumer and admin classes, this can now look like below. We hide the bindings as well as the complexities of creating and capturing the errors.

class OAuthTokenRefresher
  def refresh_token(client, config)
    token = OAuthTokenGenerator.new.generate_token
    ???.oauthbearer_set_token(
      token: token.token, 
      lifetime_ms: token.lifetime_ms, 
      principal_name: token.principal_name
    )
  end
end

The client that is passed to the refresh_token callback is an FFI::Pointer. I'm not sure how to get from an FFI::Pointer to my consumer/producer/admin instance. But I think that doesn't matter as you are proposing we don't use it. I think what you are saying in option 4 is that the callback receives an additional argument, let's call it instance_id, and the caller has the burden of managing the mapping of instance id to client instances (consumer, producer, admin).

The refresh_token could then look something like:

  def refresh_token(_client, config, instance_id)
    token = OAuthTokenGenerator.new.generate_token
    my_client = instances[instance_id]
    my_client.oauthbearer_set_token(
      token: token.token,
      lifetime_ms: token.lifetime_ms,
      principal_name: token.principal_name
    )
  end

And in some future version of this library, we remove the burden of having the client manage these mappings.
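
Until then, the instances lookup used above has to live in application code. A minimal sketch of such a mapping follows; InstanceRegistry is a hypothetical helper, not part of rdkafka-ruby, and the mutex is there on the assumption that the callback may run on a different thread than the one that created the client.

```ruby
# Hypothetical helper; not part of rdkafka-ruby.
# Maps an instance id (for example the client name) to the Ruby object
# that wraps it, so a global callback can find the right client.
class InstanceRegistry
  def initialize
    @mutex = Mutex.new
    @instances = {}
  end

  def register(instance_id, client)
    @mutex.synchronize { @instances[instance_id] = client }
  end

  # Call this when the client is closed so stale objects are not retained.
  def unregister(instance_id)
    @mutex.synchronize { @instances.delete(instance_id) }
  end

  # Raises KeyError for unknown ids so a bad mapping surfaces immediately.
  def fetch(instance_id)
    @mutex.synchronize { @instances.fetch(instance_id) }
  end
  alias_method :[], :fetch
end
```

An application would register each producer, consumer, or admin right after creating it and look it up inside refresh_token with instances[instance_id].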

bruce-szalwinski-he avatar Mar 01 '24 18:03 bruce-szalwinski-he

I think I'm not understanding this part:

> We can "just" keep it as it is with a small change, to make this API aligned in behaviour with other callbacks until "total redesign" (recommended by me). The small change would be just providing in the arguments to the callback the instance name (producer/consumer/admin) allowing users that would need to reference their rdkafka instances that way.

The arguments that are passed to the callbacks are defined by librdkafka, correct? I believe the OAuth refresh callback is governed by https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2241, which is going to pass the client, config, and opaque.

Adding another arg to the callback

    callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer, :string], :void

    OAuthbearerTokenRefreshCallback = FFI::Function.new(
      :void, [:pointer, :string, :pointer, :string]
    ) do |_client_ptr, _config, _opaque, client_id|
      puts("OAuthbearerTokenRefreshCallback called")
      puts()
      puts("client_id: #{client_id}")
      if Rdkafka::Config.oauthbearer_token_refresh_callback
        Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_id)
      end
    end

Results in crash reports.

So perhaps you meant something else?

  • I do see how the stats hash includes the client id, https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md#top-level.
  • I think https://github.com/karafka/rdkafka-ruby/issues/82 talks about the same idea
  • I also see the waterdrop stats wrapper, https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/instrumentation/callbacks/statistics.rb

bruce-szalwinski-he avatar Mar 02 '24 01:03 bruce-szalwinski-he

@mensfeld thanks for the chat today. I think this is ready now. I left you a few questions about docs.

bruce-szalwinski-he avatar Mar 03 '24 21:03 bruce-szalwinski-he

I can't repro the failure for 3.1

rbenv local 3.1.4
DEBUG_CONSUMER=true bundle exec rspec
337 examples, 0 failures

DEBUG_PRODUCER=true bundle exec rspec
337 examples, 0 failures

bruce-szalwinski-he avatar Mar 09 '24 02:03 bruce-szalwinski-he

@bruce-szalwinski-he this error is caused by rdkafka not being shut down correctly. I will take a look. It's something introduced within this code.

mensfeld avatar Mar 09 '24 08:03 mensfeld