
BaseSQLToGCSOperator Parquet Format Fails to Write Dates/JSON

Open patricker opened this issue 3 years ago • 3 comments

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

8.3.0

Apache Airflow version

2.3.4

Operating System

OSX

Deployment

Virtualenv installation

Deployment details

No response

What happened

I was using PostgresToGCSOperator to query a table containing timestamp and JSON columns, with export_format='parquet' set. The export to Parquet fails.
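For context, a minimal operator configuration of this shape triggers the failure (the SQL, bucket, and connection names below are illustrative, not the real DAG):

from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

# Illustrative only: any query returning a timestamp or JSON column hits the error.
export_users = PostgresToGCSOperator(
    task_id="export_users",
    postgres_conn_id="postgres_default",
    sql="SELECT id, registered_at, payload FROM users",  # timestamp + json columns
    bucket="my-bucket",
    filename="exports/users.parquet",
    export_format="parquet",  # the csv and json export formats work fine
)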

For the Timestamp, the error is:

  File "/apache-airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 154, in execute
    for file_to_upload in self._write_local_data_files(cursor):
  File "/apache-airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 241, in _write_local_data_files
    tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
  File "pyarrow/table.pxi", line 1724, in pyarrow.lib.Table.from_pydict
  File "pyarrow/table.pxi", line 2385, in pyarrow.lib._from_pydict
  File "pyarrow/array.pxi", line 341, in pyarrow.lib.asarray
  File "pyarrow/array.pxi", line 315, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int

For the JSON column, the traceback is the same, except for the final error:

pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object

What you think should happen instead

Both timestamp and JSON data types should be supported for export to Parquet.

I tested with JSON as the export format, and had no issues.

How to reproduce

With the export format set to parquet, selecting any timestamp or JSON column reproduces the issue.

I copied the affected part of the code and built a stand-alone example that raises the same exception:

import pyarrow as pa
schema = pa.schema([
     ("current_sign_in_at", pa.date64()),
     ("registered_at", pa.date64())
])
py_row = {'current_sign_in_at': ['2019-02-06T15:11:46.991841'], 'registered_at': ['2019-02-06T15:14:18.626211']}
tbl = pa.Table.from_pydict(py_row, schema)
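For contrast (not part of the original repro), handing pyarrow real date objects instead of ISO-formatted strings satisfies the same schema, which is why returning the values unconverted should work:

import datetime

import pyarrow as pa

schema = pa.schema([
    ("current_sign_in_at", pa.date64()),
    ("registered_at", pa.date64()),
])
# Real date objects convert cleanly to date64; the ISO strings above do not.
py_row = {
    "current_sign_in_at": [datetime.date(2019, 2, 6)],
    "registered_at": [datetime.date(2019, 2, 6)],
}
tbl = pa.Table.from_pydict(py_row, schema)  # no ArrowTypeError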

Anything else

While troubleshooting this, I believe the fix has to be made in every subclass of BaseSQLToGCSOperator. In the convert_type method, timestamps/dates/times are all converted to strings. This is fine for CSV and JSON, but it causes issues with Parquet. For Parquet, the values should instead be returned as-is.

As for JSON, my guess is that setting stringify_dict=True for the Parquet path would be enough.
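A rough sketch of the shape I have in mind (a guess at the code, not the actual provider implementation; the stringify_dict flag mirrors the suggestion above):

import datetime
import json
from decimal import Decimal

def convert_type(value, schema_type, stringify_dict=False):
    # schema_type is kept for signature parity; unused in this sketch.
    # Keep temporal values as Python objects so pyarrow can map them onto the
    # Parquet schema, instead of rendering them as ISO strings.
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value
    # For JSON columns, optionally serialize dicts so they fit a string column.
    if isinstance(value, dict):
        return json.dumps(value) if stringify_dict else value
    if isinstance(value, Decimal):
        return float(value)
    return value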

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

patricker avatar Sep 08 '22 18:09 patricker

I also had a similar error. It seems to be related to this issue

https://github.com/apache/airflow/issues/17538

zzsza avatar Sep 11 '22 08:09 zzsza

@zzsza I agree, it's the same issue. I think this just confirms the fix will need to touch all of the child classes where Parquet is concerned.

patricker avatar Sep 12 '22 16:09 patricker

I marked it as a good first issue for someone to solve (hopefully someone will be interested, pick it up, and fix it) - but if you think you can open a PR @zzsza @patricker, opening a PR and leading it to completion is the fastest way to get it fixed.

potiuk avatar Sep 18 '22 21:09 potiuk

@akbarnotopb maybe it will be easier for you to start with this issue?

eladkal avatar Sep 26 '22 08:09 eladkal

@akbarnotopb maybe it will be easier for you to start with this issue?

Yep, I was having this issue earlier; the problem is well explained in this issue: https://github.com/apache/airflow/issues/17538

I can fix it later, but I don't know how to get the pytest unit tests working. *I've followed the guide with VS Code.

I'm using docker-compose in this repo and will try to get the unit tests working first. It'll be very helpful if you could tell me the easiest way to run them 😁

akbarnotopb avatar Sep 26 '22 10:09 akbarnotopb

I'm using docker-compose in this repo and will try to get the unit tests working first. It'll be very helpful if you could tell me the easiest way to run them 😁

You can join Slack for support if you have difficulty.

eladkal avatar Sep 26 '22 10:09 eladkal

@vchiapaikeo maybe you can help with this issue as well? :)

eladkal avatar Dec 24 '22 13:12 eladkal

Starting to take a look at this and, at least for ints/dates, I can't seem to repro it. Trying with the DAG below:

from airflow import DAG

from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
    PostgresToGCSOperator,
)

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_postgres_to_gcs",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    
    test_postgres_to_gcs = PostgresToGCSOperator(
        task_id="test_postgres_to_gcs",
        postgres_conn_id="postgres_default",
        sql="""
        SELECT id, dag_id, start_date
        FROM job
        LIMIT 2
        """,
        export_format="parquet",
        gcp_conn_id="google_cloud_default",
        bucket="my-bucket",
        filename="vchiapaikeo/sql-to-gcs/file.parquet",
    )

Task logs from the run:

*** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=13.log
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1282} INFO - 
--------------------------------------------------------------------------------
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1283} INFO - Starting attempt 13 of 14
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1284} INFO - 
--------------------------------------------------------------------------------
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:55} INFO - Started process 1333 to run task
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '15', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmp7276gvee']
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:83} INFO - Job 15: Subtask test_postgres_to_gcs
[2022-12-26, 16:01:51 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
[2022-12-26, 16:01:52 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=13
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:140} INFO - Executing query
[2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [10, 'test_os_postgres_to_gcs', 1672069731.407203]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [10], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069731.407203]}
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
dag_id: string
start_date: timestamp[s]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [11, 'test_os_postgres_to_gcs', 1672069921.269374]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [11], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069921.269374]}
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
dag_id: string
start_date: timestamp[s]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:164} INFO - Uploading chunk file #0 to GCS.
[2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-26, 16:01:52 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-26, 16:01:52 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2022-12-26, 16:01:52 UTC] {gcs.py:520} INFO - File /tmp/tmpz75b333l uploaded to vchiapaikeo/sql-to-gcs/file.parquet in my-bucket bucket
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:167} INFO - Removing local file
[2022-12-26, 16:01:53 UTC] {taskinstance.py:1326} INFO - Marking task as SUCCESS. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160151, end_date=20221226T160153
[2022-12-26, 16:01:53 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2022-12-26, 16:01:53 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check

And the output file seems fine:

vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ gsutil cp gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet .
Copying gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet...
/ [1 files][  2.3 KiB/  2.3 KiB]                                                
Operation completed over 1 objects/2.3 KiB.                                      

vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ 
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs cat file.parquet

##################
File: file.parquet
##################

{id: 10, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:48:51 +00:00}
{id: 11, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:52:01 +00:00}

I do think this repros with the JSON type though. Here's a log from trying to export the serialized_dag table, whose data column is of type json:

*** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=16.log
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1282} INFO - 
--------------------------------------------------------------------------------
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1283} INFO - Starting attempt 16 of 17
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1284} INFO - 
--------------------------------------------------------------------------------
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:55} INFO - Started process 1433 to run task
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '18', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmpi1oep8jw']
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:83} INFO - Job 18: Subtask test_postgres_to_gcs
[2022-12-26, 16:04:37 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=16
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:140} INFO - Executing query
[2022-12-26, 16:04:37 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:251} INFO - Logging row: ['test_os_patch_gcs_to_bigquery', '/files/dags/test.py', 28748778209882151, {'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}, None, 1672068095.636378, '276713bbfc190eed20dcb70780b5b47e', None]
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['dag_id', 'fileloc', 'fileloc_hash', 'data', 'data_compressed', 'last_updated', 'dag_hash', 'processor_subdir']
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'dag_id': ['test_os_patch_gcs_to_bigquery'], 'fileloc': ['/files/dags/test.py'], 'fileloc_hash': [28748778209882151], 'data': [{'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}], 'data_compressed': [None], 'last_updated': [1672068095.636378], 'dag_hash': ['276713bbfc190eed20dcb70780b5b47e'], 'processor_subdir': [None]}
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: dag_id: string
fileloc: string
fileloc_hash: int64
data: string
data_compressed: string
last_updated: timestamp[s]
dag_hash: string
processor_subdir: string
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1782} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 160, in execute
    for file_to_upload in self._write_local_data_files(cursor):
  File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 255, in _write_local_data_files
    tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
  File "pyarrow/table.pxi", line 3648, in pyarrow.lib.Table.from_pydict
  File "pyarrow/table.pxi", line 5191, in pyarrow.lib._from_pydict
  File "pyarrow/array.pxi", line 343, in pyarrow.lib.asarray
  File "pyarrow/array.pxi", line 317, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1326} INFO - Marking task as UP_FOR_RETRY. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160437, end_date=20221226T160437
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 18 for task test_postgres_to_gcs (Expected bytes, got a 'dict' object; 1433)
[2022-12-26, 16:04:37 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2022-12-26, 16:04:37 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check

vchiapaikeo avatar Dec 26 '22 16:12 vchiapaikeo

Thanks @vchiapaikeo! Closing then as can't reproduce.

eladkal avatar Dec 27 '22 08:12 eladkal

Just to close the loop on the JSON issue - with the stringify_dict option set to True, this works just fine. The following DAG exports successfully to GCS:

from airflow import DAG

from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
    PostgresToGCSOperator,
)

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_postgres_to_gcs",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    test_postgres_to_gcs = PostgresToGCSOperator(
        task_id="test_postgres_to_gcs",
        postgres_conn_id="postgres_default",
        sql="""
        SELECT
            dag_id,  -- varchar
            fileloc_hash,  -- bigint
            last_updated,  -- timestamptz
            data_compressed,  -- bytea
            data  -- json
        FROM serialized_dag
        LIMIT 20
        """,
        export_format="parquet",
        gcp_conn_id="google_cloud_default",
        bucket="my-bucket",
        filename="vchiapaikeo/sql-to-gcs/file.parquet",
        stringify_dict=True,
    )

And the output:

vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ gsutil cp gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet .
Copying gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet...
/ [1 files][ 25.0 KiB/ 25.0 KiB]                                                
Operation completed over 1 objects/25.0 KiB.                                     
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ 
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs cat file.parquet

##################
File: file.parquet
##################

{dag_id: "test_os_patch_gcs_to_bigquery", fileloc_hash: 28748778209882151, last_updated: 2022-12-27 14:33:10 +00:00, data_compressed: null, data: "{"__version": 1, "dag": {"fileloc": "/files/dags/test.py", "schedule_interval": "@daily", "timezone": "UTC", "catchup": false, "dataset_triggers": [], "default_args": {"__var": {"owner": "gcp-data-platform", "retries": 1, "retry_delay": 10, "start_date": {"__var": 1659312000.0, "__type": "datetime"}}, "__type": "dict"}, "_dag_id": "test_os_patch_gcs_to_bigquery", "_max_active_tasks": 2, "max_active_runs": 1, "_task_group": {"_group_id": null, "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"test_gcs_to_bigquery": ["operator", "test_gcs_to_bigquery"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "start_date": 1659312000.0, "edge_info": {}, "_processor_dags_folder": "/files/dags", "tasks": [{"retries": 1, "ui_color": "#f0eee4", "template_fields": ["bucket", "source_objects", "schema_object", "schema_object_bucket", "destination_project_dataset_table", "impersonation_chain"], "template_fields_renderers": {}, "pool": "default_pool", "template_ext": [".sql"], "retry_delay": 10.0, "owner": "gcp-data-platform", "ui_fgcolor": "#000", "task_id": "test_gcs_to_bigquery", "downstream_task_ids": [], "_task_type": "GCSToBigQueryOperator", "_task_module": "airflow.providers.google.cloud.transfers.gcs_to_bigquery", "_is_empty": false, "_operator_extra_links": [{"airflow.providers.google.cloud.links.bigquery.BigQueryTableLink": {}}], "bucket": "my-bucket", "source_objects": ["vchiapaikeo/file.csv"], "schema_object_bucket": "my-bucket", "destination_project_dataset_table": "my-project.vchiapaikeo.test1"}], "dag_dependencies": [], "params": {}}}"}
{dag_id: "test_os_postgres_to_gcs", fileloc_hash: 2152014207817635, last_updated: 2022-12-27 15:20:35 +00:00, data_compressed: null, data: "{"__version": 1, "dag": {"start_date": 1659312000.0, "_dag_id": "test_os_postgres_to_gcs", "schedule_interval": "@daily", "_max_active_tasks": 2, "max_active_runs": 1, "_task_group": {"_group_id": null, "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"test_postgres_to_gcs": ["operator", "test_postgres_to_gcs"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "default_args": {"__var": {"owner": "gcp-data-platform", "retries": 1, "retry_delay": 10, "start_date": {"__var": 1659312000.0, "__type": "datetime"}}, "__type": "dict"}, "edge_info": {}, "timezone": "UTC", "catchup": false, "dataset_triggers": [], "fileloc": "/files/dags/test_postgres.py", "_processor_dags_folder": "/files/dags", "tasks": [{"retry_delay": 10.0, "task_id": "test_postgres_to_gcs", "template_ext": [".sql"], "retries": 1, "ui_color": "#a0e08c", "template_fields_renderers": {"sql": "sql"}, "downstream_task_ids": [], "template_fields": ["sql", "bucket", "filename", "schema_filename", "schema", "parameters", "impersonation_chain"], "owner": "gcp-data-platform", "pool": "default_pool", "ui_fgcolor": "#000", "_task_type": "PostgresToGCSOperator", "_task_module": "airflow.providers.google.cloud.transfers.postgres_to_gcs", "_is_empty": false, "sql": "\n        SELECT dag_id, fileloc_hash, last_updated, data_compressed, data\n        FROM serialized_dag\n        LIMIT 20\n        ", "bucket": "my-bucket", "filename": "vchiapaikeo/sql-to-gcs/file.parquet"}], "dag_dependencies": [], "params": {}}}"}

However, one drawback of this approach is that the JSON is written as a string type rather than a deserialized representation, which might be more efficient to query in Parquet.

vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs schema file.parquet
Metadata for file: file.parquet

version: 2
num of rows: 2
created by: parquet-cpp-arrow version 9.0.0
metadata:
  ARROW:schema: /////0gBAAAQAAAAAAAKAAwABgAFAAgACgAAAAABBAAMAAAACAAIAAAABAAIAAAABAAAAAUAAADsAAAAmAAAAGQAAAAwAAAABAAAADj///8AAAEFEAAAABgAAAAEAAAAAAAAAAQAAABkYXRhAAAAACj///9g////AAABBRAAAAAgAAAABAAAAAAAAAAPAAAAZGF0YV9jb21wcmVzc2VkAFj///+Q////AAABChAAAAAgAAAABAAAAAAAAAAMAAAAbGFzdF91cGRhdGVkAAAAAIj////A////AAABAhAAAAAoAAAABAAAAAAAAAAMAAAAZmlsZWxvY19oYXNoAAAAAAgADAAIAAcACAAAAAAAAAFAAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAEFEAAAABwAAAAEAAAAAAAAAAYAAABkYWdfaWQAAAQABAAEAAAA
message schema {
  OPTIONAL BYTE_ARRAY dag_id (STRING);
  OPTIONAL INT64 fileloc_hash;
  OPTIONAL INT64 last_updated (TIMESTAMP(MILLIS,false));
  OPTIONAL BYTE_ARRAY data_compressed (STRING);
  OPTIONAL BYTE_ARRAY data (STRING);
}
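If the stringified column needs to be JSON again downstream, here is a quick sketch of re-parsing it after reading the file (my own illustration, not something the operator does):

import json

import pyarrow.parquet as pq

table = pq.read_table("file.parquet")
rows = table.to_pylist()
for row in rows:
    # The data column is stored as a string, so deserialize it back to a dict.
    row["data"] = json.loads(row["data"]) if row["data"] is not None else None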

vchiapaikeo avatar Dec 27 '22 15:12 vchiapaikeo

Hi,

I'm running into this very same issue. I'm using MySQLToGCSOperator with export_format="parquet".

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 181, in execute
    for file_to_upload in self._write_local_data_files(cursor):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 304, in _write_local_data_files
    self._write_rows_to_parquet(parquet_writer, rows_buffer)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 227, in _write_rows_to_parquet
    tbl = pa.Table.from_pydict(rows_pydic, parquet_writer.schema)
  File "pyarrow/table.pxi", line 3725, in pyarrow.lib.Table.from_pydict
  File "pyarrow/table.pxi", line 5271, in pyarrow.lib._from_pydict
  File "pyarrow/array.pxi", line 350, in pyarrow.lib.asarray
  File "pyarrow/array.pxi", line 320, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int

I'm on Cloud Composer 2.4.6, Airflow 2.6.3. As a workaround, I think I might either try exporting to CSV instead, or export the timestamp fields as Unix time (roughly as sketched below).
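The Unix-time variant would look roughly like this (table, column, and connection names are illustrative, not from my actual DAG):

from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator

export_orders = MySQLToGCSOperator(
    task_id="export_orders",
    mysql_conn_id="mysql_default",
    # UNIX_TIMESTAMP() turns the timestamp into plain integer seconds, which
    # sidesteps the Parquet timestamp conversion entirely.
    sql="""
        SELECT
            id,
            UNIX_TIMESTAMP(created_at) AS created_at_unix
        FROM orders
    """,
    bucket="my-bucket",
    filename="exports/orders.parquet",
    export_format="parquet",
)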

renzepost avatar Jan 12 '24 13:01 renzepost