querybook icon indicating copy to clipboard operation
querybook copied to clipboard

Logout button is just reauthenticating when using Zitadel OAuth

Open haripriyacv opened this issue 6 months ago • 0 comments

Hi,

I am using Zitadel OAuth for my Querybook instance. Authentication works fine, but when I log out, it simply reauthenticates and lands me on the same page.

Could you give some insights on this issue?

import certifi
import requests


from flask import Markup, request, session as flask_session, redirect
import flask_login
from requests_oauthlib import OAuth2Session
from const.user_roles import UserRoleType

from models.user import  UserRole

from app.db import with_session, DBSession
from env import QuerybookSettings
from lib.logger import get_logger
from logic.user import (
    get_user_by_name,
    create_user,
    create_user_role,    
)
from .utils import (
    AuthenticationError,
    AuthUser,
    abort_unauthorized,
    QuerybookLoginManager,
)

# Module-level logger obtained from the project's logger helper.
LOG = get_logger(__file__)

# URL path that is registered on the Flask app as the OAuth callback route and
# appended to QuerybookSettings.PUBLIC_URL to build the OAuth redirect URI.
OAUTH_CALLBACK_PATH = "/oauth2callback"


class OAuthLoginManager(object):
    """Login manager implementing the OAuth2 authorization-code flow.

    The flow is: ``login`` redirects the browser to the provider's
    authorization URL; the provider redirects back to ``OAUTH_CALLBACK_PATH``
    where ``oauth_callback`` exchanges the code for an access token, fetches
    the user profile, creates the Querybook user if needed and logs it in
    through ``flask_login``.
    """

    def __init__(self):
        self.login_manager = QuerybookLoginManager()
        # Set by init_app(); None until the manager is bound to a Flask app.
        self.flask_app = None

    @property
    def oauth_session(self):
        """Return a new ``OAuth2Session`` built from the current config."""
        oauth_config = self.oauth_config
        return OAuth2Session(
            oauth_config["client_id"],
            scope=oauth_config["scope"],
            redirect_uri=oauth_config["callback_url"],
        )

    @property
    def oauth_config(self):
        """Return the OAuth settings as a dict, read from QuerybookSettings."""
        return {
            "callback_url": "{}{}".format(
                QuerybookSettings.PUBLIC_URL, OAUTH_CALLBACK_PATH
            ),
            "client_id": QuerybookSettings.OAUTH_CLIENT_ID,
            "client_secret": QuerybookSettings.OAUTH_CLIENT_SECRET,
            "authorization_url": QuerybookSettings.OAUTH_AUTHORIZATION_URL,
            "token_url": QuerybookSettings.OAUTH_TOKEN_URL,
            "profile_url": QuerybookSettings.OAUTH_USER_PROFILE,
            "scope": "openid email profile",
        }

    def init_app(self, flask_app):
        """Bind to ``flask_app`` and register the OAuth callback route."""
        self.flask_app = flask_app

        self.login_manager.init_app(self.flask_app)
        self.flask_app.add_url_rule(
            OAUTH_CALLBACK_PATH, "oauth_callback", self.oauth_callback
        )

    def login(self, request):
        """Redirect to the provider's authorization URL.

        The requested path is stored in the Flask session under ``"next"`` so
        that ``oauth_callback`` can send the user back to it afterwards.
        """
        # NOTE(review): the OAuth ``state`` value returned here is discarded
        # and is not checked in oauth_callback — confirm this is intended.
        oauth_url, _ = self._get_authn_url()
        flask_session["next"] = request.path
        return redirect(oauth_url)

    def _get_authn_url(self):
        """Return the ``(url, state)`` pair from ``authorization_url``."""
        return self.oauth_session.authorization_url(
            self.oauth_config["authorization_url"]
        )

    def oauth_callback(self):
        """Handle the provider redirect: authenticate, log in, redirect.

        Returns an HTML error snippet when the provider reports an ``error``
        query argument, otherwise a redirect to the stored ``next`` path (or
        ``PUBLIC_URL`` when none was stored).
        """
        LOG.debug("Handling Oauth callback...")

        if request.args.get("error"):
            return f"<h1>Error: {Markup.escape(request.args.get('error'))}</h1>"

        code = request.args.get("code")
        try:
            access_token = self._fetch_access_token(code)
            username, email, role = self._get_user_profile(access_token)
            with DBSession() as session:
                flask_login.login_user(
                    AuthUser(self.login_user(username, email, role, session=session))
                )
        except AuthenticationError as e:
            # The exception needs a %s placeholder; passing it as a bare extra
            # argument makes the logging module fail to format the record.
            LOG.error("Failed authenticate oauth user: %s", e)
            abort_unauthorized()

        # pop() reads and removes the stored path in one step.
        next_url = flask_session.pop("next", QuerybookSettings.PUBLIC_URL)
        return redirect(next_url)

    def _fetch_access_token(self, code):
        """Exchange the authorization ``code`` for an access token.

        Raises:
            AuthenticationError: if the token endpoint does not answer 200 or
                the response carries no ``access_token``.
        """
        data = {
            "grant_type": "authorization_code",
            "client_id": self.oauth_config["client_id"],
            "client_secret": self.oauth_config["client_secret"],
            "code": code,
            "redirect_uri": self.oauth_config["callback_url"],
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        resp = requests.post(
            self.oauth_config["token_url"],
            data=data,
            headers=headers,
        )

        if resp.status_code != 200:
            LOG.error(
                "Failed to fetch access token, status: %s, response: %s",
                resp.status_code,
                resp.text,
            )
            raise AuthenticationError("Failed to fetch access token.")

        access_token = resp.json().get("access_token")
        if not access_token:
            # Fail here instead of sending "Bearer None" to the profile URL.
            LOG.error("Token response did not contain an access_token")
            raise AuthenticationError("Failed to fetch access token.")
        return access_token

    def _get_user_profile(self, access_token):
        """Fetch the user profile and return ``(username, email, role)``.

        Raises:
            AuthenticationError: if the profile endpoint does not answer 200.
        """
        resp = requests.get(
            self.oauth_config["profile_url"],
            headers={"Authorization": "Bearer {}".format(access_token)},
        )
        # A requests Response is falsy for 4xx/5xx and has no ``status``
        # attribute, so compare ``status_code`` directly and report it.
        if resp.status_code != 200:
            raise AuthenticationError(
                "Failed to fetch user profile, status ({0})".format(
                    resp.status_code
                )
            )
        return self._parse_user_profile(resp)

    def _parse_user_profile(self, profile_response):
        """Extract ``(username, email, role)`` from the profile JSON.

        ``role`` is the ``"role"`` value of the first entry of the ``roles``
        list when that entry is a dict containing it; otherwise ``"USER"``.
        """
        user = profile_response.json()
        # NOTE(review): every profile without "preferred_username" maps to the
        # same "unknown_user" account — confirm this fallback is acceptable.
        username = user.get("preferred_username", "unknown_user")
        email = user.get("email", None)

        roles = user.get("roles", [])
        # Tolerate providers that send roles in another shape (dict, strings).
        first_role = roles[0] if isinstance(roles, (list, tuple)) and roles else None
        if isinstance(first_role, dict) and "role" in first_role:
            role = first_role["role"]
        else:
            role = "USER"
        return username, email, role

    def get_user_by_id(self, uid, session=None):
        """Return all ``UserRole`` rows whose ``uid`` equals ``uid``.

        Despite the name this returns role rows, not a user. ``session`` must
        be provided; it is queried directly.
        """
        return session.query(UserRole).filter(UserRole.uid == uid).all()

    @with_session
    def login_user(self, username, email, role, session=None):
        """Return the user named ``username``, creating it when missing.

        When ``role`` is ``"ADMIN"`` and the user has no role rows yet, an
        ADMIN role is created for it.

        Raises:
            AuthenticationError: if ``username`` is empty or not a string.
        """
        if not username or not isinstance(username, str):
            raise AuthenticationError("Please provide a valid username")

        user = get_user_by_name(username, session=session)
        if not user:
            user = create_user(
                username=username,
                fullname=username,
                email=email,
                session=session,
            )

        # Grant ADMIN from the OAuth profile only when no role rows exist yet.
        existing_roles = self.get_user_by_id(user.id, session=session)
        if role == "ADMIN" and not existing_roles:
            create_user_role(
                uid=user.id,
                role=UserRoleType.ADMIN,
                commit=True,
                session=session,
            )
        return user
    

        # TODO: use the OIDC end_session_endpoint on logout so the provider session is ended too


# Single module-level manager instance used by the functions below.
login_manager = OAuthLoginManager()

# Contains only the OAuth callback path.
# NOTE(review): presumably read by the auth framework as paths exempt from
# the login check — verify against the caller.
ignore_paths = [OAUTH_CALLBACK_PATH]


def init_app(app):
    """Initialize the module-level ``login_manager`` with the Flask ``app``."""
    login_manager.init_app(app)


def login(request):
    """Delegate to ``login_manager.login`` and return its response."""
    return login_manager.login(request)


def oauth_authorization_url():
    """Return the result of ``login_manager._get_authn_url()``.

    Note that this is not a bare URL string: ``OAuthLoginManager.login``
    unpacks the same call into two values, the first being the URL.
    """
    return login_manager._get_authn_url()

`

haripriyacv avatar Jun 27 '25 14:06 haripriyacv