api-client icon indicating copy to clipboard operation
api-client copied to clipboard

OAuth handler

Open dogmatic69 opened this issue 2 years ago • 1 comments

Is your feature request related to a problem? Please describe. Ability to handle OAuth flows with secret id/key exchange for token, perhaps with refresh.

Describe the solution you'd like a new auth class like HeaderAuthentication/CookieAuthentication but can take id, key, url and work some magic before the request is done to get an actual token (and then use the normal HeaderAuthentication

Describe alternatives you've considered I've made my own adapter, but it's pretty common auth method so a one liner would be nice.

Additional context I wrote this which seems to do the trick, maybe could be adapted a bit. It does nothing in the perform_initial_auth, instead gets the token if needed in the get_headers call so that if the token is expired it can be refreshed first.

An alternative would be to assume one request per instance and then just have some kind of BodyAuthentication that can send id/key in the body and collect the token from the body which is then put into the existing HeaderAuthentication.

from time import time

from apiclient.authentication_methods import BaseAuthenticationMethod


class OAuthAuthentication(BaseAuthenticationMethod):
    """Authentication using secret_id and secret_key."""

    _access_token = None
    _token_expiration = None
    _refresh_token = None
    _refresh_expiration = None

    def __init__(
        self,
        token_url,
        refresh_url,
        body,
        expiry_margin=10,
    ):
        """Initialize OAuthAuthentication."""
        self._token_url = token_url
        self._refresh_url = refresh_url
        self._body = body
        self._expiry_margin = expiry_margin

    def get_headers(self):
        self.refresh_token()
        return {
            'Authorization': f'Bearer {self._access_token}',
        }

    def refresh_token(self):
        if self._token_expiration and self._token_expiration >= int(time()):
            return True

        if self._refresh_expiration and self._refresh_expiration >= int(time()):
            ret = self._client.post(
                self._refresh_url,
                data={
                    'refresh': self._refresh_token,
                },
            )
            self._access_token = ret.get('access')
            self._token_expiration = int(time()) + int(ret.get('access_expires')) - self._expiry_margin
            return True

        ret = self._client.post(self._token_url, data=self._body)
        self._access_token = ret.get('access')
        self._refresh_token = ret.get('refresh')
        self._token_expiration = int(time()) + int(ret.get('access_expires')) - self._expiry_margin
        self._refresh_expiration = int(time()) + int(ret.get('refresh_expires')) - self._expiry_margin

    def perform_initial_auth(self, client):
        self._client = client

dogmatic69 avatar Jan 06 '22 18:01 dogmatic69

so using the client like that does not work as it used the underlying client which has an auth method resulting in recursion.

Tried to clone and unset but resulted in some issues so ended up just making a new instance of client with NoAuthentication to do the request for the token.

dogmatic69 avatar Jan 06 '22 21:01 dogmatic69