OAuthBearer token refresh callback
Took a stab at adding support for the OAuthBearer token refresh callback. For a system test, I built a small Ruby script, put it in a container, and ran the container in ECS. I can see that the callback is being called, but it is not clear what actions the callback is supposed to perform. For Python, the `MSKAuthTokenProvider.generate_auth_token` function creates a base64-encoded signed URL, and the kafka-python library uses that to make an authentication request. Meaning that for Python, using the OAUTHBEARER SASL mechanism looks like:
```python
import socket

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka import KafkaProducer


class MSKTokenProvider:
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-east-1')
        return token


tp = MSKTokenProvider()
producer = KafkaProducer(
    bootstrap_servers=list(get_settings().KAFKA_BOOTSTRAP_SERVERS.split(',')),
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

# send a payload to a topic
producer.send(topic, payload)
```
For Ruby, so far this looks like:
```ruby
def self.start!(kafka_config)
  Rdkafka::Config.oauthbearer_token_refresh_callback = method(:token_callback)
  @producer = Rdkafka::Config.new(kafka_config).producer
end
```
The token_callback currently creates a base64 URL-encoded SigV4 pre-signed URL, but I can't tell that anything in rdkafka uses it to make an authentication request. Is the callback supposed to implement the authentication request as well?
Closes https://github.com/karafka/rdkafka-ruby/issues/406 Mentioned in: https://github.com/karafka/karafka/issues/1767
This PR will trigger the callback but it lacks `rd_kafka_oauthbearer_set_token` and `rd_kafka_oauthbearer_set_token_failure` according to the flow.
ref https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2191
That said, nice start :)
> This PR will trigger the callback but it lacks `rd_kafka_oauthbearer_set_token` and `rd_kafka_oauthbearer_set_token_failure` according to the flow. ref https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2191
Ok, added bindings for set_token, set_token_failure.
Checking for understanding:
- the method referenced by `oauthbearer_token_refresh_callback` should create a token
- if successful, it should call `rd_kafka_oauthbearer_set_token`
- if unsuccessful, it should call `rd_kafka_oauthbearer_set_token_failure`
- the `oauthbearer_token_refresh_callback` will be invoked prior to the token expiring, and the dance begins again
- something needs to call the `oauthbearer_token_refresh_callback` function the first time
You got it right.
> something needs to call the `oauthbearer_token_refresh_callback` function the first time
This is already implemented in waterdrop and karafka, so no changes are needed (aside from a high-level API, but I will do that).
@kdhfred I've added the bindings and am working on testing this feature against AWS MSK. I created some code to generate the tokens, modeled after how Python creates credentials. Haven't had any luck yet.
```ruby
require "aws-sdk-core"
require "aws-sigv4"
require "base64"
require "uri"

# Assumed value, mirroring the Python signer's endpoint template
ENDPOINT_URL_TEMPLATE = "kafka.{}.amazonaws.com"

def generate_token(region: "us-east-1")
  aws_credentials = load_credentials_from_ecs
  construct_auth_token(aws_credentials, region)
end

def load_credentials_from_ecs
  ecs_credentials = Aws::ECSCredentials.new(retries: 3)
  ecs_credentials.credentials
end

def construct_auth_token(aws_credentials, region)
  endpoint_url = ENDPOINT_URL_TEMPLATE.gsub("{}", region)
  query_params = { "Action" => "kafka-cluster:Connect" }
  url = URI::HTTPS.build(host: endpoint_url, path: "/", query: URI.encode_www_form(query_params))

  signer = Aws::Sigv4::Signer.new(
    service: "kafka-cluster",
    region: region, # was hardcoded to 'us-east-1'; use the region argument
    credentials_provider: aws_credentials
  )

  url = signer.presign_url(
    http_method: "GET",
    url: url
  ).to_s

  url_utf_8 = url.encode("UTF-8")
  puts "URL: #{url_utf_8}"
  Base64.urlsafe_encode64(url_utf_8)
end
```
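For later: `oauthbearer_set_token` will also need a `lifetime_ms`, and the pre-signed URL already carries that information. One way to derive it (a sketch of mine, assuming the URL carries the standard `X-Amz-Date` and `X-Amz-Expires` query parameters):

```ruby
require "cgi"
require "uri"

# Derive the token expiry (epoch milliseconds) from a SigV4 pre-signed URL.
# Assumes X-Amz-Date is in ISO 8601 basic format (e.g. 20240101T000000Z)
# and X-Amz-Expires is a lifetime in seconds.
def token_expiry_ms(presigned_url)
  params = CGI.parse(URI(presigned_url).query)
  ts = params.fetch("X-Amz-Date").first           # "YYYYMMDDThhmmssZ"
  signed_at = Time.utc(
    ts[0, 4].to_i, ts[4, 2].to_i, ts[6, 2].to_i,  # year, month, day
    ts[9, 2].to_i, ts[11, 2].to_i, ts[13, 2].to_i # hour, minute, second
  )
  expires_in = Integer(params.fetch("X-Amz-Expires").first)
  (signed_at.to_i + expires_in) * 1000
end
```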
@bruce-szalwinski-he do you need my assistance with this?
> @bruce-szalwinski-he do you need my assistance with this?
I've been trying to get a system test working. I need Ruby to create a valid pre-signed URL and I'm stuck. I've been able to create producers in both Python and Go, and they have no trouble creating pre-signed URLs. I've fired up an EC2 box where I have a Ruby script that creates a pre-signed URL; when I feed it to the Python and Go programs, they both fail with some version of:
```
client/metadata got error from broker -1 while fetching metadata: kafka server: SASL Authentication failed: [0bd12d32-5dd8-4602-83d9-b5214a579ea2]: Invalid authentication payload
```
I've had Go create a pre-signed URL and fed it to Python, and that works fine.
I posted the script and a question over at https://github.com/aws/aws-sdk-ruby/discussions/2985.
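One encoding detail I've been double-checking along the way (an assumption on my side, not the confirmed root cause): Ruby's `Base64.urlsafe_encode64` keeps `=` padding by default, while some consumers expect the unpadded form.

```ruby
require "base64"

# A sample pre-signed URL (shortened); real ones are much longer.
url = "https://kafka.us-east-1.amazonaws.com/?Action=kafka-cluster%3AConnect"

padded   = Base64.urlsafe_encode64(url)                 # default: "=" padding kept
unpadded = Base64.urlsafe_encode64(url, padding: false) # padding stripped

# Both decode to the same URL, but a strict consumer may accept only one form.
puts padded
puts unpadded
```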
Ok, got my pre-signed url working. Now to plug that logic into system test and see if rdkafka-ruby in ecs will publish to kafka.
Ok, got my system tests working and can publish messages to AWS MSK using IAM AUTH!!! I think it's ready for a review @mensfeld
Awesome. I will pick it up early next week :clap:
@bruce-szalwinski-he I did not forget. I just need to wrap my head around whether we should pass client_ptr or use opaque to pass context awareness to the rebalance callback.
@bruce-szalwinski-he I did not forget about it. I'm just still thinking about how to effectively pass the client reference back for multi-connection processes. The conceptual layer is absolutely valid on your side, and I do not have anything to ask you to do. I'm just not merging it yet because I am thinking.
Ok, @bruce-szalwinski-he, I think I got my head around that.
The `rd_kafka_oauthbearer_set_token` and `rd_kafka_oauthbearer_set_token_failure` methods are missing on the producer and consumer (admin also?). IMHO, this could be modularized and will have to be exposed to provide an experience that aligns with current rdkafka behavior (this is why I'm requesting changes in this PR now).
Now for the harder part. We can do two things here, and I will ping @thijsc to help you decide what to do.
Let me describe the issue here and potential solutions:
Currently, all callbacks are global and contextless. The same callback object is assigned for statistics, errors, and potentially OAuth regardless of the instance to which it is associated.
This is not a problem for stats because stats include the name of the Kafka client for referencing, and for errors it is, let's say, "less of an issue": users are usually interested in the error and its origins, but not so much in the underlying native Kafka reference.
That said, this will NOT be the same for this scenario, where you need to take appropriate actions in the context of a given refresh, mainly for the `rd_kafka_oauthbearer_set_token` operation.
There are a few approaches we can take here:
- We expose the raw underlying client instance and expect users to use it via the bindings API. IMHO, this is a bad idea because, ideally, we want to hide things like direct `rd_kafka_oauthbearer_set_token` usage from users behind a Ruby abstraction.
- We can use opaque and build a table of references like we do for configs, fetch it, and reference. This could work, but it is extensive work, IMHO beyond the scope of this task.
- We could redesign this callback API to make it instance-based and basically assign the callback with a reference to the object itself in it. This would, however, require opaque management anyway.
- We can "just" keep it as it is with a small change, to make this API aligned in behaviour with other callbacks until a "total redesign" (recommended by me). The small change would be just providing, in the arguments to the callback, the instance name (producer/consumer/admin), allowing users that need to reference their rdkafka instances to do so that way.
I plan to redesign this API to make all the callbacks instance-based, with global only as an API fallback, because the current shared approach is not sufficient for this and other cases. I will not, however, be able to work on this before Q3 at the earliest, and at the same time I do not want to withhold this from being in use.
Ok, I had previously implemented option 1 in my sample app that was using this branch of the library. That ended up looking something like the code below. `OAuthTokenRefresher#refresh_token` uses the raw instance and the bindings directly. Not the look we are going for.
```ruby
class OAuthTokenRefresher
  def refresh_token(client, config)
    token = OAuthTokenGenerator.new.generate_token
    error_buffer = FFI::MemoryPointer.from_string(" " * 256)
    response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
      client, token.token, token.lifetime_ms, token.principal_name,
      token.extensions, token.extension_size, error_buffer, 256
    )
    if response != 0
      Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(
        client,
        "Failed to set token: #{error_buffer.read_string}"
      )
    end
  end
end
```
```ruby
def refresh_token(client, config)
  OAuthTokenRefresher.new.refresh_token(client, config)
end

module KafkaProducer
  def self.start!(kafka_config)
    Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)
  end
end
```
With `oauthbearer_set_token` being added to the producer, consumer, and admin classes, this can now look like the code below. We hide the bindings as well as the complexities of creating and capturing the errors.
```ruby
class OAuthTokenRefresher
  def refresh_token(client, config)
    token = OAuthTokenGenerator.new.generate_token
    ???.oauthbearer_set_token(
      token: token.token,
      lifetime_ms: token.lifetime_ms,
      principal_name: token.principal_name
    )
  end
end
```
The client that is passed to the refresh_token callback is an FFI::Pointer. I'm not sure how to get from an FFI::Pointer to my consumer/producer/admin instance, but I think that doesn't matter, as you are proposing we don't use it. I think what you are saying in option 4 is that the callback receives an additional argument, let's call it instance_id, and the caller has the burden of managing the mapping of instance ids to client instances (consumer, producer, admin).
The refresh_token could then look something like:
```ruby
def refresh_token(_client, config, instance_id)
  token = OAuthTokenGenerator.new.generate_token
  my_client = instances[instance_id]
  my_client.oauthbearer_set_token(
    token: token.token,
    lifetime_ms: token.lifetime_ms,
    principal_name: token.principal_name
  )
end
```
And in some future version of this library, we remove the burden of having the client manage these mappings.
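For illustration, the caller-side mapping could be as simple as a thread-safe registry (names like `ClientRegistry` are hypothetical, not anything in rdkafka-ruby):

```ruby
require "monitor"

# Hypothetical caller-side registry for option 4: the application registers
# each producer/consumer/admin under its instance id, and the refresh
# callback looks the client back up by the id it receives.
class ClientRegistry
  include MonitorMixin

  def initialize
    super()
    @clients = {}
  end

  def register(instance_id, client)
    synchronize { @clients[instance_id] = client }
  end

  def fetch(instance_id)
    synchronize { @clients.fetch(instance_id) }
  end
end

REGISTRY = ClientRegistry.new

def refresh_token(_client, _config, instance_id)
  my_client = REGISTRY.fetch(instance_id)
  # my_client.oauthbearer_set_token(...) would go here
  my_client
end
```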
I think I'm not understanding this part:
> We can "just" keep it as it is with a small change, to make this API aligned in behaviour with other callbacks until a "total redesign" (recommended by me). The small change would be just providing, in the arguments to the callback, the instance name (producer/consumer/admin), allowing users that need to reference their rdkafka instances to do so that way.
The arguments that are passed to the callbacks are defined by librdkafka, correct? I believe the OAuth refresh callback is governed by https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2241, which is going to pass the client, config, and opaque.
Adding another arg to the callback:

```ruby
callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer, :string], :void

OAuthbearerTokenRefreshCallback = FFI::Function.new(
  :void, [:pointer, :string, :pointer, :string]
) do |_client_ptr, _config, _opaque, client_id|
  puts("OAuthbearerTokenRefreshCallback called")
  puts
  puts("client_id: #{client_id}")
  if Rdkafka::Config.oauthbearer_token_refresh_callback
    Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_id)
  end
end
```
Results in crash reports.
So perhaps you meant something else?
- I do see how the stats hash includes the client id, https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md#top-level.
- I think https://github.com/karafka/rdkafka-ruby/issues/82 talks about the same idea
- I also see the waterdrop stats wrapper, https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/instrumentation/callbacks/statistics.rb
@mensfeld thanks for the chat today. I think this is ready now. I left you a few questions about docs.
I can't repro the failure for 3.1:
```
rbenv local 3.1.4

DEBUG_CONSUMER=true bundle exec rspec
337 examples, 0 failures

DEBUG_PRODUCER=true bundle exec rspec
337 examples, 0 failures
```
@bruce-szalwinski-he this error is due to the lack of a correct shutdown of rdkafka. I will take a look. It's something introduced within this code.