fastapi-jwt
fastapi-jwt copied to clipboard
credentials: JwtAuthorizationCredentials = Security(access_security) ): It also allows refresh_ Security access
def get_current_user( credentials: JwtAuthorizationCredentials = Security(access_security) )
if not credentials:
raise HTTPException(status_code=401, detail='error')
return credentials.subject
# Use python-jose to generate JWTs, verify JWTs, and get the current user
# Generate token
# def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
import os
from datetime import datetime, timedelta
from fastapi_jwt import JwtRefreshBearer, JwtAuthorizationCredentials, JwtAccessBearer
from fastapi import Security, HTTPException
from dotenv import load_dotenv
# Module-level JWT configuration: load settings from the environment and build
# the two bearer objects used by the helpers in this file.
# Populate os.environ from a .env file (python-dotenv) before the reads below.
load_dotenv()
# Signing secret shared by the access and refresh bearers.
# NOTE(review): 'leees' is a hard-coded, guessable fallback; when SECRET_KEY is
# unset, tokens are signed with it. Confirm SECRET_KEY is always set in deployment.
secret_key = os.getenv("SECRET_KEY", 'leees')
# Token lifetimes, used below as timedelta(days=...). Defaults: 7 days for
# access tokens, 30 days for refresh tokens. int() raises ValueError if the
# environment value is not an integer string.
access_token_expires = int(os.getenv("JWT_EXPIRE_TIME", 7))
refresh_token_expires = int(os.getenv("JWT_REFRESH_TIME", 30))
# Bearer used both to issue access tokens and as the Security dependency of
# get_current_user.
access_security = JwtAccessBearer(
secret_key=secret_key,
auto_error=True,
# change access token validation timedelta
access_expires_delta=timedelta(days=access_token_expires)
)
# Read refresh token from bearer header only
refresh_security = JwtRefreshBearer(
secret_key=secret_key,
auto_error=True, # automatically raise HTTPException: HTTP_401_UNAUTHORIZED
refresh_expires_delta=timedelta(days=refresh_token_expires)
)
def create_token(data: dict):
    """Issue an access token whose subject is *data*.

    Args:
        data: Payload stored as the token subject.

    Returns:
        The value produced by ``access_security.create_access_token``.
    """
    token = access_security.create_access_token(subject=data)
    return token
def create_refresh_token(data: dict):
    """Issue a refresh token whose subject is *data*.

    Args:
        data: Payload stored as the token subject.

    Returns:
        The value produced by ``refresh_security.create_refresh_token``.
    """
    token = refresh_security.create_refresh_token(subject=data)
    return token
# Create and return both an access token and a refresh token
def create_tokens_refresh(data: dict):
    """Issue an access token and a refresh token for the same subject.

    Args:
        data: Payload stored as the subject of both tokens.

    Returns:
        A dict with ``"access_token"`` and ``"refresh_token"`` keys.
    """
    # Dict values are evaluated in order: access token first, then refresh token.
    return {
        "access_token": access_security.create_access_token(subject=data),
        "refresh_token": refresh_security.create_refresh_token(subject=data),
    }
# Refresh tokens
def refresh(
    credentials: JwtAuthorizationCredentials = Security(refresh_security)
):
    """Exchange a valid refresh token for a new access/refresh token pair.

    Args:
        credentials: Credentials resolved from the request by
            ``refresh_security``.

    Returns:
        A dict with ``"access_token"`` and ``"refresh_token"`` keys, both
        issued for ``credentials.subject``.
    """
    subject = credentials.subject
    access_token = access_security.create_access_token(subject=subject)
    # No explicit expires_delta here: the new refresh token gets the lifetime
    # configured on refresh_security (JWT_REFRESH_TIME), matching
    # create_refresh_token / create_tokens_refresh. Previously a hard-coded
    # two-day lifetime silently shortened the session after the first refresh.
    refresh_token = refresh_security.create_refresh_token(subject=subject)
    return {"access_token": access_token, "refresh_token": refresh_token}
def get_current_user(
    credentials: JwtAuthorizationCredentials = Security(access_security)
):
    """Return the subject carried by the caller's access token.

    Args:
        credentials: Credentials resolved from the request by
            ``access_security``.

    Returns:
        ``credentials.subject``.

    Raises:
        HTTPException: Status 401 when ``credentials`` is falsy.
    """
    # access_security is built with auto_error=True in this file, so this guard
    # is a defensive fallback for the case where no credentials are supplied.
    if credentials:
        return credentials.subject
    raise HTTPException(status_code=401, detail='error')