Rare Block on GBQ write:
Environment details
- OS type and version: macOS, Ventura 13.5.2.
- Python version: python --version reports 3.10.12
- pip version: pip --version reports pip 23.2.1 from /Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pip (python 3.10)
- pandas-gbq version: pip show pandas-gbq reports:
  Name: pandas-gbq
  Version: 0.19.2
  Summary: Google BigQuery connector for pandas
  Home-page: https://github.com/googleapis/python-bigquery-pandas
  Author: pandas-gbq authors
  Author-email: [email protected]
  License: BSD-3-Clause
  Location: /Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages
  Requires: db-dtypes, google-api-core, google-auth, google-auth-oauthlib, google-cloud-bigquery, google-cloud-bigquery-storage, numpy, pandas, pyarrow, pydata-google-auth, setuptools
  Required-by: EMS
Steps to reproduce
- Run lots of jobs that write to GBQ 100,000+ times. In this case, the writes to GBQ succeeded for many thousands of rows.
- Twice, a write has stalled and needed to be interrupted.
Code example
The DB is already set up in this method and the credentials are not None. The stall happens in the df.to_gbq() call. No exception is thrown to be caught.
def _push_to_database(self):
    df = pd.concat(self.results)
    # Store remotely for flexibility.
    if self.remote is not None:
        try:
            with self.remote.connect() as rdb:
                df.to_sql(self.table_name, rdb, if_exists='append', method='multi')
        except SQLAlchemyError as e:
            logging.error("%s", e)
    if self.credentials is not None:
        try:
            df.to_gbq(f'EMS.{self.table_name}',
                      if_exists='append',
                      progress_bar=False,
                      credentials=self.credentials)
        except pandas_gbq.exceptions.GenericGBQException as e:
            logging.error("%s", e)
    elif self.project_id is not None:
        try:
            df.to_gbq(f'EMS.{self.table_name}',
                      if_exists='append',
                      progress_bar=False,
                      project_id=self.project_id)
        except pandas_gbq.exceptions.GenericGBQException as e:
            logging.error("%s", e)
    # Store locally for durability.
    with self.local.connect() as ldb:
        df.to_sql(self.table_name, ldb, if_exists='append', method='multi')
    self.results = []
Stack trace
Traceback (most recent call last):
File "/Users/awd/Projects/MultiverseExperiments/AMP_matrix_recovery/run_amp_normal_bayes.py", line 494, in <module>
do_coiled_experiment('exp_dicts/AMP_matrix_recovery_normal_bayes_3_2.json')
File "/Users/awd/Projects/MultiverseExperiments/AMP_matrix_recovery/run_amp_normal_bayes.py", line 449, in do_coiled_experiment
do_on_cluster(exp, run_amp_instance, client, credentials=get_gbq_credentials())
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/EMS/manager.py", line 412, in do_on_cluster
db.push_batch()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/EMS/manager.py", line 117, in push_batch
self._push_to_database()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/EMS/manager.py", line 73, in _push_to_database
df.to_gbq(f'EMS.{self.table_name}',
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas/core/frame.py", line 2161, in to_gbq
gbq.to_gbq(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas/io/gbq.py", line 223, in to_gbq
pandas_gbq.to_gbq(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 1220, in to_gbq
connector.load_data(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/gbq.py", line 602, in load_data
chunks = load.load_chunks(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/load.py", line 243, in load_chunks
load_parquet(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/pandas_gbq/load.py", line 137, in load_parquet
).result()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 922, in result
return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/future/polling.py", line 256, in result
self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
polling(self._done_or_raise)(retry=retry)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/retry.py", line 349, in retry_wrapped_func
return retry_target(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/retry.py", line 191, in retry_target
return target()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
if not self.done(retry=retry):
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 889, in done
self.reload(retry=retry, timeout=timeout)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 781, in reload
api_response = client._call_api(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 816, in _call_api
return call()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 482, in api_request
response = self._make_request(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 341, in _make_request
return self._do_request(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 379, in _do_request
return self.http.request(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/google/auth/transport/requests.py", line 542, in request
response = super(AuthorizedSession, self).request(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/requests/sessions.py", line 589, in request
resp = self.send(prep, **send_kwargs)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/requests/sessions.py", line 703, in send
r = adapter.send(request, **kwargs)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/requests/adapters.py", line 486, in send
resp = conn.urlopen(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/urllib3/connectionpool.py", line 449, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/site-packages/urllib3/connectionpool.py", line 444, in _make_request
httplib_response = conn.getresponse()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/http/client.py", line 1375, in getresponse
response.begin()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/http/client.py", line 318, in begin
version, status, reason = self._read_status()
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/http/client.py", line 279, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/socket.py", line 705, in readinto
return self._sock.recv_into(b)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/ssl.py", line 1274, in recv_into
return self.read(nbytes, buffer)
File "/Users/awd/mambaforge/envs/AMPMatrixRecovery/lib/python3.10/ssl.py", line 1130, in read
return self._sslobj.read(len, buffer)
Hi @adonoho, thank you for reporting this issue. I tried, but haven't been able to reproduce it. I suspect this has something to do with your network being unstable. Maybe you can add a timeout around the df.to_gbq() call and retry if it stalls? Also, you said "Run lots of jobs, write to GBQ 100,000+ times", but I hit a quota error after just 1500 insertions. How did you bypass this?
@Linchin This is a program that collects values from a compute cluster. Each function returns the single row of a data frame. They are concatenated and then written to GBQ. I've had jobs create 5M rows in 4K row chunks, i.e. every minute or 4k rows whichever occurs sooner. I will explore the timeout function to retry. I am happy to instrument my code however you might wish to help find this problem.
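For readers following along, the batching rule described above (flush after 4K rows or after one minute, whichever comes first) could be sketched roughly as follows. This is not the EMS code; the class name RowBatcher, its parameters, and the flush callback are all hypothetical, and the age limit is only checked when a new row arrives.

```python
import time


class RowBatcher:
    """Collect rows and hand them to `flush` by size or by age (hypothetical helper)."""

    def __init__(self, flush, max_rows=4000, max_age_s=60.0, clock=time.monotonic):
        self._flush = flush          # callable that receives a list of rows
        self._max_rows = max_rows
        self._max_age_s = max_age_s
        self._clock = clock          # injectable so the age rule can be tested
        self._rows = []
        self._started = None         # time the oldest buffered row arrived

    def add(self, row):
        if not self._rows:
            self._started = self._clock()
        self._rows.append(row)
        too_many = len(self._rows) >= self._max_rows
        too_old = self._clock() - self._started >= self._max_age_s
        if too_many or too_old:
            self.flush()

    def flush(self):
        # Swap the buffer out before calling the callback so a failing
        # callback cannot cause the same rows to be sent twice.
        if self._rows:
            batch, self._rows = self._rows, []
            self._flush(batch)
```

In the setting of this issue, each row would be a single-row DataFrame and the callback would do the pd.concat and the database writes.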
BTW, df.to_gbq() doesn't appear to support a timeout parameter. (I am new to Google APIs. Please forgive me if it is documented in a place that is not obvious to me.)
Indeed, df.to_gbq() doesn't have a timeout option. I was thinking more of implementing the timeout in Python itself, as in the examples here.
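One way a Python-level timeout with retry might look is sketched below. It is not taken from the linked examples; the names call_with_timeout, retry_on_timeout, and CallTimedOut are made up for illustration. It runs the blocking call in a daemon thread and stops waiting after a deadline. The stalled thread is abandoned rather than killed, so with if_exists='append' a write that completes late, followed by a retry, could insert the same rows twice.

```python
import threading
import time


class CallTimedOut(Exception):
    """Raised when the wrapped call does not finish within the deadline."""


def call_with_timeout(func, timeout_s, *args, **kwargs):
    """Run func(*args, **kwargs) in a daemon thread; give up after timeout_s seconds."""
    outcome = {}

    def runner():
        try:
            outcome["value"] = func(*args, **kwargs)
        except BaseException as exc:  # hand any error back to the caller
            outcome["error"] = exc

    worker = threading.Thread(target=runner, daemon=True)
    worker.start()
    worker.join(timeout_s)
    if worker.is_alive():
        # The worker keeps running in the background; it is only abandoned.
        raise CallTimedOut(f"no result after {timeout_s} s")
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")


def retry_on_timeout(func, timeout_s, attempts=3, pause_s=1.0):
    """Call func() up to `attempts` times, retrying only when it times out."""
    for attempt in range(1, attempts + 1):
        try:
            return call_with_timeout(func, timeout_s)
        except CallTimedOut:
            if attempt == attempts:
                raise
            time.sleep(pause_s)
```

A call site might then read retry_on_timeout(lambda: df.to_gbq(...), timeout_s=120), with the timeout chosen to be comfortably longer than a normal write.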
Presumably, the underlying Google API calls support timeouts? Would a better answer be to surface the exceptions that involve timeouts? (I followed the link you mentioned and, because they say they don't think it plays well with threads, I will rule it out. FTR, this is a Dask app that gathers data via Tornado and presents it to the single-threaded __main__ code.) I am really quite happy to implement timeout-catching code instead of making the loop potentially unstable.
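On the question of timeouts in the underlying API: in google-cloud-bigquery, Client.load_table_from_dataframe() accepts a timeout argument that is passed to the HTTP transport, and the returned job's result() accepts a timeout and raises concurrent.futures.TimeoutError when it expires. A minimal sketch that bypasses df.to_gbq() might look like the following. The function name load_with_deadline and its parameters are hypothetical, write disposition and schema options (which would go in a job_config) are omitted, and it is an assumption, not something established in this thread, that these timeouts would interrupt the blocked SSL read shown in the stack trace.

```python
import concurrent.futures


def load_with_deadline(client, df, table_id, deadline_s, request_timeout_s=60.0):
    """Start a load job and wait at most deadline_s seconds for it to finish.

    `client` is expected to be a google.cloud.bigquery.Client (assumption).
    """
    # This timeout is handed to the HTTP transport for the upload request.
    job = client.load_table_from_dataframe(df, table_id, timeout=request_timeout_s)
    try:
        # This timeout bounds how long we wait for the job to complete.
        return job.result(timeout=deadline_s)
    except concurrent.futures.TimeoutError:
        # Best-effort request to stop the job before the caller retries.
        job.cancel()
        raise
```

The caller can then catch concurrent.futures.TimeoutError next to GenericGBQException and decide whether to retry or to log and move on.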
From the above trace, I found the following interesting #TODO in load_chunks() line 252:
if api_method == "load_parquet":
    load_parquet(
        client,
        dataframe,
        destination_table_ref,
        write_disposition,
        location,
        schema,
        billing_project=billing_project,
    )
    # TODO: yield progress depending on result() with timeout
    return [0]
Clearly, the new load_parquet() method is not yet complete. What can I do to help fix this code? (Bear in mind that, due to my inexperience with Google APIs, I am uncertain how the maintenance team manages timeout issues in pandas_gbq.)
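As a starting point for discussion, here is one guess at what "yield progress depending on result() with timeout" could mean: poll the job in short slices, yield the elapsed time after each slice, and give up at an overall deadline. This is only a sketch and not the pandas_gbq design; wait_with_progress and its parameters are hypothetical, and it assumes the job object behaves like a google-cloud-bigquery load job whose result(timeout=...) raises concurrent.futures.TimeoutError.

```python
import concurrent.futures
import time


def wait_with_progress(job, poll_s=5.0, deadline_s=300.0, clock=time.monotonic):
    """Yield elapsed seconds after each poll until the job finishes.

    Re-raises concurrent.futures.TimeoutError once deadline_s has passed.
    """
    start = clock()
    while True:
        try:
            job.result(timeout=poll_s)  # wait one short slice
            return                      # job finished: end the generator
        except concurrent.futures.TimeoutError:
            elapsed = clock() - start
            if elapsed >= deadline_s:
                raise                   # overall deadline exceeded
            yield elapsed               # report progress, then poll again
```

A caller could iterate over the generator to drive a progress bar, and treat the final TimeoutError as the signal to log the stall instead of blocking indefinitely.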