fastapi-jwt icon indicating copy to clipboard operation
fastapi-jwt copied to clipboard

`credentials: JwtAuthorizationCredentials = Security(access_security)` also accepts refresh tokens (tokens issued via `refresh_security`)

Open Leewinner1 opened this issue 8 months ago • 0 comments

def get_current_user(
    credentials: JwtAuthorizationCredentials = Security(access_security),
):
    """Return the subject carried by the resolved access-token credentials.

    Raises:
        HTTPException: 401 with detail 'error' when ``credentials`` is falsy.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail='error')

    return credentials.subject

jwt.md

# Helpers for generating JWTs, validating JWTs, and getting the current user (python-jose based)

# Generate tokens
# def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
import os
from datetime import datetime, timedelta

from fastapi_jwt import JwtRefreshBearer, JwtAuthorizationCredentials, JwtAccessBearer

from fastapi import Security, HTTPException
from dotenv import load_dotenv

# Load settings via python-dotenv before the os.getenv() reads below
# (presumably from a local .env file, if one exists).
load_dotenv()
# Signing key shared by the access and refresh bearer schemes below.
# NOTE(review): the fallback is a hard-coded, guessable secret; make sure
# SECRET_KEY is set in the environment for any real deployment.
secret_key = os.environ.get("SECRET_KEY", 'leees')

# Token lifetimes; both values are passed to timedelta(days=...) below.
access_token_expires = int(os.environ.get("JWT_EXPIRE_TIME", 7))
refresh_token_expires = int(os.environ.get("JWT_REFRESH_TIME", 30))


# Bearer scheme used to issue access tokens and to validate them in
# dependencies such as get_current_user.
access_security = JwtAccessBearer(
    secret_key=secret_key,
    # Access-token lifetime, in days, taken from JWT_EXPIRE_TIME.
    access_expires_delta=timedelta(days=access_token_expires),
    auto_error=True,
)


# Bearer scheme for refresh tokens (read from the bearer header only).
refresh_security = JwtRefreshBearer(
    secret_key=secret_key,
    # Refresh-token lifetime, in days, taken from JWT_REFRESH_TIME.
    refresh_expires_delta=timedelta(days=refresh_token_expires),
    # With auto_error enabled, HTTPException: HTTP_401_UNAUTHORIZED
    # is raised automatically.
    auto_error=True,
)


def create_token(data: dict):
    """Issue an access token whose subject is ``data``.

    Delegates to ``access_security.create_access_token``; no per-call expiry
    is passed.
    """
    token = access_security.create_access_token(subject=data)
    return token


def create_refresh_token(data: dict):
    """Issue a refresh token whose subject is ``data``.

    Delegates to ``refresh_security.create_refresh_token``; no per-call expiry
    is passed.
    """
    token = refresh_security.create_refresh_token(subject=data)
    return token

# Helper that creates and returns both an access_token and a refresh_token


def create_tokens_refresh(data: dict):
    """Issue an access/refresh token pair for the same subject.

    Returns:
        A dict with the keys ``"access_token"`` and ``"refresh_token"``.
    """
    # The access token is created first, then the refresh token.
    return {
        "access_token": access_security.create_access_token(subject=data),
        "refresh_token": refresh_security.create_refresh_token(subject=data),
    }

# Refresh the tokens


def refresh(
        credentials: JwtAuthorizationCredentials = Security(refresh_security)
):
    """Exchange refresh-token credentials for a new access/refresh token pair.

    Args:
        credentials: Credentials resolved by ``refresh_security``; their
            ``subject`` is copied into both new tokens.

    Returns:
        A dict with the keys ``"access_token"`` and ``"refresh_token"``.
    """
    subject = credentials.subject

    # No per-call expires_delta is passed: the rotated refresh token should
    # use the lifetime configured on refresh_security (JWT_REFRESH_TIME), the
    # same as create_tokens_refresh. A hard-coded timedelta(days=2) here
    # previously overrode that setting.
    return {
        "access_token": access_security.create_access_token(subject=subject),
        "refresh_token": refresh_security.create_refresh_token(subject=subject),
    }


def get_current_user(
        credentials: JwtAuthorizationCredentials = Security(access_security)
):
    """Return the subject stored in the caller's access-token credentials.

    Args:
        credentials: Credentials resolved by ``access_security``.

    Returns:
        ``credentials.subject``.

    Raises:
        HTTPException: 401 with detail 'error' when ``credentials`` is falsy.
    """
    # access_security is built with auto_error=True, so this manual check is
    # presumably only a defensive guard.
    # NOTE(review): the report at the top of this page says refresh tokens are
    # accepted here as well - confirm whether the token type needs checking.
    if credentials:
        # Credentials object is available; hand back its subject.
        return credentials.subject

    raise HTTPException(status_code=401, detail='error')

Leewinner1 avatar Oct 31 '23 16:10 Leewinner1