hamilton
hamilton copied to clipboard
Add asyncio based driver and related components
Is your feature request related to a problem? Please describe. It is conceivable to want to write Hamilton functions in an async based way.
For example, the data loading functions use asyncio based libraries, or you're making external web requests within a function...
However, Hamilton isn't set up to deal with async based functions when Hamilton itself is being run in an asyncio event loop.
Describe the solution you'd like
Since this is a different context. We should have a different Driver and related components that handle async functions and operating in an async manner. E.g. async def execute()
should be the driver function that needs to be awaited.
E.g. something like this in a fastapi app:
from hamilton import driver
dag = driver.Driver({...}, modules, adapter=...)
@app.get("/endpoint")
async def compute( ... ):
result = await dag.execute([output], inputs=data)
# transform result for fastapi
return result
Describe alternatives you've considered If running Hamilton within an event loop is required:
- Instead of doing this, you can instead separate the I/O based functions and do them outside of Hamilton. That way Hamilton remains pure computation and doesn't need to know about the asyncio world. E.g. From within an fast api app:
from hamilton import driver
dag = driver.Driver({...}, modules, adapter=...)
@app.get("/endpoint")
async def compute( ... ):
data = await pull_from_db(...)
result = dag.execute([output], inputs=data) # overrides= could also be used.
# transform result for fastapi
return result
If running async functions is required, but Hamilton itself is not being run in an event loop:
- the functions requiring to call asyncio code use a primitive like
asyncio.run()
to wrap that code so Hamilton doesn't need to know about anything asyncio.
Additional context Slack discussion - https://hamilton-opensource.slack.com/archives/C03M33QB4M8/p1659463180270219.
What does not work when run within a FastAPI web app::
async def get_from_db(...):
return await db_load
def my_function(...):
result = asyncio.run(get_from_db(...))
return result
Error:
RuntimeError: asyncio.run() cannot be called from a running event loop
What kind of works with the FastAPI: You can schedule tasks with Hamilton -- but you cannot await them.
async def _log_result(result: str) -> None:
"""simulates logging asynchronously somewhere"""
print(f'started {result}')
await asyncio.sleep(2)
print(f'finished {result}')
def log_result(result: str, event_loop: asyncio.AbstractEventLoop) -> dict:
task = event_loop.create_task(_log_result(result))
print("scheduled task", task)
return {'result': True}
If Hamilton executes this, the _log_result
coroutine will be scheduled after serving the response to the web request.
Related gist - https://gist.github.com/shelmigtwo/56d9f2874e93c6281c456c9187850dbf
Implementation here! https://github.com/stitchfix/hamilton/pull/171
This is completed, needs to be released
Released as part of 1.10.0