
feat: finalize OAUTHBEARER support

Open cb-freddysart opened this issue 1 year ago • 15 comments

This PR adds support for rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure. See https://github.com/confluentinc/librdkafka/pull/2189/files#diff-eef17694b5807cd63a95c3d86c39752b62f8bdb46e46c59231bf02353a5daef5 for where this functionality was added to librdkafka.
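For reference, the librdkafka documentation for rd_kafka_oauthbearer_set_token() states that SASL extension keys must not use the reserved key "auth", and that keys and values must follow RFC 7628, section 3.1 (key = 1*ALPHA, value = *(VCHAR / SP / HTAB / CR / LF)). Invalid arguments are rejected with RD_KAFKA_RESP_ERR__INVALID_ARG. The snippet below is a minimal sketch of a userland pre-flight check for the extensions array handed to the PHP binding (the example further down passes ['one' => 'two']). validateOauthbearerExtensions() is a hypothetical helper, not part of php-rdkafka.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical pre-flight check for the $extensions array passed to
 * oauthbearerSetToken(). Rules follow RFC 7628, section 3.1, as referenced
 * by the librdkafka docs for rd_kafka_oauthbearer_set_token():
 *   key   = 1*(ALPHA)
 *   value = *(VCHAR / SP / HTAB / CR / LF)
 * and the key "auth" is reserved.
 *
 * @param array<string, mixed> $extensions
 * @return list<string> problems found; empty if the array looks valid
 */
function validateOauthbearerExtensions(array $extensions): array
{
    $problems = [];
    foreach ($extensions as $key => $value) {
        $key = (string) $key; // PHP turns numeric-string keys into ints
        if ($key === 'auth') {
            $problems[] = 'extension key "auth" is reserved';
        } elseif (preg_match('/^[A-Za-z]+\z/', $key) !== 1) {
            // \z instead of $ so a trailing newline cannot slip through
            $problems[] = "extension key \"$key\" must contain only ASCII letters";
        }
        // VCHAR is %x21-7E; SP, HTAB, CR and LF are also allowed
        if (!is_string($value) || preg_match('/^[\x21-\x7E \t\r\n]*\z/', $value) !== 1) {
            $problems[] = "value for extension \"$key\" contains disallowed characters";
        }
    }
    return $problems;
}

// Two problems: the reserved key "auth" and the non-letter key "k1"
print_r(validateOauthbearerExtensions(['auth' => 'x', 'k1' => 'v']));
```

Run against the ['one' => 'two'] extensions from the example, it returns an empty array.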

With these changes, we are now able to successfully authenticate to AWS MSK via IAM. 🎉

Full working example:

<?php

use Aws\Credentials\CredentialProvider;
use Aws\Signature\SignatureV4;
use GuzzleHttp\Psr7\Request;
use Psr\Http\Message\RequestInterface;

require 'vendor/autoload.php';

class AuthToken {
    public function __construct(public string $tokenValue, public int $expiresAt) {}
}

class MSKAuthTokenProvider {

    private const ENDPOINT_URL_TEMPLATE = 'https://kafka.%s.amazonaws.com';
    private const DEFAULT_TOKEN_EXPIRY_SECONDS = 900;
    private const ACTION_TYPE = "Action";
    private const ACTION_NAME = "kafka-cluster:Connect";
    private const SIGNING_NAME = "kafka-cluster";
    private const USER_AGENT_KEY = "User-Agent";
    private const VERSION_KEY = 'Version';

    public function generateAuthToken(): AuthToken
    {
        $endpoint_url = sprintf(self::ENDPOINT_URL_TEMPLATE, getenv('AWS_REGION'))
            . '?' . http_build_query([self::ACTION_TYPE => self::ACTION_NAME]);

        $request = new Request('GET', $endpoint_url);
        $credentials = CredentialProvider::defaultProvider()()->wait();
        $sig_v4_signer = new SignatureV4(self::SIGNING_NAME, getenv('AWS_REGION'));
        $expiresAt = new \DateTime('+' . self::DEFAULT_TOKEN_EXPIRY_SECONDS . ' seconds');

        $presigned = $sig_v4_signer->presign($request, $credentials, $expiresAt);
        $presigned_uri = (string) $presigned->getUri();
        // Append User-Agent and Version **after** signing
        $presigned_uri .= '&' . self::USER_AGENT_KEY . '=' . urlencode('msk-iam-php')
            . '&' . self::VERSION_KEY . '=' . urlencode('2020_10_22'); // Version is always static

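        // Encode as URL-safe base64 ('-' and '_' instead of '+' and '/') and strip the
        // '=' padding, matching the AWS-maintained MSK IAM signer libraries
        $base64_bytes = strtr(base64_encode($presigned_uri), '+/', '-_');
        $base64_encoded_signed_url = rtrim($base64_bytes, '=');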

        return new AuthToken($base64_encoded_signed_url, $this->getExpirationTimeMs($presigned));
    }

    private function getExpirationTimeMs(RequestInterface $request): int
    {
        parse_str($request->getUri()->getQuery(), $ret);
        $signing_time = DateTimeImmutable::createFromFormat('Ymd\THis\Z', $ret['X-Amz-Date'], new DateTimeZone('UTC'))
            ->getTimestamp();
        $lifetime_seconds = $ret['X-Amz-Expires'];
        $expiration_ts_seconds = $signing_time + $lifetime_seconds;

        return $expiration_ts_seconds * 1000;
    }
}

$provider = new MSKAuthTokenProvider();
$token = $provider->generateAuthToken();

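$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'broker list');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'OAUTHBEARER');

// Register the refresh callback *before* creating the producer: the producer
// takes a copy of $conf, so callbacks set on $conf afterwards are not used.
// $producer is captured by reference because it does not exist yet.
$conf->setOauthbearerTokenRefreshCb(function () use ($provider, &$producer) {
    $token = $provider->generateAuthToken();
    $producer->oauthbearerSetToken($token->tokenValue, $token->expiresAt, 'kafka-cluster', ['one' => 'two']);
});

$producer = new RdKafka\Producer($conf);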

// Set initial token
$producer->oauthbearerSetToken($token->tokenValue, $token->expiresAt, 'kafka-cluster', ['one' => 'two']);

$topic = $producer->newTopic("test");
for ($i = 0; $i < 1; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Hello from PHP $i");
    $producer->poll(0);
}

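for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}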

echo "Done\n";
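
The token passed to oauthbearerSetToken() in this example is the presigned URL, base64-encoded with the '=' padding removed. As far as we can tell, the AWS-maintained MSK IAM signers (for example aws-msk-iam-sasl-signer-python) use the URL-safe alphabet from RFC 4648, section 5, in which '+' and '/' become '-' and '_'. Below is a minimal sketch of that encoding and its inverse, which is handy for inspecting a generated token. The helper names and the sample URL are hypothetical.

```php
<?php
declare(strict_types=1);

/** Unpadded base64url (RFC 4648, section 5) of an arbitrary byte string. */
function base64UrlEncodeNoPad(string $data): string
{
    return rtrim(strtr(base64_encode($data), '+/', '-_'), '=');
}

/** Inverse of base64UrlEncodeNoPad(): restore the alphabet and padding, then decode. */
function base64UrlDecodeNoPad(string $encoded): string
{
    $padded = strtr($encoded, '-_', '+/');
    $padded .= str_repeat('=', (4 - strlen($padded) % 4) % 4);
    $decoded = base64_decode($padded, true); // strict: reject invalid characters
    if ($decoded === false) {
        throw new InvalidArgumentException('not valid base64url');
    }
    return $decoded;
}

// Hypothetical, shortened presigned URL for illustration only
$url = 'https://kafka.us-east-1.amazonaws.com/?Action=kafka-cluster%3AConnect&X-Amz-Expires=900';
$token = base64UrlEncodeNoPad($url);
echo $token, "\n";
echo base64UrlDecodeNoPad($token) === $url ? "round trip ok\n" : "mismatch\n";
```

Plain base64_encode() with only rtrim() produces the same string whenever the input happens not to yield '+' or '/', so the difference can show up only for some URLs.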

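The expiry passed as the second argument to oauthbearerSetToken() is an absolute time in milliseconds since the Unix epoch, which getExpirationTimeMs() derives from the signed query parameters: X-Amz-Date (signing time in UTC, in compact form such as 20240110T160000Z) plus X-Amz-Expires (lifetime in seconds). Here is a standalone sketch of the same arithmetic that works on a plain URL string. expirationTimeMsFromPresignedUrl() is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: absolute expiry (ms since epoch) of a SigV4 presigned
 * URL, computed as X-Amz-Date + X-Amz-Expires.
 */
function expirationTimeMsFromPresignedUrl(string $url): int
{
    $query = parse_url($url, PHP_URL_QUERY);
    if (!is_string($query)) {
        throw new InvalidArgumentException('URL has no query string');
    }
    parse_str($query, $params);
    if (!isset($params['X-Amz-Date'], $params['X-Amz-Expires'])) {
        throw new InvalidArgumentException('missing X-Amz-Date or X-Amz-Expires');
    }
    // '\T' and '\Z' are literal characters in the format string
    $signedAt = DateTimeImmutable::createFromFormat(
        '!Ymd\THis\Z',
        (string) $params['X-Amz-Date'],
        new DateTimeZone('UTC')
    );
    if ($signedAt === false) {
        throw new InvalidArgumentException('unparseable X-Amz-Date');
    }
    return ($signedAt->getTimestamp() + (int) $params['X-Amz-Expires']) * 1000;
}

echo expirationTimeMsFromPresignedUrl(
    'https://kafka.us-east-1.amazonaws.com/?Action=kafka-cluster%3AConnect'
    . '&X-Amz-Date=20240110T160000Z&X-Amz-Expires=900'
), "\n"; // 1704903300000
```

For that input, 2024-01-10T16:00:00Z is 1704902400 seconds since the epoch; adding the 900-second lifetime and multiplying by 1000 gives 1704903300000.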
cb-freddysart · Jan 10 '24 16:01

Yes, we need this feature 🔥

By the way, I've already created the aws-msk-iam-sasl-signer-php package for Amazon MSK IAM authentication.

PyaeSoneAungRgn · Jan 15 '24 03:01

Nice!

Do you think it would be feasible to add tests for these methods?

Currently we start a Kafka instance in CI here: https://github.com/arnaud-lb/php-rdkafka/blob/bcd5004f461d1d3a5f879bb21280bdde6f6800c2/.github/workflows/test/start-kafka.sh.

Thank you for the great review, @arnaud-lb. I think I've fixed everything locally, but I want to take the time to get Valgrind set up to be sure (I'm on an M1, which Valgrind doesn't support yet, so I need to break out my Linux machine).

I think I should be able to write some integration tests for this. I hope to have something up for review by the end of the week.

cb-freddysart · Jan 16 '24 15:01

I'm thrilled that this OAuth2 PR has been implemented, because I urgently need the feature for an application. Would it be possible to publish a new release with it in the short term? That would be a big help for me.

stefan-bolsinger-hald · Jan 30 '24 15:01

Hey all, I hope to get this over the line, but my priorities have shifted since I came back from some time off. Configuring the OAuth side of Kafka is new to me, and I haven't had time to figure out exactly how it should work (for example, do we also need to spin up a Keycloak container to generate and validate the tokens?).

I'm open to any help if anyone is more familiar with this area. Otherwise, I'll keep poking at this when I have time.
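One possible way to test without an identity provider, assuming the CI broker is configured for OAUTHBEARER and keeps Apache Kafka's default handlers: those create and validate unsecured JWTs (header "alg": "none", empty signature), which Kafka documents as suitable for non-production use only. Under that assumption, the PHP side could mint its own token and pass it to oauthbearerSetToken(). This is a minimal sketch: makeUnsecuredJwt() is hypothetical, and "sub" is used because it is Kafka's default principal claim name.

```php
<?php
declare(strict_types=1);

// URL-safe base64 without padding, as used by the JWS compact serialization
function base64UrlNoPad(string $data): string
{
    return rtrim(strtr(base64_encode($data), '+/', '-_'), '=');
}

/**
 * Hypothetical helper: build an unsecured JWT for a development/test broker.
 *
 * @return array{0: string, 1: int} [token, expiry in ms since the epoch]
 */
function makeUnsecuredJwt(string $principal, int $lifetimeSeconds, ?int $now = null): array
{
    $now ??= time();
    $expiresAt = $now + $lifetimeSeconds;
    $header = json_encode(['alg' => 'none'], JSON_THROW_ON_ERROR);
    $claims = json_encode(
        ['sub' => $principal, 'iat' => $now, 'exp' => $expiresAt],
        JSON_THROW_ON_ERROR
    );
    // header.claims. with an empty third segment, since nothing is signed
    $token = base64UrlNoPad($header) . '.' . base64UrlNoPad($claims) . '.';

    return [$token, $expiresAt * 1000];
}

[$token, $expiresAtMs] = makeUnsecuredJwt('admin', 3600);
// With php-rdkafka, this pair would then be handed to the client, e.g.:
// $producer->oauthbearerSetToken($token, $expiresAtMs, 'admin', []);
```

Whether the broker accepts such a token depends entirely on its server-side OAUTHBEARER configuration, so this only covers the client half of an integration test.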

cb-freddysart · Feb 09 '24 18:02