querybook
querybook copied to clipboard
Logout button is just reauthenticating when using Zitadel OAuth
Hi,
I am using Zitadel OAuth for my Querybook instance. Authentication works fine, but when I log out, it simply re-authenticates me and lands me back on the same page.
Could you give some insights on this issue?
import certifi
import requests
from flask import Markup, request, session as flask_session, redirect
import flask_login
from requests_oauthlib import OAuth2Session
from const.user_roles import UserRoleType
from models.user import UserRole
from app.db import with_session, DBSession
from env import QuerybookSettings
from lib.logger import get_logger
from logic.user import (
get_user_by_name,
create_user,
create_user_role,
)
from .utils import (
AuthenticationError,
AuthUser,
abort_unauthorized,
QuerybookLoginManager,
)
# URL path registered on the Flask app for the OAuth provider's redirect back
# to this service; it is also appended to PUBLIC_URL to build the redirect_uri.
OAUTH_CALLBACK_PATH = "/oauth2callback"

# Logger for this module, keyed by the module's file path.
LOG = get_logger(__file__)
class OAuthLoginManager(object):
    """Login backend that signs users in through an OAuth2 authorization-code flow.

    The flow implemented here is:

    1. ``login`` redirects the browser to the provider's authorization URL and
       remembers the originally requested path in the Flask session.
    2. The provider redirects back to ``OAUTH_CALLBACK_PATH``, handled by
       ``oauth_callback``, which exchanges the code for an access token,
       fetches the user profile and logs the user in via ``flask_login``.

    NOTE(review): the ``state`` value returned by ``authorization_url`` is
    discarded and never checked in the callback, so the flow has no CSRF
    protection. Fixing this needs a coordinated change with every caller of
    ``_get_authn_url`` and is therefore only flagged here.
    """

    def __init__(self):
        self.login_manager = QuerybookLoginManager()
        # Set by init_app(); None until the manager is attached to an app.
        self.flask_app = None

    @property
    def oauth_session(self):
        """Build a new ``OAuth2Session`` from the current OAuth configuration."""
        oauth_config = self.oauth_config
        return OAuth2Session(
            oauth_config["client_id"],
            scope=oauth_config["scope"],
            redirect_uri=oauth_config["callback_url"],
        )

    @property
    def oauth_config(self):
        """Return the OAuth settings as a dict.

        Keys: ``callback_url`` (PUBLIC_URL + callback path), ``client_id``,
        ``client_secret``, ``authorization_url``, ``token_url``,
        ``profile_url`` and ``scope`` (fixed to ``"openid email profile"``).
        """
        return {
            "callback_url": "{}{}".format(
                QuerybookSettings.PUBLIC_URL, OAUTH_CALLBACK_PATH
            ),
            "client_id": QuerybookSettings.OAUTH_CLIENT_ID,
            "client_secret": QuerybookSettings.OAUTH_CLIENT_SECRET,
            "authorization_url": QuerybookSettings.OAUTH_AUTHORIZATION_URL,
            "token_url": QuerybookSettings.OAUTH_TOKEN_URL,
            "profile_url": QuerybookSettings.OAUTH_USER_PROFILE,
            "scope": "openid email profile",
        }

    def init_app(self, flask_app):
        """Attach the login manager to ``flask_app`` and register the callback route."""
        self.flask_app = flask_app
        self.login_manager.init_app(self.flask_app)
        self.flask_app.add_url_rule(
            OAUTH_CALLBACK_PATH, "oauth_callback", self.oauth_callback
        )

    def login(self, request):
        """Redirect the browser to the provider's authorization page.

        The requested path is stored under ``"next"`` in the Flask session so
        that ``oauth_callback`` can send the user back to it afterwards.
        """
        oauth_url, _ = self._get_authn_url()
        flask_session["next"] = request.path
        return redirect(oauth_url)

    def _get_authn_url(self):
        """Return the ``(url, state)`` pair produced by ``OAuth2Session.authorization_url``."""
        return self.oauth_session.authorization_url(
            self.oauth_config["authorization_url"]
        )

    def oauth_callback(self):
        """Handle the provider's redirect: exchange the code and log the user in.

        Returns:
            An HTML error snippet when the provider reports an ``error``,
            otherwise a redirect to the stored ``"next"`` path (or PUBLIC_URL
            when none was stored).
        """
        LOG.debug("Handling Oauth callback...")

        if request.args.get("error"):
            return f"<h1>Error: {Markup.escape(request.args.get('error'))}</h1>"

        code = request.args.get("code")
        try:
            if not code:
                # Without a code the token request cannot succeed; fail early
                # instead of posting ``code=None`` to the provider.
                raise AuthenticationError("Missing authorization code.")

            access_token = self._fetch_access_token(code)
            username, email, role = self._get_user_profile(access_token)
            with DBSession() as session:
                flask_login.login_user(
                    AuthUser(self.login_user(username, email, role, session=session))
                )
        except AuthenticationError as e:
            # The exception must go through a %s placeholder; passing it as a
            # bare extra argument makes logging fail to format the record.
            LOG.error("Failed to authenticate oauth user: %s", e)
            abort_unauthorized()

        # Consume the remembered path so it is not reused by a later login.
        next_url = flask_session.pop("next", QuerybookSettings.PUBLIC_URL)
        return redirect(next_url)

    def _fetch_access_token(self, code):
        """Exchange an authorization ``code`` for an access token.

        Raises:
            AuthenticationError: if the token endpoint does not answer with
                HTTP 200 or the response carries no ``access_token``.
        """
        data = {
            "grant_type": "authorization_code",
            "client_id": self.oauth_config["client_id"],
            "client_secret": self.oauth_config["client_secret"],
            "code": code,
            "redirect_uri": self.oauth_config["callback_url"],
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        resp = requests.post(
            self.oauth_config["token_url"],
            data=data,
            headers=headers,
        )

        if resp.status_code != 200:
            LOG.error(
                "Failed to fetch access token, status: %s, response: %s",
                resp.status_code,
                resp.text,
            )
            raise AuthenticationError("Failed to fetch access token.")

        access_token = resp.json().get("access_token")
        if not access_token:
            # Otherwise ``None`` would be sent on as the literal "Bearer None".
            raise AuthenticationError(
                "Token response did not contain an access token."
            )
        return access_token

    def _get_user_profile(self, access_token):
        """Fetch the user profile with ``access_token`` and parse it.

        Returns:
            The ``(username, email, role)`` tuple from ``_parse_user_profile``.

        Raises:
            AuthenticationError: if the profile endpoint does not answer with
                HTTP 200.
        """
        resp = requests.get(
            self.oauth_config["profile_url"],
            headers={"Authorization": "Bearer {}".format(access_token)},
        )
        # Compare against None explicitly: a Response is falsy for any 4xx/5xx
        # status, which previously hid the real status code from the message.
        if resp is None or resp.status_code != 200:
            raise AuthenticationError(
                "Failed to fetch user profile, status ({0})".format(
                    resp.status_code if resp is not None else "None"
                )
            )
        return self._parse_user_profile(resp)

    def _parse_user_profile(self, profile_response):
        """Extract ``(username, email, role)`` from the profile JSON.

        ``role`` is the ``"role"`` value of the first entry in ``roles`` when
        that entry is a dict containing it, and ``"USER"`` otherwise.
        """
        user = profile_response.json()
        # NOTE(review): every profile lacking "preferred_username" maps to the
        # same "unknown_user" account - confirm this fallback is intended.
        username = user.get("preferred_username", "unknown_user")
        email = user.get("email", None)

        roles = user.get("roles", [])
        first_role = roles[0] if isinstance(roles, (list, tuple)) and roles else None
        if isinstance(first_role, dict) and "role" in first_role:
            role = first_role["role"]
        else:
            role = "USER"
        return username, email, role

    @with_session
    def get_user_by_id(self, uid, session=None):
        """Return all ``UserRole`` rows whose ``uid`` equals ``uid``.

        Despite the name, this returns role rows rather than a user object.
        """
        return session.query(UserRole).filter(UserRole.uid == uid).all()

    @with_session
    def login_user(self, username, email, role, session=None):
        """Look up the user by name, creating the user (and admin role) if needed.

        A new user is created when no user with ``username`` exists. An ADMIN
        role is created only when ``role`` is ``"ADMIN"`` and the user has no
        role rows yet.

        Raises:
            AuthenticationError: if ``username`` is empty or not a string.
        """
        if not username or not isinstance(username, str):
            raise AuthenticationError("Please provide a valid username")

        user = get_user_by_name(username, session=session)
        if not user:
            user = create_user(
                username=username,
                fullname=username,
                email=email,
                session=session,
            )

        # Only query existing roles when the profile actually asks for ADMIN.
        if role == "ADMIN" and not self.get_user_by_id(user.id, session=session):
            create_user_role(
                uid=user.id,
                role=UserRoleType.ADMIN,
                commit=True,
                session=session,
            )
        return user
# TODO: use the identity provider's end_session_endpoint on logout so the provider session is ended as well.
# Paths listed for the surrounding auth framework; contains only the OAuth
# callback route. Presumably these are exempt from the login requirement -
# TODO confirm against the code that reads `ignore_paths`.
ignore_paths = [OAUTH_CALLBACK_PATH]

# Single shared manager instance used by the module-level helper functions.
login_manager = OAuthLoginManager()
def init_app(app):
    """Attach the module-level OAuth login manager to the Flask ``app``."""
    manager = login_manager
    manager.init_app(app)
def login(request):
    """Start the OAuth login for ``request`` and return the manager's response."""
    response = login_manager.login(request)
    return response
def oauth_authorization_url():
    """Return the manager's authorization URL result.

    The value is whatever ``login_manager._get_authn_url()`` produces, which
    the manager itself unpacks as a two-item ``(url, state)`` pair.
    """
    authn_result = login_manager._get_authn_url()
    return authn_result
`