
A way to access the asyncio loop, or a way to set it early

Open Torxed opened this issue 7 months ago • 0 comments

Checklist

  • [X] I have looked into the README and have not found a suitable solution or answer.
  • [X] I have looked into the documentation and have not found a suitable solution or answer.
  • [X] I have searched the issues and have not found a suitable solution or answer.
  • [X] I have searched the Slack Community and have not found a suitable solution or answer. (I do not have a slack account, nor do I feel that I have to create one just to delete it afterwards)
  • [X] I agree to the terms within the OpenFGA Code of Conduct.

Description

In short, I have a wrapper class that does the following:

self.configuration = ClientConfiguration(
	api_url=f"{config.openfga.protocol}://{config.openfga.host}:{config.openfga.port}"
)
self.session = OpenFgaClient(self.configuration)

When the wrapper's .read() is called from a different coroutine, for example, this happens quite frequently:

File "/app/openfga_wrapper/__init__.py", line 82, in read
    return await self.session.read(body, self.options)
...
RuntimeError: Timeout context manager should be used inside a task

Explicitly creating a task yields the same result:

    return await asyncio.create_task(self.session.read(body, self.options))

It still complains that the Timeout context manager is not being used inside a task. I'm not entirely comfortable with asyncio in general, but from what I can tell this is mainly because the OpenFGA SDK looks up the currently running event loop instead of consistently using one defined loop.
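
To illustrate why wrapping the call in a task might not help, here is a minimal stdlib-only sketch. It assumes (my reading of aiohttp, which I believe the SDK uses underneath; I have not confirmed this against the SDK) that the timeout check asks for the current task of the loop the HTTP session was created on, rather than the loop that is actually running. `other_loop` is a hypothetical stand-in for that original loop.

```python
import asyncio

# Hypothetical stand-in for the loop the client/session was originally created on.
other_loop = asyncio.new_event_loop()


async def probe():
    # We are inside a task on the loop started by asyncio.run() ...
    in_task_here = asyncio.current_task() is not None
    # ... but from the point of view of the other loop there is no current task.
    no_task_there = asyncio.current_task(loop=other_loop) is None
    return in_task_here, no_task_there


in_task_here, no_task_there = asyncio.run(probe())
other_loop.close()
print(in_task_here, no_task_there)
```

If that assumption holds, asyncio.create_task() cannot fix the error, because the new task still belongs to the running loop and not to the loop the session remembers.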

Attempting to run a secondary, long-lived asyncio loop in a separate thread produces:

attached to a different loop
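
That message can be reproduced with nothing but the standard library, which suggests it is plain asyncio behavior rather than something specific to OpenFGA. A minimal sketch, where `loop_a` is a hypothetical stand-in for the loop owned by the secondary thread:

```python
import asyncio

# Hypothetical stand-in for the loop owned by the secondary thread.
loop_a = asyncio.new_event_loop()
# A future that belongs to loop_a, not to the loop that will await it.
foreign_future = loop_a.create_future()


async def wait_on_foreign_future():
    # Awaiting a future bound to another loop is rejected by the running task.
    await foreign_future


message = ""
try:
    asyncio.run(wait_on_foreign_future())
except RuntimeError as error:
    message = str(error)
finally:
    loop_a.close()

print(message)
```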

Obviously, if you have the option to wrap everything in one asyncio.run() call, that's the ideal solution. However, when certain libraries or threads are involved, this becomes troublesome.
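
The kind of workaround this forces looks roughly like the sketch below: one thread owns a loop, and every call is funneled to it with asyncio.run_coroutine_threadsafe(). `LoopThread` and `fake_read` are hypothetical names; `fake_read` stands in for self.session.read(...), and I am assuming the client would also have to be created on that same loop for this to work with the SDK.

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Hypothetical helper: owns one event loop that runs in a background thread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.start()

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    async def call(self, coro):
        # Schedule the coroutine on the owning loop in a thread-safe way, then
        # wrap the concurrent future so the caller's own loop can await it.
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return await asyncio.wrap_future(future)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
        self.loop.close()


async def fake_read(body):
    # Stand-in for the SDK call (assumption: not the real client).
    await asyncio.sleep(0.01)
    return {"body": body, "in_task": asyncio.current_task() is not None}


async def main():
    worker = LoopThread()
    try:
        return await worker.call(fake_read("document:1"))
    finally:
        worker.stop()


result = asyncio.run(main())
print(result)
```

This works for plain coroutines, but it is a fair amount of plumbing for every caller to carry around.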

Expectation

A way to set a target event loop in either ClientConfiguration or OpenFgaClient.

Reproduction

Below is sample code for an API that triggers the issue. Note that restructuring it around a single async def _main() and avoiding separate threads would fix this particular code, but it is intended to show the threaded issue.

Run curl http://127.0.0.1:8888 against:

import asyncio
import base64
import json
import random
import string
import time
import threading
import logging
from fastapi import FastAPI
from hypercorn.config import Config
from hypercorn.asyncio import serve
from openfga_sdk.exceptions import ApiException
from openfga_sdk import ClientConfiguration, OpenFgaClient, ReadRequestTupleKey

class OpenFGAWrapper(threading.Thread):
	def __init__(self):
		self.configuration = ClientConfiguration(
			api_url=f"http://openfga:8080"
		)
		self.session = None
		self.options = None
		self.store_id = None
		self.loop = None

		threading.Thread.__init__(self)
		self.start()

	def run(self):
		asyncio.run(self._reconnect())

	def close(self):
		try:
			self.loop = asyncio.get_running_loop()
		except RuntimeError:
			self.loop = asyncio.new_event_loop()

		return self.loop.run_until_complete(self.session.close())

	async def _reconnect(self):
		if self.session is None:
			try:
				self.loop = asyncio.get_running_loop()
			except RuntimeError:
				self.loop = asyncio.new_event_loop()

			self.session = OpenFgaClient(self.configuration)

			pagination_options = {
				"page_size": 25,
				"continuation_token": base64.urlsafe_b64encode(json.dumps({
					"pk":"LATEST_NSCONFIG_auth0store",
					"sk":''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(12))
				}).encode()).decode()
			}

			# After connecting, use set_store_id() to set a default store for
			# this session.
			try:
				response = await self.session.list_stores(pagination_options)
			except ApiException as error:
				print(error.parsed_exception)
				raise error

			for store in response.stores:
				self.session.set_store_id(store.id)
				self.store_id = store.id

				# We set default options for later use, that use the last
				# available authorization model.
				try:
					response = await self.session.read_authorization_models()
				except ApiException as error:
					print(error.parsed_exception)
					raise error

				for model in response.authorization_models:
					self.options = {
						"authorization_model_id": model.id
					}

		while True:
			time.sleep(0.25)

	async def read(self, body):
		# Awaiting the task already yields its result, so no extra .result() call is needed.
		return await self.loop.create_task(self.session.read(body, self.options))

if __name__ == '__main__':
	logging.getLogger("hypercorn.error").setLevel(logging.DEBUG)
	logging.getLogger("hypercorn.access").setLevel(logging.DEBUG)

	corn_conf = Config()
	corn_conf.bind = f"127.0.0.1:8888"
	corn_conf.loglevel = "DEBUG"
	corn_conf.accesslog = '-'
	corn_conf.errorlog = '-'

	openfga = OpenFGAWrapper()
	app = FastAPI(
		title="TestAPI"
	)

	@app.get("/")
	async def entrypoint():
		body = ReadRequestTupleKey(
			user=f"user:testuser",
			relation="owner",
			object="document:",
		)

		try:
			response = await openfga.read(body)
		except ApiException as error:
			print(error.parsed_exception)
			raise error

	asyncio.run(serve(app, corn_conf))
	openfga.close()

SDK Checklist

  • [ ] JS SDK
  • [ ] Go SDK
  • [ ] .NET SDK
  • [X] Python SDK
  • [ ] Java SDK

OpenFGA SDK version

0.6.0

OpenFGA version

v1.4.3

SDK Configuration

See example code

Logs

No response

References

No response

Torxed, Jul 05 '24 12:07