flytekit
flytekit copied to clipboard
Support async workflow & task
Tracking issue
closes flyteorg/flyte#1840
Why are the changes needed?
support async task. pythonfunctiontask
, pythonfunctionworkflow
and map_task
are supported. Conditional workflows
are currently not supported
import asyncio
from flytekit.experimental import eager
from flytekit import task, workflow, map_task
import time
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote
@task(enable_deck=True)
async def async_add_one(x: int) -> int:
await asyncio.sleep(1)
return x + 1
@task
async def multi_add(models: list[int]) -> list[int]:
coros = [async_add_one(x=x) for x in models]
res = await asyncio.gather(*coros)
return res
@workflow
async def adv_async_p(models: list[int] = [1,2,3]) -> list[int]:
coros = map_task(async_add_one)(x=models)
new_models = await coros
res2 = await multi_add(models=new_models)
return res2
@workflow
async def sub3_wf(models: list[int] = [1,2,3]) -> list[int]:
await async_add_one(x=models[0])
return models
@workflow
async def sub_adv_p(models: list[int] = [1,2,3]) -> list[int]:
ann = await sub3_wf(models=models)
coro0 = async_add_one(x=ann[0])
coro1 = async_add_one(x=ann[1])
res = await asyncio.gather(coro0, coro1)
return res
print(res)
return wtf_resolve(x=res)
@workflow
async def adv_p(models: list[int] = [1,2,3]) -> list[int]:
res = await sub_adv_p(models=models)
return res
What changes were proposed in this pull request?
Added an awaitable
attribute in Promise
and VoidPromise
How was this patch tested?
added unit tests on map_task and promises.
Setup process
Screenshots
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [ ] All new and existing tests passed.
- [x] All commits are signed-off.