Custom access control
Would be nice to have roles and fine grained access control so mods can only view, admins can delete/add specific data etc
I currently do this with middleware but would be nice for some less hacky way:
class CustomHeaderMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
if request.cookies.get("sunbeam-session:warriorcats"):
request.scope["sunbeam_user"] = orjson.loads(b64decode(request.cookies.get("sunbeam-session:warriorcats")))
else:
return RedirectResponse("https://fateslist.xyz/frostpaw/herb?redirect=https://lynx.fateslist.xyz")
check = await app.state.db.fetchval(
"SELECT user_id FROM users WHERE user_id = $1 AND api_token = $2",
int(request.scope["sunbeam_user"]["user"]["id"]),
request.scope["sunbeam_user"]["token"]
)
if not check:
return HTMLResponse("<h1>Login and logout of Fates List to continue</h1>")
_, perm, _ = await is_staff(None, int(request.scope["sunbeam_user"]["user"]["id"]), 2, redis=app.state.redis)
# Only allow moderators to access the admin panel
if perm < 3:
return HTMLResponse("<h1>You do not have permission to access this page</h1>")
# Perm check
if request.url.path.startswith("/api"):
if request.url.path == "/api/tables/" and perm < 5:
return ORJSONResponse(["reviews", "bot_packs", "vanity", "leave_of_absence"])
elif request.url.path == "/api/tables/users/ids/" and request.method == "GET":
pass
elif request.url.path in ("/api/forms/", "/api/user/", "/api/openapi.json") or request.url.path.startswith("/api/docs"):
pass
elif perm < 5:
if request.url.path.startswith("/api/tables/vanity"):
if request.method != "GET":
return ORJSONResponse({"error": "You do not have permission to update vanity"}, status_code=403)
elif request.url.path.startswith("/api/tables/bot_packs"):
if request.method != "GET":
return ORJSONResponse({"error": "You do not have permission to update bot packs"}, status_code=403)
elif request.url.path.startswith("/api/tables/leave_of_absence/") and request.method in ("PATCH", "DELETE"):
ids = request.url.path.split("/")
loa_id = None
for id in ids:
if id.isdigit():
loa_id = int(id)
break
else:
return abort(404)
user_id = await app.state.db.fetchval("SELECT user_id::text FROM leave_of_absence WHERE id = $1", loa_id)
if user_id != request.scope["sunbeam_user"]["user"]["id"]:
return ORJSONResponse({"error": "You do not have permission to update this leave of absence"}, status_code=403)
elif not request.url.path.startswith(("/api/tables/reviews", "/api/tables/bot_packs", "/api/tables/leave_of_absence")):
return ORJSONResponse({"error": "You do not have permission to access this page"}, status_code=403)
key = "rl:%s" % request.scope["sunbeam_user"]["user"]["id"]
check = await app.state.redis.get(key)
if not check:
rl = await app.state.redis.set(key, "0", ex=30)
if request.method != "GET":
rl = await app.state.redis.incr(key)
if int(rl) > 3:
expire = await app.state.redis.ttl(key)
await app.state.db.execute("UPDATE users SET api_token = $1 WHERE user_id = $2", get_token(128), int(request.scope["sunbeam_user"]["user"]["id"]))
return ORJSONResponse({"error": f"You have exceeded the rate limit {expire} is TTL. API_TOKEN_RESET"}, status_code=429)
embed = Embed(
title = "Lynx API Request",
description = f"**This is usually malicious. When in doubt DM**",
color = 0x00ff00,
)
embed.add_field(name="User ID", value=request.scope["sunbeam_user"]["user"]["id"])
embed.add_field(name="Username", value=request.scope["sunbeam_user"]["user"]["username"])
embed.add_field(name="Request", value=f"{request.method} {request.url}")
username = request.scope["sunbeam_user"]["user"]["username"]
password = get_token(96)
try:
await BaseUser.create_user(
username=username,
password=password,
email=username + "@fateslist.xyz",
active=True,
admin=True
)
if request.url.path == "/":
return HTMLResponse(
f"""
<h1>Welcome to Lynx</h1>
<h2>Credentials for login is <code>{username}</code> and <code>{password}</code></h2>
<h3>You will never see this again! Take note somewhere secret, to reset this, go to /reset</h3>
"""
)
except Exception as exc:
print(exc)
response = await call_next(request)
embed.add_field(name="Status Code", value=f"{response.status_code} {HTTPStatus(response.status_code).phrase}")
asyncio.create_task(redis_ipc_new(app.state.redis, "SENDMSG", msg={"content": f"<@&942099547025465426>", "embed": embed.to_dict(), "channel_id": "935168801480261733", "mention_roles": ["942099547025465426"]}))
if not response.status_code < 400:
return response
try:
print(request.user.user.username)
except:
request.scope["user"] = Unknown()
if request.url.path.startswith("/api/tables/leave_of_absence") and request.method == "POST":
response_body = [section async for section in response.body_iterator]
response.body_iterator = iterate_in_threadpool(iter(response_body))
content = response_body[0]
content_dict = orjson.loads(content)
await app.state.db.execute("UPDATE leave_of_absence SET user_id = $1 WHERE id = $2", int(request.scope["sunbeam_user"]["user"]["id"]), content_dict[0]["id"])
return ORJSONResponse(content_dict)
if request.url.path.startswith("/api/tables/bots") and request.method == "PATCH":
print("Got bot edit, sending message")
path = request.url.path.rstrip("/")
bot_id = int(path.split("/")[-1])
print("Got bot id: ", bot_id)
owner = await app.state.db.fetchval("SELECT owner FROM bot_owner WHERE bot_id = $1", bot_id)
embed = Embed(
title = "Bot Edited Via Lynx",
description = f"Bot <@{bot_id}> has been edited via Lynx by user {request.user.user.username}",
color = 0x00ff00,
url=f"https://fateslist.xyz/bot/{bot_id}"
)
await redis_ipc_new(app.state.redis, "SENDMSG", msg={"content": f"<@{owner}>", "embed": embed.to_dict(), "channel_id": str(bot_logs)})
return response
I agree this would be nice.
It would be a big piece of work though, so needs some thought about how to tackle it in pieces.
+1 for roles and restrictions, @dantownsend do you have any new thoughts on this?
@shattl We might be able to do something.
Piccolo Admin is built on top of a class called PiccoloCRUD, which converts a Piccolo table into a REST API.
We have validators for PiccoloCRUD.
Currently we just use these in Piccolo Admin to restrict what non-superusers can do (i.e. they're not allowed to modify other users, or sessions).
We don't currently allow users to specify their own validators for use with Piccolo Admin, but we potentially could. Something like this:
async def manager_only(piccolo_crud, request):
user = request.user.user
manager = await Manager.exists().where(manager.user = user)
if not manager:
raise HTTPException("Only managers are allowed to do this")
admin = create_admin(tables=TableConfig(MyTable, validators=Validators(post_single=manager_only)))
We could build some kind of UI, but that might be quite hard, and not as flexible.
@sinisaos What do you think of this approach?
@dantownsend As you wrote, we already have some access control. Superuser can do everything, admin can do crud operations on all tables except User and Session table. If that's not enough and we need a little more control for certain endpoints, your approach seems good to me, to add validators to the TableConfig class and allow the user to specify which roles and endpoints this role has access to. But first we have to somehow restrict the access of the admin user, because now the admin user can do crud operations on all tables except User and Sessions.
In version 0.35.0, we added a validators argument to TableConfig, so fine grained access control is possible:
https://piccolo-admin.readthedocs.io/en/latest/table_config/index.html#validators
We might add some kind of UI for it in the future, but this approach is quite flexible, and should accommodate most use cases.