qdrant-client

Can't parallelize upload_records() in an Airflow DAG

Open • raulcarlomagno opened this issue 2 years ago • 1 comment

Hi, with qdrant_client v0.9.5 I can use the upload_records() method with parallel=4 without problems; it even works inside Jupyter. In Airflow, however, when I run a DAG with a PythonVirtualenvOperator task that calls client.upload_records() with parallel=4, I get the error below. The only way to make it run inside Airflow is to set parallel=1, but of course that takes much longer.

[2022-08-27 01:23:52,077] {process_utils.py:143} INFO - Traceback (most recent call last):
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/forkserver.py", line 280, in main
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO -     code = _serve_one(child_r, fds,
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/forkserver.py", line 319, in _serve_one
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO -     code = spawn._main(child_r, parent_sentinel)
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 125, in _main
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO -     prepare(preparation_data)
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 236, in prepare
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -     _fixup_main_from_path(data['init_main_from_path'])
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -     main_content = runpy.run_path(main_path,
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/runpy.py", line 265, in run_path
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -     return _run_module_code(code, init_globals, run_name,
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/runpy.py", line 97, in _run_module_code
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO -     _run_code(code, mod_globals, init_globals,
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -     exec(code, run_globals)
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -   File "/tmp/venvvgl4j5dw/script.py", line 234, in <module>
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -     res = func_upload_records(*arg_dict["args"], **arg_dict["kwargs"])
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -   File "/tmp/venvvgl4j5dw/script.py", line 222, in func_upload_records
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -     qdrant_conn.upload_records(
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -   File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/qdrant_client.py", line 1133, in upload_records
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO -     self._upload_collection(batches_iterator, collection_name, parallel)
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -   File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/qdrant_client.py", line 1110, in _upload_collection
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -     for _ in pool.unordered_map(batches_iterator, **updater_kwargs):
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -   File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/parallel_processor.py", line 117, in unordered_map
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -     self.start(**kwargs)
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -   File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/parallel_processor.py", line 112, in start
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -     process.start()
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/process.py", line 121, in start
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO -     self._popen = self._Popen(self)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/context.py", line 291, in _Popen
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -     return Popen(process_obj)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/popen_forkserver.py", line 35, in __init__
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -     super().__init__(process_obj)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -     self._launch(process_obj)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/popen_forkserver.py", line 42, in _launch
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO -     prep_data = spawn.get_preparation_data(process_obj._name)
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -     _check_not_importing_main()
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -   File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -     raise RuntimeError('''
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - RuntimeError:
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -         An attempt has been made to start a new process before the
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -         current process has finished its bootstrapping phase.
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - 
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -         This probably means that you are not using fork to start your
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -         child processes and you have forgotten to use the proper idiom
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -         in the main module:
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - 
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -             if __name__ == '__main__':
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -                 freeze_support()
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -                 ...
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - 
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -         The "freeze_support()" line can be omitted if the program
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -         is not going to be frozen to produce an executable.
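For context on the RuntimeError above: the traceback goes through `popen_forkserver.py`, so the worker processes are being started with multiprocessing's `forkserver` method rather than `fork`. With that method a child re-executes the parent's main script (`_fixup_main_from_path` in the log). Because the generated `script.py` appears to call the upload at module level (the `<module>` frame at line 234), the child reaches `upload_records()` again and tries to start processes during its own bootstrapping, which `_check_not_importing_main` rejects. The minimal standard-library sketch below shows the idiom the error message asks for. `upload_batch` and `upload_all` are hypothetical stand-ins for a per-batch upload and a `parallel` argument, not qdrant_client's API; the sequential `parallel=1` path mirrors the observation that `parallel=1` works in Airflow.

```python
import multiprocessing as mp


def upload_batch(batch):
    # Hypothetical stand-in for uploading one batch of records;
    # returns how many records were "uploaded".
    return len(batch)


def upload_all(batches, parallel=1, method="forkserver"):
    # Hypothetical helper mirroring a `parallel` argument.
    if parallel == 1:
        # Sequential path: no child processes are started,
        # so nothing re-imports the main module.
        return sum(upload_batch(b) for b in batches)
    # Same start method as in the traceback: children re-import the
    # main module (as __mp_main__) before running upload_batch.
    ctx = mp.get_context(method)
    with ctx.Pool(processes=parallel) as pool:
        # Consume all results before the pool is shut down.
        return sum(pool.imap_unordered(upload_batch, batches))


# Only guarded code may start processes: a re-imported child skips this block.
if __name__ == "__main__":
    batches = [list(range(10)) for _ in range(8)]
    print(upload_all(batches, parallel=4))  # prints 80
```

Moving the `upload_all(..., parallel=4)` call out of the guard reproduces the same RuntimeError, since each re-imported child would then try to create its own pool while still bootstrapping.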


raulcarlomagno, Aug 29 '22

Hi @generall @e-ivkov,

Do you know why this could be happening, and whether there is a way to fix it? Maybe an Airflow expert could shed some light on this.

Thank you!

raulcarlomagno, Sep 15 '22