Time between all tasks finished and flow finished depends on size of task outputs
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
According to the docs (https://docs.prefect.io/concepts/flows/#final-state-determination), the final state of a flow is determined by the states of its tasks if the flow returns no value. I would expect the flow to finish shortly after the logs show all tasks as completed, but this is not the case when tasks return large objects. The same problem does not occur on Prefect 1. If this behavior is expected, the docs should reflect it somehow.
Reproduction
import pandas as pd
import numpy as np
from prefect import task, flow


@task
def big_dataframe():
    # 1,000,000 x 200 float64 values
    df = pd.DataFrame(np.ones((1000000, 200)))
    return df


@task
def do_something_with_dataframe(df):
    print(df.shape)
    return 1


@flow
def test_flow(slow_return=True):
    dataframe = big_dataframe()
    final_task = do_something_with_dataframe(dataframe)
    # With slow_return=True the flow returns no value
    if not slow_return:
        return final_task


if __name__ == "__main__":
    test_flow(slow_return=True)
    test_flow(slow_return=False)
Error
Note the roughly 37-second delay between the last task completing (15:12:16.727) and the flow completing (15:12:53.728) when running with slow_return=True:
15:11:51.379 | INFO | prefect.engine - Created flow run 'logical-impala' for flow 'test-flow'
15:11:52.554 | INFO | Flow run 'logical-impala' - Created task run 'big_dataframe-4ca639b6-0' for task 'big_dataframe'
15:11:52.555 | INFO | Flow run 'logical-impala' - Executing 'big_dataframe-4ca639b6-0' immediately...
15:12:15.958 | INFO | Task run 'big_dataframe-4ca639b6-0' - Finished in state Completed()
15:12:16.127 | INFO | Flow run 'logical-impala' - Created task run 'do_something_with_dataframe-221255df-0' for task 'do_something_with_dataframe'
15:12:16.128 | INFO | Flow run 'logical-impala' - Executing 'do_something_with_dataframe-221255df-0' immediately...
(1000000, 200)
15:12:16.727 | INFO | Task run 'do_something_with_dataframe-221255df-0' - Finished in state Completed()
15:12:53.728 | INFO | Flow run 'logical-impala' - Finished in state Completed('All states completed.')
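The gap between the last two log lines above can be computed directly from their timestamps. This is a minimal sketch using only the standard library; the helper name `gap_seconds` is made up for illustration and assumes both timestamps fall on the same day.

```python
from datetime import datetime


def gap_seconds(earlier: str, later: str) -> float:
    """Seconds between two HH:MM:SS.mmm log timestamps (same day assumed)."""
    fmt = "%H:%M:%S.%f"
    delta = datetime.strptime(later, fmt) - datetime.strptime(earlier, fmt)
    return delta.total_seconds()


# Timestamps copied from the log above
last_task_done = "15:12:16.727"  # do_something_with_dataframe finished
flow_done = "15:12:53.728"       # flow run finished

print(f"{gap_seconds(last_task_done, flow_done):.3f} s")  # prints "37.001 s"
```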
Versions
2.4.0
Additional context
In my case this introduces many issues (when running with Docker at least):
- The Docker container exits before the flow exits, so the flow run gets stuck in running.
- Subflows show up as completed, but the calling flow does not complete.
Possibly related: I'm seeing an inconsistent cost of wrapping functions in a flow/task. In the example below, the flow that returns a pandas DataFrame finishes in ~1s, while the one that returns a NumPy ndarray takes ~20s (I also checked with larger data, and the problem gets worse).
Additionally, if you make the ndarray M only 1 column wide and twice as long, the performance gap disappears.
from prefect import task, flow
import numpy as np
import pandas as pd


def long_matrix():
    M = np.random.normal(size=[10_000_000, 2])
    return M


def df_long_matrix():
    return pd.DataFrame(long_matrix())


def np_long_matrix():
    return long_matrix()


@flow
def df_flow():
    return df_long_matrix()


@flow
def np_flow():
    return np_long_matrix()


if __name__ == "__main__":
    df_flow()
    np_flow()
Could it be worth noting in the docs somewhere that it's advisable to return pandas DataFrames instead of NumPy types? I was experiencing big performance issues just from wrapping some of my functions in @task, but when I made the functions return a DataFrame instead of a NumPy array, wrapping with @task ceased to cause issues.
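To put numbers on a comparison like the one above, a small wall-clock timer is enough. This is only a sketch: `time_call` and `plain_sum` are hypothetical names introduced here, and `plain_sum` is a stand-in so the snippet runs without Prefect installed.

```python
import time
from typing import Any, Callable, Tuple


def time_call(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Call fn(*args, **kwargs) and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


def plain_sum(n: int) -> int:
    # Stand-in workload (hypothetical); replace with the function under test
    return sum(range(n))


result, elapsed = time_call(plain_sum, 1000)
print(f"plain_sum took {elapsed:.6f} s")
```

With the reproduction above, one would pass df_flow and np_flow (and the undecorated df_long_matrix and np_long_matrix) to time_call to separate the cost of the function body from the cost added by the decorator.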
Thanks for the additional details and reproduction! I'm pretty confused about there being any overhead here; these objects should all be safely shared in memory. There must be something weird going on with async/threads.
This appears to be resolved in the latest release — it returns immediately :)