php-rdkafka
feat: finalize OAUTHBEARER support
This PR adds support for rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure. See https://github.com/confluentinc/librdkafka/pull/2189/files#diff-eef17694b5807cd63a95c3d86c39752b62f8bdb46e46c59231bf02353a5daef5 for where this functionality was added to librdkafka.
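As a rough sketch of how the two calls might pair up inside a token refresh callback: report the new token on success, and report the error on failure so the client is not left waiting. The helper name `refreshOauthbearerToken`, the shape of the token array, and the PHP method name `oauthbearerSetTokenFailure` (assumed to mirror `rd_kafka_oauthbearer_set_token_failure`) are assumptions for illustration, not confirmed API.

```php
<?php
// Hypothetical helper. $client is any object exposing oauthbearerSetToken() and
// oauthbearerSetTokenFailure() (the latter name is an assumption).
// $fetchToken returns ['value' => string, 'expiresAtMs' => int, 'principal' => string].
function refreshOauthbearerToken(object $client, callable $fetchToken): bool
{
    try {
        $token = $fetchToken();
        $client->oauthbearerSetToken(
            $token['value'],
            $token['expiresAtMs'],
            $token['principal'],
            [] // no SASL extensions in this sketch
        );
        return true;
    } catch (\Throwable $e) {
        // Tell the client that the refresh failed instead of failing silently.
        $client->oauthbearerSetTokenFailure($e->getMessage());
        return false;
    }
}
```

Keeping the logic in a plain function like this also makes it possible to exercise both paths with a stand-in client object, without a broker.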
With these changes, we are now able to successfully authenticate to AWS MSK via IAM. 🎉
Full working example:
<?php

use Aws\Credentials\CredentialProvider;
use Aws\Signature\SignatureV4;
use GuzzleHttp\Psr7\Request;
use Psr\Http\Message\RequestInterface;

require 'vendor/autoload.php';

class AuthToken
{
    public function __construct(public string $tokenValue, public int $expiresAt) {}
}

class MSKAuthTokenProvider
{
    private const ENDPOINT_URL_TEMPLATE = 'https://kafka.%s.amazonaws.com';
    private const DEFAULT_TOKEN_EXPIRY_SECONDS = 900;
    private const ACTION_TYPE = 'Action';
    private const ACTION_NAME = 'kafka-cluster:Connect';
    private const SIGNING_NAME = 'kafka-cluster';
    private const USER_AGENT_KEY = 'User-Agent';
    private const VERSION_KEY = 'Version';

    public function generateAuthToken(): AuthToken
    {
        $endpoint_url = sprintf(self::ENDPOINT_URL_TEMPLATE, getenv('AWS_REGION'))
            . '?' . http_build_query([self::ACTION_TYPE => self::ACTION_NAME]);
        $request = new Request('GET', $endpoint_url);
        $credentials = CredentialProvider::defaultProvider()()->wait();
        $sig_v4_signer = new SignatureV4(self::SIGNING_NAME, getenv('AWS_REGION'));
        $expiresAt = new \DateTime('+' . self::DEFAULT_TOKEN_EXPIRY_SECONDS . ' seconds');
        $presigned = $sig_v4_signer->presign($request, $credentials, $expiresAt);
        $presigned_uri = (string) $presigned->getUri();

        // Append User-Agent and Version **after** signing
        $presigned_uri .= '&' . self::USER_AGENT_KEY . '=' . urlencode('msk-iam-php')
            . '&' . self::VERSION_KEY . '=' . urlencode('2020_10_22'); // Version is always static

        // URL-safe base64 without padding, as the other AWS MSK IAM signers produce
        $base64_encoded_signed_url = rtrim(strtr(base64_encode($presigned_uri), '+/', '-_'), '=');

        return new AuthToken($base64_encoded_signed_url, $this->getExpirationTimeMs($presigned));
    }

    private function getExpirationTimeMs(RequestInterface $request): int
    {
        parse_str($request->getUri()->getQuery(), $ret);
        $signing_time = DateTimeImmutable::createFromFormat('Ymd\THis\Z', $ret['X-Amz-Date'], new DateTimeZone('UTC'))
            ->getTimestamp();
        $lifetime_seconds = (int) $ret['X-Amz-Expires'];
        $expiration_ts_seconds = $signing_time + $lifetime_seconds;

        return $expiration_ts_seconds * 1000;
    }
}

$provider = new MSKAuthTokenProvider();

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'broker list');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'OAUTHBEARER');

// Register the refresh callback before creating the producer: the producer works from
// a copy of the configuration taken at construction time. $producer is captured by
// reference because it does not exist yet at this point.
$producer = null;
$conf->setOauthbearerTokenRefreshCb(function () use ($provider, &$producer) {
    $token = $provider->generateAuthToken();
    $producer->oauthbearerSetToken($token->tokenValue, $token->expiresAt, 'kafka-cluster', ['one' => 'two']);
});

$producer = new RdKafka\Producer($conf);

// Set initial token
$token = $provider->generateAuthToken();
$producer->oauthbearerSetToken($token->tokenValue, $token->expiresAt, 'kafka-cluster', ['one' => 'two']);

$topic = $producer->newTopic('test');
for ($i = 0; $i < 1; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Hello from PHP $i");
    $producer->poll(0);
}

// Retry the flush a few times so queued messages are not lost on exit
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

echo "Done\n";
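When authentication fails, it can help to decode a generated token back into the presigned URL and inspect its query parameters (for example `X-Amz-Date` and `X-Amz-Expires`, which the example uses to compute the expiry). The following is a minimal debugging sketch; `decodeMskToken` is a hypothetical helper, not part of php-rdkafka or the AWS SDK, and it accepts both the standard and the URL-safe base64 alphabet with padding stripped.

```php
<?php
// Hypothetical debugging helper: turns an MSK IAM token back into its presigned URL.
function decodeMskToken(string $token): array
{
    // Map the URL-safe alphabet back to the standard one (a no-op for standard input).
    $b64 = strtr($token, '-_', '+/');
    // Restore the padding that was stripped when the token was built.
    $b64 .= str_repeat('=', (4 - strlen($b64) % 4) % 4);

    $url = base64_decode($b64, true);
    if ($url === false) {
        throw new InvalidArgumentException('Token is not valid base64');
    }

    // Split out the query string so individual signing parameters can be checked.
    parse_str((string) parse_url($url, PHP_URL_QUERY), $query);

    return ['url' => $url, 'query' => $query];
}
```

For a token produced by `generateAuthToken()`, `decodeMskToken($token->tokenValue)['query']` should contain the `Action`, `User-Agent`, and `Version` parameters alongside the signature fields.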
yes, we need this feature 🔥
btw, I already created the aws-msk-iam-sasl-signer-php package for Amazon MSK IAM authentication
Nice!
Do you think it would be feasible to add tests for these methods?
Currently we start a Kafka instance in CI here: https://github.com/arnaud-lb/php-rdkafka/blob/bcd5004f461d1d3a5f879bb21280bdde6f6800c2/.github/workflows/test/start-kafka.sh.
Thank you for the great review, @arnaud-lb. I think I’ve got this all fixed locally, but I want to take the time to get valgrind set up to be sure (I’m on an M1, which is not supported yet, so I need to break out my Linux machine).
I think I should be able to write some integration tests for this. Hopefully I'll have something up for review by the end of the week.
I'm thrilled that the PR for oauth2 has already been implemented because I urgently need this feature for an application. Is it possible to provide a new release with this feature in the short term? That would be a big help for me.
Hey all, I hope to get this over the line, but I've had a shift in priorities after coming back from some time off. Configuring the OAuth side of things on Kafka is new to me and I haven't had the time to figure out exactly how that should work (for example, do we also need to spin up a Keycloak container for generating and validating the tokens?).
I'm open to any help if anyone else is more familiar in this area. Otherwise, I'll keep poking at this when I have time.
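One possible way to avoid a separate identity provider in CI: Kafka's default SASL/OAUTHBEARER implementation creates and validates unsecured JSON Web Tokens (intended for non-production use), so a test could build such a token by hand. The sketch below assumes the CI broker can be configured with an OAUTHBEARER listener using that default validator; the function names and claim values are hypothetical.

```php
<?php
// Hypothetical test helpers; names and claim values are illustrative only.
function base64UrlEncode(string $data): string
{
    // URL-safe alphabet with the trailing padding removed, as JWS segments require.
    return rtrim(strtr(base64_encode($data), '+/', '-_'), '=');
}

function generateUnsecuredJws(string $principal, int $lifetimeSeconds, ?int $now = null): array
{
    $now = $now ?? time();
    $expiry = $now + $lifetimeSeconds;

    // "alg": "none" marks the token as unsecured (no signature).
    $header = base64UrlEncode(json_encode(['alg' => 'none']));
    $claims = base64UrlEncode(json_encode([
        'sub' => $principal,
        'iat' => $now,
        'exp' => $expiry,
    ]));

    return [
        // Compact form is header.payload. with an empty signature segment.
        'value' => $header . '.' . $claims . '.',
        'expiresAtMs' => $expiry * 1000,
        'principal' => $principal,
    ];
}
```

The returned array lines up with the arguments the example passes to `oauthbearerSetToken` (token value, expiry in milliseconds, principal), so a test could feed it straight into a refresh callback.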