sdk-generator
A way to access the asyncio loop, or a way to set it early
Checklist
- [X] I have looked into the README and have not found a suitable solution or answer.
- [X] I have looked into the documentation and have not found a suitable solution or answer.
- [X] I have searched the issues and have not found a suitable solution or answer.
- [X] I have searched the Slack Community and have not found a suitable solution or answer. (I do not have a slack account, nor do I feel that I have to create one just to delete it afterwards)
- [X] I agree to the terms within the OpenFGA Code of Conduct.
Description
In a wrapper class, in short I have:
self.configuration = ClientConfiguration(
    api_url=f"{config.openfga.protocol}://{config.openfga.host}:{config.openfga.port}"
)
self.session = OpenFgaClient(self.configuration)
When the wrapper's .read() is called from a different coroutine, for example, this happens quite frequently:
File "/app/openfga_wrapper/__init__.py", line 82, in read
return await self.session.read(body, self.options)
...
RuntimeError: Timeout context manager should be used inside a task
And attempting to create a task specifically yields the same result:
return await asyncio.create_task(self.session.read(body, self.options))
It still complains that the Timeout context is not executed from within a task.
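One possible mechanism, offered as an assumption rather than a confirmed diagnosis: the message appears to come from the timeout helper in aiohttp (which the Python SDK seems to use for HTTP), and that helper looks up the current task for the loop the session was bound to. If the call runs on a different loop, the lookup finds nothing even though the caller is inside a task. The sketch below uses only the standard library to show that lookup behaviour; `task_seen_from` is a hypothetical helper name.

```python
import asyncio


async def task_seen_from(loop):
    # Ask asyncio which task is current *for the given loop*,
    # not for whichever loop happens to be running this coroutine.
    return asyncio.current_task(loop=loop)


async def main():
    running = asyncio.get_running_loop()
    other = asyncio.new_event_loop()  # created, but never run
    try:
        same = await task_seen_from(running)
        different = await task_seen_from(other)
    finally:
        other.close()
    # We are inside a task on `running`, yet `other` reports no task.
    return same is not None, different is None


result = asyncio.run(main())
print(result)  # (True, True)
```

If this is what happens inside the SDK, it would explain why wrapping the call in asyncio.create_task() does not help: the new task still belongs to the calling loop, not to the loop the session holds.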
I'm not entirely comfortable with asyncio in general, but from what I can tell this is mainly because OpenFGA tries to find and use the currently running event loop, instead of consistently using one defined loop.
And attempting to create a secondary long-lasting asyncio loop in a separate thread produces:
attached to a different loop
Obviously, if you have the option to wrap everything in one asyncio.run() call, that's the ideal solution. However, when using certain libraries or threads this becomes troublesome.
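When a second long-lived loop in a thread is unavoidable, the usual asyncio pattern is to schedule every coroutine onto that one loop with asyncio.run_coroutine_threadsafe() and await the result from the caller's loop through asyncio.wrap_future(). The following is a minimal sketch of that pattern, not a verified fix for the SDK: `LoopThread` and `fake_read` are hypothetical names, and `fake_read` stands in for `self.session.read(body, self.options)`. With a real client, the client itself would presumably also have to be created from a coroutine running on the worker loop.

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Hypothetical helper: owns one event loop for the life of the thread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    async def call(self, coro):
        # Schedule the coroutine on the worker loop, then await its
        # result from whichever loop the caller is running on.
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return await asyncio.wrap_future(future)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
        self.loop.close()


async def fake_read(body):
    # Stand-in for the SDK call; reports which loop executed it.
    await asyncio.sleep(0)
    return body, asyncio.get_running_loop()


async def main(worker):
    body, used_loop = await worker.call(fake_read("document:1"))
    on_worker = used_loop is worker.loop
    not_on_caller = used_loop is not asyncio.get_running_loop()
    return body, on_worker, not_on_caller


worker = LoopThread()
worker.start()
try:
    outcome = asyncio.run(main(worker))
finally:
    worker.stop()
print(outcome)  # ('document:1', True, True)
```

The design point is that no coroutine belonging to the worker loop is ever awaited directly on another loop; only the thread-safe future crosses the boundary.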
Expectation
A way to set a target event loop in either ClientConfiguration or OpenFgaClient.
Reproduction
Below is sample code for an API that reproduces the issue.
Note that restructuring it around a single async def _main() and avoiding separate threads would fix this particular code, but it is intended to show the threaded issue.
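For comparison, the single-loop shape mentioned in the note can be sketched as follows. This is an illustration under stated assumptions, not the SDK's behaviour: `FakeClient` is a hypothetical stand-in for OpenFgaClient that simply refuses calls from any loop other than the one it was created on, and `handler` plays the role of a request handler.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a loop-bound client."""

    def __init__(self):
        # Remember the loop that was running at construction time.
        self.loop = asyncio.get_running_loop()

    async def read(self, body):
        # Mimic a client that only works on the loop it was created on.
        if asyncio.get_running_loop() is not self.loop:
            raise RuntimeError("attached to a different loop")
        await asyncio.sleep(0)
        return f"tuples for {body}"

    async def close(self):
        await asyncio.sleep(0)


async def handler(client, body):
    # Plays the role of a request handler sharing the one client.
    return await client.read(body)


async def _main():
    # The client is created, used and closed inside the same loop.
    client = FakeClient()
    try:
        return await asyncio.gather(
            handler(client, "document:1"),
            handler(client, "document:2"),
        )
    finally:
        await client.close()


results = asyncio.run(_main())
print(results)  # ['tuples for document:1', 'tuples for document:2']
```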
Run curl http://127.0.0.1:8888
against:
import asyncio
import base64
import json
import random
import string
import time
import threading
import logging

from fastapi import FastAPI
from hypercorn.config import Config
from hypercorn.asyncio import serve
from openfga_sdk.exceptions import ApiException
from openfga_sdk import ClientConfiguration, OpenFgaClient, ReadRequestTupleKey

class OpenFGAWrapper(threading.Thread):
    def __init__(self):
        self.configuration = ClientConfiguration(
            api_url=f"http://openfga:8080"
        )
        self.session = None
        self.options = None
        self.store_id = None
        self.loop = None

        threading.Thread.__init__(self)
        self.start()

    def run(self):
        asyncio.run(self._reconnect())

    def close(self):
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = asyncio.new_event_loop()

        return self.loop.run_until_complete(openfga.session.close())

    async def _reconnect(self):
        if self.session is None:
            try:
                self.loop = asyncio.get_running_loop()
            except RuntimeError:
                self.loop = asyncio.new_event_loop()

            self.session = OpenFgaClient(self.configuration)

            pageination_options = {
                "page_size": 25,
                "continuation_token": base64.urlsafe_b64encode(json.dumps({
                    "pk":"LATEST_NSCONFIG_auth0store",
                    "sk":''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(12))
                }).encode()).decode()
            }

            # After connecting, use set_store_id() to set a default store for
            # this session.
            try:
                response = await self.session.list_stores(pageination_options)
            except ApiException as error:
                print(error.parsed_exception)
                raise error

            for store in response.stores:
                self.session.set_store_id(store.id)
                self.store_id = store.id

            # We set default options for later use, that use the last
            # available authorization model.
            try:
                response = await self.session.read_authorization_models()
            except ApiException as error:
                print(error.parsed_exception)
                raise error

            for model in response.authorization_models:
                self.options = {
                    "authorization_model_id": model.id
                }

        while True:
            time.sleep(0.25)

    async def read(self, body):
        task = await self.loop.create_task(self.session.read(body, self.options))
        return task.result()

if __name__ == '__main__':
    logging.getLogger("hypercorn.error").setLevel(logging.DEBUG)
    logging.getLogger("hypercorn.access").setLevel(logging.DEBUG)

    corn_conf = Config()
    corn_conf.bind = f"127.0.0.1:8888"
    corn_conf.loglevel = "DEBUG"
    corn_conf.accesslog = '-'
    corn_conf.errorlog = '-'

    openfga = OpenFGAWrapper()

    app = FastAPI(
        title="TestAPI"
    )

    @app.get("/")
    async def entrypoint():
        body = ReadRequestTupleKey(
            user=f"user:testuser",
            relation="owner",
            object="document:",
        )
        try:
            response = await openfga.read(body)
        except ApiException as error:
            print(error.parsed_exception)
            raise error

    asyncio.run(serve(app, corn_conf))
    openfga.close()
SDK Checklist
- [ ] JS SDK
- [ ] Go SDK
- [ ] .NET SDK
- [X] Python SDK
- [ ] Java SDK
OpenFGA SDK version
0.6.0
OpenFGA version
v1.4.3
SDK Configuration
See example code
Logs
No response
References
No response