
Add asyncio based driver and related components

Open skrawcz opened this issue 2 years ago • 4 comments

Is your feature request related to a problem? Please describe.

Users may want to write Hamilton functions in an async style.

For example, data loading functions may use asyncio-based libraries, or a function may make external web requests.

However, Hamilton isn't set up to deal with async functions when Hamilton itself is being run inside an asyncio event loop.

Describe the solution you'd like

Since this is a different context, we should have a different Driver and related components that handle async functions and operate asynchronously. For example, async def execute() should be the driver function that needs to be awaited. Something like this in a FastAPI app:


from hamilton import driver
dag = driver.Driver({...}, modules, adapter=...)

@app.get("/endpoint")
async def compute( ... ):
    result = await dag.execute([output], inputs=data)
    # transform result for fastapi
    return result
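To make the request concrete, here is a minimal, self-contained sketch of what an awaitable execute() could look like. It is not Hamilton's implementation: ToyAsyncDriver, fetch_user and greeting are hypothetical names invented for illustration, and the sketch assumes (as Hamilton does) that a function's parameter names refer to other nodes or to inputs.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List


async def fetch_user(user_id: int) -> dict:
    """Hypothetical async node; the sleep stands in for a real I/O call."""
    await asyncio.sleep(0)
    return {"id": user_id, "name": f"user-{user_id}"}


def greeting(fetch_user: dict) -> str:
    """Plain synchronous node that depends on the async node above."""
    return f"hello {fetch_user['name']}"


class ToyAsyncDriver:
    """Illustrative only: resolves nodes by parameter name and awaits coroutines."""

    def __init__(self, *functions: Callable[..., Any]) -> None:
        self._nodes = {fn.__name__: fn for fn in functions}

    async def _compute(self, name: str, cache: Dict[str, Any]) -> Any:
        if name in cache:  # already computed, or supplied as an input
            return cache[name]
        fn = self._nodes[name]
        kwargs = {}
        for param in inspect.signature(fn).parameters:
            kwargs[param] = await self._compute(param, cache)
        result = fn(**kwargs)
        if inspect.isawaitable(result):  # async functions return a coroutine
            result = await result
        cache[name] = result
        return result

    async def execute(self, final_vars: List[str], inputs: Dict[str, Any]) -> Dict[str, Any]:
        cache = dict(inputs)
        results = {}
        for name in final_vars:
            results[name] = await self._compute(name, cache)
        return results


if __name__ == "__main__":
    dag = ToyAsyncDriver(fetch_user, greeting)
    print(asyncio.run(dag.execute(["greeting"], inputs={"user_id": 7})))
```

Inside a FastAPI endpoint the call would be awaited directly (result = await dag.execute(...)) rather than wrapped in asyncio.run(), because the framework already owns the running loop.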

Describe alternatives you've considered

If running Hamilton within an event loop is required:

  • Separate the I/O-based functions from the rest and run them outside of Hamilton. That way Hamilton remains pure computation and doesn't need to know about the asyncio world. For example, from within a FastAPI app:

from hamilton import driver
dag = driver.Driver({...}, modules, adapter=...)

@app.get("/endpoint")
async def compute( ... ):
    data = await pull_from_db(...)
    result = dag.execute([output], inputs=data) # overrides= could also be used.
    # transform result for fastapi
    return result

If running async functions is required, but Hamilton itself is not being run in an event loop:

  • Have the functions that need to call asyncio code wrap that code in a primitive such as asyncio.run(), so Hamilton doesn't need to know anything about asyncio.
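A rough sketch of this second alternative, assuming no event loop is already running in the calling thread. The names _load_rows, rows and total are hypothetical; the synchronous functions are the ones a framework like Hamilton would see, and the coroutine stays hidden behind asyncio.run().

```python
import asyncio
from typing import Dict, List


async def _load_rows(table: str) -> List[Dict[str, int]]:
    """Hypothetical async loader; the sleep stands in for a real database call."""
    await asyncio.sleep(0)
    return [{"value": 1}, {"value": 2}]


def rows(table: str) -> List[Dict[str, int]]:
    """Synchronous wrapper: starts a fresh event loop, runs the coroutine, closes the loop."""
    return asyncio.run(_load_rows(table))


def total(rows: List[Dict[str, int]]) -> int:
    """Pure computation that never touches asyncio."""
    return sum(row["value"] for row in rows)


if __name__ == "__main__":
    print(total(rows("orders")))
```

Each call to asyncio.run() creates and tears down its own event loop, so this works only when the caller is not itself running inside a loop.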

Additional context

Slack discussion: https://hamilton-opensource.slack.com/archives/C03M33QB4M8/p1659463180270219

skrawcz, Aug 02 '22 19:08

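What does not work when run within a FastAPI web app:

import asyncio

async def get_from_db(...):
    return await db_load

def my_function(...):
    # asyncio.run() tries to start a new event loop, but FastAPI's loop is already running
    result = asyncio.run(get_from_db(...))
    return result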

Error:

RuntimeError: asyncio.run() cannot be called from a running event loop
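The failure can be reproduced without FastAPI. In this sketch, endpoint() is a hypothetical stand-in for an async request handler, and the outer asyncio.run() plays the role of the web server's already-running loop.

```python
import asyncio


async def get_from_db() -> str:
    """Hypothetical async query; the sleep stands in for real I/O."""
    await asyncio.sleep(0)
    return "row"


def my_function() -> str:
    """Synchronous function that tries to start its own event loop."""
    coro = get_from_db()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def endpoint() -> str:
    """Stand-in for an async handler: a loop is already running when this executes."""
    try:
        return my_function()
    except RuntimeError as exc:
        return f"failed: {exc}"


if __name__ == "__main__":
    print(asyncio.run(endpoint()))
```

Calling my_function() from ordinary synchronous code succeeds; it is only the nested call from inside a running loop that raises.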

What partially works with FastAPI: you can schedule tasks from within Hamilton functions, but you cannot await them.

import asyncio


async def _log_result(result: str) -> None:
    """simulates logging asynchronously somewhere"""
    print(f'started {result}')
    await asyncio.sleep(2)
    print(f'finished {result}')


def log_result(result: str, event_loop: asyncio.AbstractEventLoop) -> dict:
    task = event_loop.create_task(_log_result(result))
    print("scheduled task", task)
    return {'result': True}

If Hamilton executes this, the task is created immediately, but the _log_result coroutine only runs once control returns to the event loop, which here is after the response to the web request has been served.

skrawcz, Aug 02 '22 20:08

Related gist - https://gist.github.com/shelmigtwo/56d9f2874e93c6281c456c9187850dbf

skrawcz, Aug 03 '22 20:08

Implementation here! https://github.com/stitchfix/hamilton/pull/171

elijahbenizzy, Aug 08 '22 02:08

This is completed and needs to be released.

elijahbenizzy, Aug 10 '22 01:08

Released as part of 1.10.0

skrawcz, Aug 22 '22 05:08