dagster
mlflow integration causes 429 error responses from Databricks when running multiple parallel jobs/ops
Dagster version
1.7.4
What's the issue?
Databricks MLflow has a maximum rate of 3 queries per second (QPS). The Dagster mlflow integration makes ~3 different calls to the MLflow API whenever the resource initializes. These calls, which fetch the MLflow run ID and start the run, are redundant in the broader context of the job.
When you run multiple jobs with multiple fanned-out ops, you easily hit the QPS limit. This causes resource initialization to fail, which in turn fails the entire job.
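A rough back-of-the-envelope model shows why fanned-out ops overwhelm a 3 QPS limit. This sketch assumes every op initializes the resource at about the same time, that each init makes the ~3 calls mentioned above, and that the client waits `backoff_factor * 2**k` seconds before retry `k`. The burst size and retry settings are hypothetical illustration values, not Dagster or MLflow defaults.

```python
import math


def min_drain_seconds(n_ops: int, calls_per_init: int = 3, qps_limit: float = 3.0) -> int:
    """Lower bound on how long a burst of resource inits needs to get past a QPS limit."""
    total_requests = n_ops * calls_per_init
    return math.ceil(total_requests / qps_limit)


def retry_window_seconds(max_retries: int, backoff_factor: float) -> float:
    """Total time a client keeps retrying if it waits backoff_factor * 2**k seconds
    before retry k (k = 0 .. max_retries - 1). This schedule is an assumption."""
    return sum(backoff_factor * 2**k for k in range(max_retries))


if __name__ == "__main__":
    # Hypothetical burst: a sensor launches 4 jobs with 10 fanned-out ops each.
    n_ops = 4 * 10
    drain = min_drain_seconds(n_ops)
    window = retry_window_seconds(max_retries=5, backoff_factor=1.0)
    print(f"{n_ops} inits need at least {drain}s; retries give up after ~{window:.0f}s")
```

When the minimum drain time is longer than the retry window, some requests are likely to run out of retries before the limiter lets them through, which is consistent with the `too many 429 error responses` failures in the trace.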
What did you expect to happen?
Running jobs in parallel would succeed without issue. A proper fix would probably include better state handling so subsequent ops in a job don't need to query mlflow.
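One possible shape for that state handling is to look up the MLflow run ID once per Dagster run and let later ops reuse it. This is a hypothetical sketch that uses no dagster-mlflow API: `get_or_create_run_id` and `lookup` are illustrative names, and `lookup` stands in for whatever expensive call (such as the `mlflow.search_runs` query in the trace) finds or creates the run. It assumes all op processes share a local filesystem, as the child processes of the multiprocess executor inside one run worker do. Two processes that miss the cache at the same instant can still both call `lookup`, so this reduces rather than eliminates duplicate queries.

```python
import json
import os
import tempfile
from typing import Callable


def get_or_create_run_id(dagster_run_id: str, lookup: Callable[[], str], cache_dir: str) -> str:
    """Return the MLflow run ID for a Dagster run, calling `lookup` only on a cache miss.

    The cache is one small JSON file per Dagster run, so separate op processes
    on the same machine can reuse the ID instead of querying MLflow again.
    """
    path = os.path.join(cache_dir, f"{dagster_run_id}.json")
    try:
        with open(path) as f:
            return json.load(f)["mlflow_run_id"]
    except FileNotFoundError:
        pass  # No cached ID yet for this Dagster run: fall through and query.

    run_id = lookup()
    # Write to a temporary file, then rename it, so readers never see a partial file.
    fd, tmp_path = tempfile.mkstemp(dir=cache_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump({"mlflow_run_id": run_id}, f)
    os.replace(tmp_path, path)
    return run_id


if __name__ == "__main__":
    calls = []

    def fake_lookup() -> str:
        calls.append(1)  # Count how often the "expensive" MLflow query runs.
        return "abc123"

    with tempfile.TemporaryDirectory() as d:
        for _ in range(5):  # Five ops in the same Dagster run.
            get_or_create_run_id("dagster-run-1", fake_lookup, d)
    print(len(calls))  # 1
```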
Some level of automatic retrying (either at the op level or on individual requests) would be ideal too.
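For request-level retries, exponential backoff with random jitter matters here: identical ops that all receive a 429 at the same moment would otherwise retry in lockstep and collide again. This is a generic, hypothetical helper (`call_with_backoff` and `is_rate_limited` are illustrative names, not dagster-mlflow or MLflow APIs). Treating an exception as retryable when its message contains `429` matches the `MlflowException` text in the trace, but that is an assumption rather than a documented contract.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def is_rate_limited(exc: Exception) -> bool:
    """Heuristic: treat errors whose message mentions HTTP 429 as rate limiting."""
    return "429" in str(exc)


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 6,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `fn`, retrying rate-limited failures with exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            attempt += 1
            # Re-raise non-rate-limit errors, or give up once the attempt budget is spent.
            if attempt >= max_attempts or not is_rate_limited(exc):
                raise
            # Full jitter: wait a random time in [0, cap] so parallel ops spread out
            # their retries instead of hitting the API again all at once.
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))


if __name__ == "__main__":
    attempts = []

    def flaky_search() -> str:
        # Hypothetical stand-in for an MLflow call: two 429s, then success.
        attempts.append(1)
        if len(attempts) <= 2:
            raise RuntimeError("too many 429 error responses")
        return "run-id"

    print(call_with_backoff(flaky_search, sleep=lambda s: None), len(attempts))  # run-id 3
```

MLflow's REST client already performs its own urllib3 retries (those are what ran out in the trace), so a wrapper like this sits on top of them. Recent MLflow versions also expose environment variables such as `MLFLOW_HTTP_REQUEST_MAX_RETRIES` and `MLFLOW_HTTP_REQUEST_BACKOFF_FACTOR` for tuning those built-in retries; check the MLflow documentation for the version in use.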
How to reproduce?
Create a job with multiple ops that use the dagster-mlflow integration.
Use a sensor to kick off many of the jobs in a single run request.
Observe that some of the ops fail during resource init, which fails the entire job.
Deployment type
Dagster Helm chart
Deployment details
No response
Additional information
Error message from the Dagster UI for a failed resource init (I replaced our Databricks instance ID with <redacted id>):
dagster._core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 188: dagster._core.errors.DagsterResourceFunctionError: Error executing resource_fn on ResourceDefinition mlflow
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/child_process_executor.py", line 78, in _execute_command_in_child_process
for step_event in command.execute():
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/multiprocess.py", line 97, in execute
yield from execute_plan_iterator(
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 869, in __iter__
yield from self.execution_context_manager.prepare_context()
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/context_creation_job.py", line 236, in execution_context_event_generator
yield from resources_manager.generate_setup_events()
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 254, in resource_initialization_event_generator
yield from _core_resource_initialization_event_generator(
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 218, in _core_resource_initialization_event_generator
raise dagster_user_error
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 183, in _core_resource_initialization_event_generator
for event in manager.generate_setup_events():
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 342, in single_resource_event_generator
raise dagster_user_error
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 314, in single_resource_event_generator
with user_code_error_boundary(
File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
raise new_error from e
The above exception was caused by the following exception:
mlflow.exceptions.MlflowException: API request to https://<redacted id>.cloud.databricks.com/api/2.0/mlflow/runs/search failed with exception HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
yield
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 332, in single_resource_event_generator
resource = next(resource_iter)
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 280, in mlflow_tracking
mlf = MlFlow(context)
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 94, in __init__
self._setup()
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 102, in _setup
run_id = self._get_current_run_id()
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 132, in _get_current_run_id
current_run_df = mlflow.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/fluent.py", line 1980, in search_runs
runs = get_results_from_paginated_fn(
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/__init__.py", line 271, in get_results_from_paginated_fn
page_results = paginated_fn(max_results_per_page, next_page_token)
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/fluent.py", line 1971, in pagination_wrapper_func
return MlflowClient().search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/client.py", line 2321, in search_runs
return self._tracking_client.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/_tracking_service/client.py", line 674, in search_runs
return self.store.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/abstract_store.py", line 391, in search_runs
runs, token = self._search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/rest_store.py", line 304, in _search_runs
response_proto = self._call_endpoint(SearchRuns, req_body)
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/rest_store.py", line 60, in _call_endpoint
return call_endpoint(self.get_host_creds(), endpoint, method, json_body, response_proto)
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 289, in call_endpoint
response = http_request(**call_kwargs)
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 151, in http_request
raise MlflowException(f"API request to {url} failed with exception {e}")
The above exception occurred during handling of the following exception:
requests.exceptions.RetryError: HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 128, in http_request
return _get_http_response_with_retries(
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/request_utils.py", line 228, in _get_http_response_with_retries
return session.request(method, url, allow_redirects=allow_redirects, **kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 510, in send
raise RetryError(e, request=request)
The above exception occurred during handling of the following exception:
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 486, in send
resp = conn.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
[Previous line repeated 2 more times]
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 936, in urlopen
retries = retries.increment(method, url, response=response, _pool=self)
File "/usr/local/lib/python3.10/site-packages/urllib3/util/retry.py", line 515, in increment
raise MaxRetryError(_pool, url, reason) from reason # type: ignore[arg-type]
The above exception was caused by the following exception:
urllib3.exceptions.ResponseError: too many 429 error responses
In process 196: dagster._core.errors.DagsterResourceFunctionError: Error executing resource_fn on ResourceDefinition mlflow
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/child_process_executor.py", line 78, in _execute_command_in_child_process
for step_event in command.execute():
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/multiprocess.py", line 97, in execute
yield from execute_plan_iterator(
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 869, in __iter__
yield from self.execution_context_manager.prepare_context()
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/context_creation_job.py", line 236, in execution_context_event_generator
yield from resources_manager.generate_setup_events()
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 254, in resource_initialization_event_generator
yield from _core_resource_initialization_event_generator(
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 218, in _core_resource_initialization_event_generator
raise dagster_user_error
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 183, in _core_resource_initialization_event_generator
for event in manager.generate_setup_events():
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 342, in single_resource_event_generator
raise dagster_user_error
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 314, in single_resource_event_generator
with user_code_error_boundary(
File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
raise new_error from e
The above exception was caused by the following exception:
mlflow.exceptions.MlflowException: API request to https://<redacted id>.cloud.databricks.com/api/2.0/mlflow/runs/search failed with exception HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
yield
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 332, in single_resource_event_generator
resource = next(resource_iter)
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 280, in mlflow_tracking
mlf = MlFlow(context)
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 94, in __init__
self._setup()
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 102, in _setup
run_id = self._get_current_run_id()
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 132, in _get_current_run_id
current_run_df = mlflow.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/fluent.py", line 1980, in search_runs
runs = get_results_from_paginated_fn(
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/__init__.py", line 271, in get_results_from_paginated_fn
page_results = paginated_fn(max_results_per_page, next_page_token)
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/fluent.py", line 1971, in pagination_wrapper_func
return MlflowClient().search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/client.py", line 2321, in search_runs
return self._tracking_client.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/_tracking_service/client.py", line 674, in search_runs
return self.store.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/abstract_store.py", line 391, in search_runs
runs, token = self._search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/rest_store.py", line 304, in _search_runs
response_proto = self._call_endpoint(SearchRuns, req_body)
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/rest_store.py", line 60, in _call_endpoint
return call_endpoint(self.get_host_creds(), endpoint, method, json_body, response_proto)
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 289, in call_endpoint
response = http_request(**call_kwargs)
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 151, in http_request
raise MlflowException(f"API request to {url} failed with exception {e}")
The above exception occurred during handling of the following exception:
requests.exceptions.RetryError: HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 128, in http_request
return _get_http_response_with_retries(
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/request_utils.py", line 228, in _get_http_response_with_retries
return session.request(method, url, allow_redirects=allow_redirects, **kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 510, in send
raise RetryError(e, request=request)
The above exception occurred during handling of the following exception:
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 486, in send
resp = conn.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
[Previous line repeated 2 more times]
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 936, in urlopen
retries = retries.increment(method, url, response=response, _pool=self)
File "/usr/local/lib/python3.10/site-packages/urllib3/util/retry.py", line 515, in increment
raise MaxRetryError(_pool, url, reason) from reason # type: ignore[arg-type]
The above exception was caused by the following exception:
urllib3.exceptions.ResponseError: too many 429 error responses
In process 197: dagster._core.errors.DagsterResourceFunctionError: Error executing resource_fn on ResourceDefinition mlflow
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/child_process_executor.py", line 78, in _execute_command_in_child_process
for step_event in command.execute():
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/multiprocess.py", line 97, in execute
yield from execute_plan_iterator(
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 869, in __iter__
yield from self.execution_context_manager.prepare_context()
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/context_creation_job.py", line 236, in execution_context_event_generator
yield from resources_manager.generate_setup_events()
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 254, in resource_initialization_event_generator
yield from _core_resource_initialization_event_generator(
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 218, in _core_resource_initialization_event_generator
raise dagster_user_error
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 183, in _core_resource_initialization_event_generator
for event in manager.generate_setup_events():
File "/usr/local/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 511, in generate_setup_events
obj = next(self.generator)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 342, in single_resource_event_generator
raise dagster_user_error
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 314, in single_resource_event_generator
with user_code_error_boundary(
File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
raise new_error from e
The above exception was caused by the following exception:
mlflow.exceptions.MlflowException: API request to https://<redacted id>.cloud.databricks.com/api/2.0/mlflow/runs/search failed with exception HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
yield
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 332, in single_resource_event_generator
resource = next(resource_iter)
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 280, in mlflow_tracking
mlf = MlFlow(context)
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 94, in __init__
self._setup()
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 102, in _setup
run_id = self._get_current_run_id()
File "/usr/local/lib/python3.10/site-packages/dagster_mlflow/resources.py", line 132, in _get_current_run_id
current_run_df = mlflow.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/fluent.py", line 1980, in search_runs
runs = get_results_from_paginated_fn(
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/__init__.py", line 271, in get_results_from_paginated_fn
page_results = paginated_fn(max_results_per_page, next_page_token)
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/fluent.py", line 1971, in pagination_wrapper_func
return MlflowClient().search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/client.py", line 2321, in search_runs
return self._tracking_client.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/tracking/_tracking_service/client.py", line 674, in search_runs
return self.store.search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/abstract_store.py", line 391, in search_runs
runs, token = self._search_runs(
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/rest_store.py", line 304, in _search_runs
response_proto = self._call_endpoint(SearchRuns, req_body)
File "/usr/local/lib/python3.10/site-packages/mlflow/store/tracking/rest_store.py", line 60, in _call_endpoint
return call_endpoint(self.get_host_creds(), endpoint, method, json_body, response_proto)
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 289, in call_endpoint
response = http_request(**call_kwargs)
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 151, in http_request
raise MlflowException(f"API request to {url} failed with exception {e}")
The above exception occurred during handling of the following exception:
requests.exceptions.RetryError: HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/rest_utils.py", line 128, in http_request
return _get_http_response_with_retries(
File "/usr/local/lib/python3.10/site-packages/mlflow/utils/request_utils.py", line 228, in _get_http_response_with_retries
return session.request(method, url, allow_redirects=allow_redirects, **kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 510, in send
raise RetryError(e, request=request)
The above exception occurred during handling of the following exception:
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='<redacted id>.cloud.databricks.com', port=443): Max retries exceeded with url: /api/2.0/mlflow/runs/search (Caused by ResponseError('too many 429 error responses'))
Stack Trace:
File "/usr/local/lib/python3.10/site-packages/requests/adapters.py", line 486, in send
resp = conn.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 946, in urlopen
return self.urlopen(
[Previous line repeated 2 more times]
File "/usr/local/lib/python3.10/site-packages/urllib3/connectionpool.py", line 936, in urlopen
retries = retries.increment(method, url, response=response, _pool=self)
File "/usr/local/lib/python3.10/site-packages/urllib3/util/retry.py", line 515, in increment
raise MaxRetryError(_pool, url, reason) from reason # type: ignore[arg-type]
The above exception was caused by the following exception:
urllib3.exceptions.ResponseError: too many 429 error responses
File "/usr/local/lib/python3.10/site-packages/dagster/_core/execution/api.py", line 760, in job_execution_iterator
for event in job_context.executor.execute(job_context, execution_plan):
File "/usr/local/lib/python3.10/site-packages/dagster/_core/executor/multiprocess.py", line 311, in execute
raise DagsterSubprocessError(