qdrant-client
Can't parallelize upload_records() in an Airflow DAG
Hi, with qdrant_client v0.9.5 I can use the upload_records() method with the parallel=4 argument without problems, even inside Jupyter. In Airflow, however, when I run a DAG with a PythonVirtualenvOperator task that calls client.upload_records() with parallel=4, I get the following error. The only way to run it inside Airflow is to set parallel=1, which of course takes a long time.
[2022-08-27 01:23:52,077] {process_utils.py:143} INFO - Traceback (most recent call last):
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/forkserver.py", line 280, in main
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO - code = _serve_one(child_r, fds,
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/forkserver.py", line 319, in _serve_one
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO - code = spawn._main(child_r, parent_sentinel)
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 125, in _main
[2022-08-27 01:23:52,078] {process_utils.py:143} INFO - prepare(preparation_data)
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 236, in prepare
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - _fixup_main_from_path(data['init_main_from_path'])
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - main_content = runpy.run_path(main_path,
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/runpy.py", line 265, in run_path
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - return _run_module_code(code, init_globals, run_name,
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/runpy.py", line 97, in _run_module_code
[2022-08-27 01:23:52,079] {process_utils.py:143} INFO - _run_code(code, mod_globals, init_globals,
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - exec(code, run_globals)
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - File "/tmp/venvvgl4j5dw/script.py", line 234, in <module>
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - res = func_upload_records(*arg_dict["args"], **arg_dict["kwargs"])
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - File "/tmp/venvvgl4j5dw/script.py", line 222, in func_upload_records
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - qdrant_conn.upload_records(
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/qdrant_client.py", line 1133, in upload_records
[2022-08-27 01:23:52,080] {process_utils.py:143} INFO - self._upload_collection(batches_iterator, collection_name, parallel)
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/qdrant_client.py", line 1110, in _upload_collection
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - for _ in pool.unordered_map(batches_iterator, **updater_kwargs):
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/parallel_processor.py", line 117, in unordered_map
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - self.start(**kwargs)
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - File "/tmp/venvvgl4j5dw/lib/python3.8/site-packages/qdrant_client/parallel_processor.py", line 112, in start
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - process.start()
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/process.py", line 121, in start
[2022-08-27 01:23:52,081] {process_utils.py:143} INFO - self._popen = self._Popen(self)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/context.py", line 291, in _Popen
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - return Popen(process_obj)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/popen_forkserver.py", line 35, in __init__
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - super().__init__(process_obj)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - self._launch(process_obj)
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/popen_forkserver.py", line 42, in _launch
[2022-08-27 01:23:52,082] {process_utils.py:143} INFO - prep_data = spawn.get_preparation_data(process_obj._name)
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - _check_not_importing_main()
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - File "/usr/local/lib/python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - raise RuntimeError('''
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - RuntimeError:
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - An attempt has been made to start a new process before the
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - current process has finished its bootstrapping phase.
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO -
[2022-08-27 01:23:52,083] {process_utils.py:143} INFO - This probably means that you are not using fork to start your
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - child processes and you have forgotten to use the proper idiom
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - in the main module:
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - if __name__ == '__main__':
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - freeze_support()
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - ...
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO -
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - The "freeze_support()" line can be omitted if the program
[2022-08-27 01:23:52,084] {process_utils.py:143} INFO - is not going to be frozen to produce an executable.
Hi @generall @e-ivkov,
do you know why this could be happening, and whether there is a way to fix it? Maybe an Airflow expert could enlighten us.
Thank you.
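For anyone reading the traceback: it shows the forkserver start method re-running the main script (`/tmp/venvvgl4j5dw/script.py`) in the child through `runpy.run_path`, and the upload call sits at module level of that script (`line 234, in <module>`). That seems to be what the RuntimeError is complaining about. Below is a minimal standard-library sketch of the idiom the error message refers to. It is not qdrant-client or Airflow code, and `iter_batches`, `process_batch`, and `run_parallel` are hypothetical names made up for illustration.

```python
import multiprocessing as mp
from itertools import islice


def iter_batches(items, batch_size):
    """Yield successive lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def process_batch(batch):
    """Hypothetical stand-in for the per-batch upload; it only counts items."""
    return len(batch)


def run_parallel(items, batch_size=2, workers=2, start_method="spawn"):
    """Process batches in worker processes with an explicit start method."""
    # "spawn" and "forkserver" both re-import the main module in each child.
    ctx = mp.get_context(start_method)
    with ctx.Pool(processes=workers) as pool:
        batches = iter_batches(items, batch_size)
        return sum(pool.imap_unordered(process_batch, batches))


if __name__ == "__main__":
    # Without this guard, each child would run this call again while it is
    # still bootstrapping, which is the situation the RuntimeError describes.
    print(run_parallel(range(10)))
```

As far as I can tell, the script generated by PythonVirtualenvOperator calls the task function at top level without such a guard, which would explain why the same call works elsewhere but fails in Airflow. I have not verified this against the Airflow source, so treat it as an assumption.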