prefect
`sync_compatible` async/sync context detection error
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
Using pytest-asyncio to run my tests, Azure.load("name") raises an error because the sync_compatible decorator decides that I am not in an async context, even though I am. If the wrapper simply runs async_fn(*args, **kwargs), the coroutine is created correctly and works.
Reproduction
import os
from time import sleep

import pytest
import pytest_asyncio
from httpx import ReadError
from prefect.client import get_client
from prefect.filesystems import Azure
from prefect.orion.schemas.core import BlockDocument
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield


@pytest_asyncio.fixture
async def prefect_client(prefect_test_fixture):
    async with get_client() as client:
        yield client


@pytest_asyncio.fixture
async def azure_blob(prefect_client):
    name = "azureblobblock"
    azure_block_type = list(
        filter(
            lambda b: b.block_type.slug == "azure",
            (await prefect_client.read_block_schemas()),
        )
    )[0]
    azure_block = BlockDocument(
        name=name,
        data={
            "bucket_path": "prefect",
            "azure_storage_account_name": os.environ["ACCOUNT_NAME"],
            "azure_storage_account_key": os.environ["ACCOUNT_KEY"],
        },
        block_schema_id=azure_block_type.id,
        block_type_id=azure_block_type.block_type_id,
    )
    azure_block = await prefect_client.create_block_document(azure_block)
    yield await Azure.load(name)
    await prefect_client.delete_block_document(azure_block.id)


@pytest.mark.asyncio
async def test_azure_blob(azure_blob):
    content = b"ok"
    await azure_blob.write_path("yo.txt", content)
    assert await azure_blob.read_path("yo.txt") == content
Error
    @wraps(async_fn)
    def wrapper(*args, **kwargs):
        if in_async_main_thread():
            caller_frame = sys._getframe(1)
            caller_module = caller_frame.f_globals.get("__name__", "unknown")
            caller_async = caller_frame.f_code.co_flags & inspect.CO_COROUTINE
            if caller_async or any(
                # Add exceptions for the internals anyio/asyncio which can run
                # coroutines from synchronous functions
                caller_module.startswith(f"{module}.")
                for module in ["asyncio", "anyio"]
            ):
                # In the main async context; return the coro for them to await
                return async_fn(*args, **kwargs)
            else:
                # In the main thread but call was made from a sync method
>               raise RuntimeError(
                    "A 'sync_compatible' method was called from a context that was "
                    "previously async but is now sync. The sync call must be changed "
                    "to run in a worker thread to support sending the coroutine for "
                    f"{async_fn.__name__!r} to the main thread."
                )
E               RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
Versions
Version: 2.3.0
API version: 0.8.0
Python version: 3.10.0
Git commit: 8d9316c0
Built: Tue, Aug 30, 2022 5:30 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: sqlite
SQLite version: 3.38.5
Additional context
No response
To work around the issue I do the following hack:
block_document = await prefect_client.read_block_document_by_name(
    name=azure_block.name, block_type_slug=azure_block.block_type.slug
)
yield Azure._from_block_document(block_document)
Thanks for the report! This is definitely incorrect behavior. I believe this is because caller_async does not have handling for wrapped functions, but I'm not sure. We'll need to investigate.
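The frame check shown in the traceback can be exercised outside Prefect. The following is a minimal sketch, not Prefect's code: caller_is_coroutine is a hypothetical helper that copies only the co_flags & inspect.CO_COROUTINE test, and every other name is made up for illustration. It suggests two kinds of caller that run inside an event loop yet fail that test: an async generator (which is what a yielding pytest-asyncio fixture is) and a synchronous wrapper called from a coroutine.

```python
import asyncio
import inspect
import sys


def caller_is_coroutine() -> bool:
    """Hypothetical helper: apply the CO_COROUTINE test to the immediate caller."""
    caller_frame = sys._getframe(1)
    return bool(caller_frame.f_code.co_flags & inspect.CO_COROUTINE)


async def plain_coroutine() -> bool:
    # An ordinary `async def` carries the CO_COROUTINE flag.
    return caller_is_coroutine()


async def async_generator():
    # An `async def` containing `yield` carries CO_ASYNC_GENERATOR instead.
    yield caller_is_coroutine()


def sync_wrapper() -> bool:
    # A synchronous frame sits between the coroutine and the check.
    return caller_is_coroutine()


async def coroutine_via_wrapper() -> bool:
    return sync_wrapper()


async def main() -> dict:
    results = {"coroutine": await plain_coroutine()}
    async for value in async_generator():
        results["async_generator"] = value
    results["via_sync_wrapper"] = await coroutine_via_wrapper()
    return results


results = asyncio.run(main())
print(results)
```

Only the plain coroutine passes the check. Whether the async generator case or the wrapper case is what triggers the error in the report above is an assumption that would need confirming against the Prefect source.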
I hit a similar issue when deploying to a Prefect server.
from prefect.blocks.core import Block
from pydantic import SecretStr


class SFTPCredentials(Block):
    host: str
    username: str
    password: SecretStr
    port: int


def update_block():
    sftp_credentials = SFTPCredentials(
        host="localhost",
        username="demo",
        password=SecretStr("pass"),
        port=4422,
    )
    sftp_credentials.save(name="sftp-credentials", overwrite=True)
When I run prefect deployment apply myflow.yaml with the method update_block(), I get:
RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'save' to the main thread.
I get the same error:
RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
when I try to load a GCS block:
from prefect.filesystems import GCS
gcs_block = GCS.load("test-block")
Should this code work?
from prefect import task, flow
from prefect.client import get_client
from prefect.filesystems import Azure
import asyncio


async def client_setup():
    async with get_client() as client:
        azure_block = Azure.load("stoprefectdev")
        block_document = await client.read_block_document_by_name(
            name=azure_block.name, block_type_slug=azure_block.block_type.slug
        )
        yield Azure._from_block_document(block_document)


asyncio.run(client_setup())
I'm getting the following error: RuntimeError: asyncio.run() cannot be called from a running event loop
@samanax Please don't add new errors to existing issues.
The code
from prefect import task, flow
from prefect.client import get_client
from prefect.filesystems import Azure
import asyncio


async def client_setup():
    async with get_client() as client:
        azure_block = await Azure.load("stoprefectdev")
        block_document = await client.read_block_document_by_name(
            name=azure_block.name, block_type_slug=azure_block.block_type.slug
        )
        return Azure._from_block_document(block_document)


asyncio.run(client_setup())
Runs fine for me in a Python shell.
This happens whenever I add
from prefect.filesystems import RemoteFileSystem
remote_file_system_block = RemoteFileSystem.load("my-s3")
to my file. There is no await in my file at all.
@chris-aeviator how is the file being executed?
@madkinsz it was inside my pipeline file, and deployed via
prefect deployment build ./test-project/test.py:my_task -n automated-task --storage-block remote-file-system/my-s3
but I have since deleted and re-added my S3 block via prefect block register --file s3block.py and removed it from my test.py.
This sounds like a bug with how Prefect is loading your deployment file. Can you share the traceback?
I am getting the same error when I run the app from uvicorn (FastAPI). Here's a minimal reproduction:
error_test.py
# boilerplate fastapi helloworld
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

from prefect.filesystems import GitHub
storage = GitHub.load('repo')
then in the command line:
$ uvicorn error_test.py:app
Traceback (most recent call last):
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/bin/uvicorn", line 8, in <module>
sys.exit(main())
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 1130, in __call__
return self.main(*args, **kwargs)
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 1055, in main
rv = self.invoke(ctx)
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/click/core.py", line 760, in invoke
return __callback(*args, **kwargs)
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/main.py", line 408, in main
run(
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/main.py", line 576, in run
server.run()
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/server.py", line 60, in run
return asyncio.run(self.serve(sockets=sockets))
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/server.py", line 67, in serve
config.load()
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/config.py", line 479, in load
self.loaded_app = import_from_string(self.app)
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/uvicorn/importer.py", line 21, in import_from_string
module = importlib.import_module(module_str)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 972, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
File "<frozen importlib._bootstrap>", line 986, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 680, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 790, in exec_module
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "./error_test.py", line 11, in <module>
storage = GitHub.load('repo')
File "/Users/erik/code_venvs/prefect-simple-webhook-_il2wvVZ-py3.9/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 198, in wrapper
raise RuntimeError(
RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
Thanks for the reproduction! This is because when Uvicorn loads the file it does so from a thread with an event loop. If you place your calls in a synchronous function, things will work as you desire:
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/foo")
def foo():
    from prefect.filesystems import GitHub
    storage = GitHub.load('repo')

If you're going to use it from an async function, you should be using await GitHub.load('repo') instead.
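The difference between the two call sites can be reproduced with the standard library alone. This is a rough sketch under stated assumptions: module_level_code is a hypothetical stand-in for a module body executed at import time, serve imitates a server that imports the app from inside its running event loop, and asyncio.to_thread imitates a framework running a plain def endpoint in a worker thread (as FastAPI does). Requires Python 3.9 or newer.

```python
import asyncio


def loop_is_running() -> bool:
    """Report whether the current thread has a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True


def module_level_code() -> bool:
    # Hypothetical stand-in for top-level statements in an imported module.
    return loop_is_running()


async def serve():
    # Called directly from a coroutine: same thread as the event loop.
    in_loop = module_level_code()
    # Called in a worker thread: no event loop is running there.
    in_thread = await asyncio.to_thread(module_level_code)
    return in_loop, in_thread


in_loop, in_thread = asyncio.run(serve())
print(in_loop, in_thread)
```

Synchronous code executed on the event loop thread sees a running loop even though it contains no await, which matches the situation the error message describes; the same code in a worker thread does not.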
It would be cool to have a way to explicitly call the sync version functions.
I call the same sync functions from sync and async environments. The async detector goes crazy when there is an event loop.
My function is sync and it fetches secrets via Secret.load. I think there is currently no way to force sync mode, so I have to pass secrets as parameters instead.
This is addressed in Prefect 3, where sync_compatible allows a force_sync keyword.
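For versions without such a keyword, the error message itself hints at a generic pattern: run the coroutine in a worker thread. The sketch below is illustrative only and does not use any Prefect API. load_secret is a hypothetical plain async function standing in for an async loader, and run_sync is a hypothetical helper. Whether this works unchanged with a sync_compatible method is an untested assumption, since such a method may already run itself to completion when called from a thread without an event loop.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def load_secret(name: str) -> str:
    """Hypothetical stand-in for an async loader; not a Prefect function."""
    await asyncio.sleep(0)
    return f"secret-for-{name}"


def run_sync(async_fn, *args, **kwargs):
    """Run async_fn to completion in a worker thread that owns its own event loop."""

    def runner():
        # The coroutine is created and driven entirely inside the worker thread.
        return asyncio.run(async_fn(*args, **kwargs))

    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(runner).result()


def fetch_secret_sync(name: str) -> str:
    # A plain sync function that is usable with or without a running loop.
    return run_sync(load_secret, name)


async def async_caller() -> str:
    # Note: this blocks the calling event loop until the worker finishes.
    return fetch_secret_sync("db")


value_plain = fetch_secret_sync("db")
value_loop = asyncio.run(async_caller())
print(value_plain, value_loop)
```

The trade-off is that the calling event loop is blocked while the worker thread runs, so this fits short calls such as fetching a secret rather than long-running work.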
Thanks!