
feat: add `write_engine` parameter to `read_FORMATNAME` methods to control how data is written to BigQuery

Open · tswast opened this issue on Feb 06 '24 · 4 comments

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • [ ] Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • [ ] Ensure the tests and linter pass
  • [ ] Code coverage does not decrease (if any source code was changed)
  • [ ] Appropriate docs were updated (if necessary)

Fixes internal issue 323176126 🦕
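
For illustration, a minimal sketch of how the proposed parameter could look from user code; the engine names come from the test parametrization quoted later in this thread, the path is a placeholder, and the final API surface may differ from this sketch:

```python
import bigframes.pandas as bpd

# Hypothetical usage of the proposed write_engine parameter.
# "default", "bigquery_inline", "bigquery_load", and "bigquery_streaming"
# are the values exercised by the system test quoted below.
df = bpd.read_csv(
    "gs://my-bucket/my-data.csv",  # placeholder path
    write_engine="bigquery_load",  # write via a BigQuery load job
)
```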

tswast · Feb 06 '24

Blocked by https://github.com/googleapis/python-bigquery/issues/1815

tswast · Feb 09 '24

Test failure is a real one:

E TypeError: Object of type bool_ is not JSON serializable
=================================== FAILURES ===================================
_____________ test_read_csv_gcs_default_engine[bigquery_streaming] _____________
[gw19] linux -- Python 3.11.6 /tmpfs/src/github/python-bigquery-dataframes/.nox/system-3-11/bin/python

session = <bigframes.session.Session object at 0x7f0d0f897cd0>
scalars_dfs = (          bool_col bytes_col ...
rowindex ...
...       2038-01-19 03:14:17.999999+00:00  8  False  ...  <NA>

[9 rows x 13 columns])
gcs_folder = 'gs://bigframes-dev-testing/bigframes_tests_system_20240229220731_1845bf/'
write_engine = 'bigquery_streaming'

@skip_legacy_pandas
@pytest.mark.parametrize(
    ("write_engine",),
    (
        ("default",),
        ("bigquery_inline",),
        ("bigquery_load",),
        ("bigquery_streaming",),
    ),
)
def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder, write_engine):
    scalars_df, _ = scalars_dfs
    if scalars_df.index.name is not None:
        path = gcs_folder + "test_read_csv_gcs_default_engine_w_index*.csv"
    else:
        path = gcs_folder + "test_read_csv_gcs_default_engine_wo_index*.csv"
    read_path = path.replace("*", FIRST_FILE)
    scalars_df.to_csv(path, index=False)
    dtype = scalars_df.dtypes.to_dict()
    dtype.pop("geography_col")
>   df = session.read_csv(
        read_path,
        # Convert default pandas dtypes to match BigQuery DataFrames dtypes.
        dtype=dtype,
        write_engine=write_engine,
    )

tests/system/small/test_session.py:435:


bigframes/session/__init__.py:1162: in read_csv
    return self._read_pandas(
bigframes/session/__init__.py:933: in _read_pandas
    return self._read_pandas_bigquery_table(
bigframes/session/__init__.py:988: in _read_pandas_bigquery_table
    table_expression = bigframes_io.pandas_to_bigquery_streaming(
bigframes/session/_io/bigquery.py:294: in pandas_to_bigquery_streaming
    for errors in bqclient.insert_rows_from_dataframe(
.nox/system-3-11/lib/python3.11/site-packages/google/cloud/bigquery/client.py:3662: in insert_rows_from_dataframe
    result = self.insert_rows(table, rows_chunk, selected_fields, **kwargs)
.nox/system-3-11/lib/python3.11/site-packages/google/cloud/bigquery/client.py:3605: in insert_rows
    return self.insert_rows_json(table, json_rows, **kwargs)
.nox/system-3-11/lib/python3.11/site-packages/google/cloud/bigquery/client.py:3801: in insert_rows_json
    response = self._call_api(
.nox/system-3-11/lib/python3.11/site-packages/google/cloud/bigquery/client.py:827: in _call_api
    return call()
.nox/system-3-11/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py:293: in retry_wrapped_func
    return retry_target(
.nox/system-3-11/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py:153: in retry_target
    _retry_error_helper(
.nox/system-3-11/lib/python3.11/site-packages/google/api_core/retry/retry_base.py:212: in _retry_error_helper
    raise final_exc from source_exc
.nox/system-3-11/lib/python3.11/site-packages/google/api_core/retry/retry_unary.py:144: in retry_target
    result = target()
.nox/system-3-11/lib/python3.11/site-packages/google/cloud/_http/__init__.py:479: in api_request
    data = json.dumps(data)
/usr/local/lib/python3.11/json/__init__.py:231: in dumps
    return _default_encoder.encode(obj)
/usr/local/lib/python3.11/json/encoder.py:200: in encode
    chunks = self.iterencode(o, _one_shot=True)
/usr/local/lib/python3.11/json/encoder.py:258: in iterencode
    return _iterencode(o, 0)


self = <json.encoder.JSONEncoder object at 0x7f0d3d5e8b90>, o = True

def default(self, o):
    """Implement this method in a subclass such that it returns
    a serializable object for ``o``, or calls the base implementation
    (to raise a ``TypeError``).

    For example, to support arbitrary iterators, you could
    implement default like this::

        def default(self, o):
            try:
                iterable = iter(o)
            except TypeError:
                pass
            else:
                return list(iterable)
            # Let the base class default method raise the TypeError
            return JSONEncoder.default(self, o)

    """
  raise TypeError(f'Object of type {o.__class__.__name__} '
                    f'is not JSON serializable')

E TypeError: Object of type bool_ is not JSON serializable

/usr/local/lib/python3.11/json/encoder.py:180: TypeError
----------------------------- Captured stdout call -----------------------------
Query job 2b56fdd1-5dcc-4afd-8521-69b517d5afae is DONE. 1.1 kB processed.
https://console.cloud.google.com/bigquery?project=bigframes-dev&j=bq:US:2b56fdd1-5dcc-4afd-8521-69b517d5afae&page=queryresults
Query job 7d127271-a535-48d0-b092-df329771fb89 is RUNNING.
https://console.cloud.google.com/bigquery?project=bigframes-dev&j=bq:US:7d127271-a535-48d0-b092-df329771fb89&page=queryresults
Query job 7d127271-a535-48d0-b092-df329771fb89 is DONE. 0 Bytes processed.
https://console.cloud.google.com/bigquery?project=bigframes-dev&j=bq:US:7d127271-a535-48d0-b092-df329771fb89&page=queryresults
=============================== warnings summary ===============================

This will likely require a fix upstream in google-cloud-bigquery, but in the meantime I can use a vendored version of insert_rows_from_dataframe that can serialize a numpy bool_ value, similar to the existing special case for NaN.
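
A minimal sketch of the kind of conversion such a vendored helper would need, assuming the row values arrive as numpy scalars (the helper name is made up for illustration and is not the actual bigframes or google-cloud-bigquery code):

```python
import json

import numpy as np


def _json_safe(value):
    # Illustrative only: numpy.bool_ (and other numpy scalars) are not
    # handled by the stdlib json encoder, so unwrap them to Python
    # natives before handing rows to insert_rows_json.
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.generic):  # np.int64, np.float64, ...
        return value.item()
    if isinstance(value, float) and np.isnan(value):
        return None  # mirrors the existing NaN special case
    return value


row = {"bool_col": np.bool_(True), "int_col": np.int64(8)}
print(json.dumps({k: _json_safe(v) for k, v in row.items()}))
# {"bool_col": true, "int_col": 8}
```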

Edit: I already fixed this in https://github.com/googleapis/python-bigquery/pull/1816, waiting on version 3.18.0.

  • [x] bump minimum google-cloud-bigquery after merging https://github.com/googleapis/python-bigquery/pull/1817
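
For that dependency bump, a hedged sketch of what the constraint change could look like in setup.py; the surrounding list and the upper bound are assumptions, only the 3.18.0 floor comes from the comments above:

```python
# setup.py (excerpt, illustrative only): raise the minimum so the numpy
# bool_ serialization fix in google-cloud-bigquery is guaranteed.
dependencies = [
    "google-cloud-bigquery >= 3.18.0, < 4.0dev",
]
```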

tswast · Mar 01 '24

Marking as do not merge for now. Thanks for your feedback so far. I will wait until we implement go/pandas-gbq-and-bigframes-redundancy before merging this (likely mid-April).

tswast · Mar 06 '24

> Marking as do not merge for now. Thanks for your feedback so far. I will wait until we implement go/pandas-gbq-and-bigframes-redundancy before merging this (likely mid-April).

I've mailed https://github.com/googleapis/python-bigquery-pandas/pull/814 as a first step of this project. I plan to follow up that PR with one that copies the bigquery_streaming write engine from this PR to pandas-gbq, so that the pandas -> BigQuery logic can be consolidated in the pandas-gbq package.

tswast · Sep 20 '24