kinto
kinto copied to clipboard
How to allow requests with JWT issued by a custom server?
Your best take at it is to create a custom authentication backend. You have lots of example:
I encourage you to copy the tests to make sure you implement everything the right way.
It might looks like something like that:
from functools import partial
import jwt
import logging
from pyramid import authentication as base_auth
from pyramid.interfaces import IAuthenticationPolicy
from zope.interface import implementer
logger = logging.getLogger(__name__)
def get_conf(prefix, request, name):
key = f"{prefix}.{name}"
return request.registry.settings.get(key)
# config : kinto.jwt.secret = ThisIsNotASecret
jwt_conf = partial(get_conf, "jwt")
@implementer(IAuthenticationPolicy)
class JWTCustomAuthenticationPolicy(base_auth.CallbackAuthenticationPolicy):
name = "custom-jwt"
def __init__(self, realm="Realm"):
self.realm = realm
self._cache = None
def unauthenticated_userid(self, request):
"""Return the JWT user_id or ``None`` if token could not be verified.
"""
authorization = request.headers.get("Authorization", "")
try:
authmeth, token = authorization.split(" ", 1)
except ValueError:
return None
if authmeth.lower() != jwt_conf(request, "header_type").lower():
return None
user_id = self._verify_token(token, request)
return user_id
def forget(self, request):
"""A no-op. Credentials are sent on every request.
Return WWW-Authenticate Realm header for Bearer token.
"""
return [
(
"WWW-Authenticate",
'{} realm="{}"'.format(jwt_conf(request, "header_type"), self.realm),
)
]
def _verify_token(self, access_token, request):
"""Verify the token extracted from the Authorization header.
This method stores the result in two locations to avoid
validating the token too often:
- on the request object, in case the Pyramid authentication methods
like `effective_principals()` or `authenticated_userid()` are called
several times during the request cycle;
- in the cache backend, to reuse validated token from one request to
another (during ``cache_ttl_seconds`` seconds.)
"""
try:
payload = jwt.decode(
access_token, jwt_conf(request, "secret"), algorithm="HS256"
)
except (
jwt.ExpiredSignatureError,
jwt.InvalidSignatureError,
jwt.DecodeError,
ValueError,
):
return None
return payload["user_id"]