kinto icon indicating copy to clipboard operation
kinto copied to clipboard

How to allow requests with JWT issued by a custom server?

Open chaitanya9186 opened this issue 4 years ago • 1 comments

chaitanya9186 avatar Jan 19 '20 03:01 chaitanya9186

Your best bet is to create a custom authentication backend. There are lots of existing examples to draw on:

I encourage you to copy the tests to make sure you implement everything the right way.

It might look something like this:

from functools import partial
import jwt
import logging

from pyramid import authentication as base_auth
from pyramid.interfaces import IAuthenticationPolicy

from zope.interface import implementer

# Module-level logger, named after this module's ``__name__``.
logger = logging.getLogger(__name__)



def get_conf(prefix, request, name):
    """Fetch the ``<prefix>.<name>`` entry from the request's registry settings.

    The lookup goes through ``settings.get``, so an absent key yields
    ``None`` rather than raising.
    """
    settings = request.registry.settings
    dotted_key = "{}.{}".format(prefix, name)
    return settings.get(dotted_key)


# Shortcut bound to the "jwt" prefix: ``jwt_conf(request, "secret")`` reads
# the ``jwt.secret`` setting.
# Example config: kinto.jwt.secret = ThisIsNotASecret
jwt_conf = partial(get_conf, "jwt")


@implementer(IAuthenticationPolicy)
class JWTCustomAuthenticationPolicy(base_auth.CallbackAuthenticationPolicy):
    """Pyramid authentication policy that reads the user ID from an HS256 JWT.

    The token is taken from the ``Authorization`` request header and verified
    with the shared secret returned by ``jwt_conf(request, "secret")``.
    The expected authorization scheme comes from
    ``jwt_conf(request, "header_type")`` and falls back to ``Bearer`` when
    that setting is absent.
    """

    name = "custom-jwt"

    # Authorization scheme used when the ``header_type`` setting is not set.
    _default_header_type = "Bearer"

    def __init__(self, realm="Realm"):
        """Store the realm advertised in the ``WWW-Authenticate`` header."""
        self.realm = realm
        # Kept for compatibility; nothing in this class reads it and no
        # caching is performed.
        self._cache = None

    def _header_type(self, request):
        """Return the configured authorization scheme, or the default one."""
        return jwt_conf(request, "header_type") or self._default_header_type

    def unauthenticated_userid(self, request):
        """Return the JWT user_id or ``None`` if token could not be verified.

        ``None`` is also returned when the ``Authorization`` header is
        missing, is not of the form ``<scheme> <token>``, or uses a scheme
        other than the configured one (compared case-insensitively).
        """
        authorization = request.headers.get("Authorization", "")
        try:
            authmeth, token = authorization.split(" ", 1)
        except ValueError:
            # No space in the header value: there is no "<scheme> <token>" pair.
            return None
        if authmeth.lower() != self._header_type(request).lower():
            return None

        return self._verify_token(token, request)

    def forget(self, request):
        """A no-op. Credentials are sent on every request.
        Return WWW-Authenticate Realm header for Bearer token.
        """
        return [
            (
                "WWW-Authenticate",
                f'{self._header_type(request)} realm="{self.realm}"',
            )
        ]

    def _verify_token(self, access_token, request):
        """Verify the token extracted from the Authorization header.

        The token is decoded on every call; no result is cached.

        :param access_token: the encoded JWT taken from the header.
        :param request: the current request, used to read the settings.
        :returns: the ``user_id`` claim of the token, or ``None`` when the
            secret is not configured, the token fails validation, or the
            claim is absent.
        """
        secret = jwt_conf(request, "secret")
        if not secret:
            # Fail closed instead of letting the JWT library choke on a
            # missing key (which would surface as a server error).
            logger.error("JWT secret is not configured; rejecting token.")
            return None

        try:
            # ``algorithms`` (plural, a list) is the keyword PyJWT uses to pin
            # the accepted signing algorithms.
            payload = jwt.decode(access_token, secret, algorithms=["HS256"])
        except (jwt.InvalidTokenError, ValueError):
            # InvalidTokenError is the base class of the expired, bad
            # signature and decode errors, plus the other validation failures.
            return None

        # A validly signed token without the claim is treated as anonymous.
        return payload.get("user_id")

Natim avatar Jan 19 '20 13:01 Natim