BaseSQLToGCSOperator Parquet Format Fails to Write Dates/JSON
Apache Airflow Provider(s)
apache-airflow-providers-google
Versions of Apache Airflow Providers
8.3.0
Apache Airflow version
2.3.4
Operating System
OSX
Deployment
Virtualenv installation
Deployment details
No response
What happened
I was using PostgresToGCSOperator to query a table containing timestamp and JSON columns, with export_format='parquet' set.
The export to Parquet fails.
For the Timestamp, the error is:
File "/apache-airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 154, in execute
for file_to_upload in self._write_local_data_files(cursor):
File "/apache-airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 241, in _write_local_data_files
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
File "pyarrow/table.pxi", line 1724, in pyarrow.lib.Table.from_pydict
File "pyarrow/table.pxi", line 2385, in pyarrow.lib._from_pydict
File "pyarrow/array.pxi", line 341, in pyarrow.lib.asarray
File "pyarrow/array.pxi", line 315, in pyarrow.lib.array
File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 122, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int
For the JSON, the error is the same, except:
pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object
What you think should happen instead
Both timestamp and JSON datatypes should be supported for export to Parquet
I tested with JSON as the export format, and had no issues.
How to reproduce
With the export format set to parquet, selecting any timestamp or JSON column reproduces the issue.
I copied the affected part of the code and built a stand-alone example that raises the same exception:
import pyarrow as pa

schema = pa.schema([
    ("current_sign_in_at", pa.date64()),
    ("registered_at", pa.date64()),
])
py_row = {'current_sign_in_at': ['2019-02-06T15:11:46.991841'], 'registered_at': ['2019-02-06T15:14:18.626211']}

# Raises pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int
tbl = pa.Table.from_pydict(py_row, schema)
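For comparison, here is a minimal sketch (not the operator's code) showing that the same from_pydict call succeeds once the values are real date objects instead of ISO strings:

```python
from datetime import datetime

import pyarrow as pa

schema = pa.schema([
    ("current_sign_in_at", pa.date64()),
    ("registered_at", pa.date64()),
])

# Parse the ISO strings into datetime.date objects before handing them to pyarrow.
py_row = {
    "current_sign_in_at": [datetime.fromisoformat("2019-02-06T15:11:46.991841").date()],
    "registered_at": [datetime.fromisoformat("2019-02-06T15:14:18.626211").date()],
}

tbl = pa.Table.from_pydict(py_row, schema)  # no ArrowTypeError this time
print(tbl)
```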
Anything else
While troubleshooting this, I concluded that the fix has to be applied in every subclass of BaseSQLToGCSOperator. In each convert_type method, timestamps/dates/times are all converted to strings. That is fine for CSV and JSON output, but it breaks Parquet; for Parquet, those types should be returned as-is.
As for JSON columns, my guess is that setting stringify_dict=True for Parquet parsing would be enough.
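A rough sketch of the kind of convert_type change I have in mind (illustrative only; the signature and branches are not the actual provider code):

```python
import datetime
import json
from decimal import Decimal


def convert_type(value, schema_type, stringify_dict=True):
    """Illustrative sketch: keep temporal values as native Python objects so
    pyarrow can map them onto the Parquet schema; stringify dicts on request."""
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        # Return the object as-is instead of str(value); pyarrow handles these natively.
        return value
    if isinstance(value, dict):
        # JSON columns: serialize to a string so the value fits a string Parquet column.
        return json.dumps(value) if stringify_dict else value
    if isinstance(value, Decimal):
        return float(value)
    return value
```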
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I also had a similar error. It seems to be related to this issue
https://github.com/apache/airflow/issues/17538
@zzsza I agree, it's the same issue. I think this just confirms the fix will need to touch all of the child classes where Parquet is concerned.
I marked it as a good first issue for someone to solve (hopefully someone will be interested, pick it up, and fix it) - but if you think you can open a PR @zzsza @patricker, opening a PR and leading it to completion is the fastest way to get it fixed.
@akbarnotopb maybe it will be easier for you to start with this issue?
Yep, I was having this issue earlier; the problem is well explained in https://github.com/apache/airflow/issues/17538.
I can fix it later, but I don't know how to get the pytest unit tests working (I've followed the guide with VS Code).
I'm using docker-compose in this repo and will try to get the unit tests running first. It would be very helpful if you could tell me the easiest way to run them 😁
You can join Slack for support if you have difficulties.
@vchiapaikeo maybe you can help with this issue as well? :)
Starting to take a look at this, and at least for ints/dates I can't seem to repro it. Trying with the DAG below:
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
    PostgresToGCSOperator,
)

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_postgres_to_gcs",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    test_postgres_to_gcs = PostgresToGCSOperator(
        task_id="test_postgres_to_gcs",
        postgres_conn_id="postgres_default",
        sql="""
            SELECT id, dag_id, start_date
            FROM job
            LIMIT 2
        """,
        export_format="parquet",
        gcp_conn_id="google_cloud_default",
        bucket="my-bucket",
        filename="vchiapaikeo/sql-to-gcs/file.parquet",
    )
Task logs from the run:
*** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=13.log
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1282} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1283} INFO - Starting attempt 13 of 14
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1284} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:01:51 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:55} INFO - Started process 1333 to run task
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '15', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmp7276gvee']
[2022-12-26, 16:01:51 UTC] {standard_task_runner.py:83} INFO - Job 15: Subtask test_postgres_to_gcs
[2022-12-26, 16:01:51 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
[2022-12-26, 16:01:52 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=13
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:140} INFO - Executing query
[2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [10, 'test_os_postgres_to_gcs', 1672069731.407203]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [10], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069731.407203]}
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
dag_id: string
start_date: timestamp[s]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:251} INFO - Logging row: [11, 'test_os_postgres_to_gcs', 1672069921.269374]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['id', 'dag_id', 'start_date']
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'id': [11], 'dag_id': ['test_os_postgres_to_gcs'], 'start_date': [1672069921.269374]}
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: id: int64
dag_id: string
start_date: timestamp[s]
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:164} INFO - Uploading chunk file #0 to GCS.
[2022-12-26, 16:01:52 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-26, 16:01:52 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-26, 16:01:52 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2022-12-26, 16:01:52 UTC] {gcs.py:520} INFO - File /tmp/tmpz75b333l uploaded to vchiapaikeo/sql-to-gcs/file.parquet in my-bucket bucket
[2022-12-26, 16:01:52 UTC] {sql_to_gcs.py:167} INFO - Removing local file
[2022-12-26, 16:01:53 UTC] {taskinstance.py:1326} INFO - Marking task as SUCCESS. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160151, end_date=20221226T160153
[2022-12-26, 16:01:53 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2022-12-26, 16:01:53 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check
And the output from the file looks fine:
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ gsutil cp gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet .
Copying gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet...
/ [1 files][ 2.3 KiB/ 2.3 KiB]
Operation completed over 1 objects/2.3 KiB.
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs cat file.parquet
##################
File: file.parquet
##################
{id: 10, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:48:51 +00:00}
{id: 11, dag_id: "test_os_postgres_to_gcs", start_date: 2022-12-26 15:52:01 +00:00}
I do think this repros with the JSON type, though. Here's a log from trying to export the serialized_dag table, whose data column is of type json:
*** Reading local file: /root/airflow/logs/dag_id=test_os_postgres_to_gcs/run_id=scheduled__2022-12-25T00:00:00+00:00/task_id=test_postgres_to_gcs/attempt=16.log
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [queued]>
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1282} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1283} INFO - Starting attempt 16 of 17
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1284} INFO -
--------------------------------------------------------------------------------
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1303} INFO - Executing <Task(PostgresToGCSOperator): test_postgres_to_gcs> on 2022-12-25 00:00:00+00:00
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:55} INFO - Started process 1433 to run task
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:82} INFO - Running: ['***', 'tasks', 'run', 'test_os_postgres_to_gcs', 'test_postgres_to_gcs', 'scheduled__2022-12-25T00:00:00+00:00', '--job-id', '18', '--raw', '--subdir', 'DAGS_FOLDER/test_postgres.py', '--cfg-path', '/tmp/tmpi1oep8jw']
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:83} INFO - Job 18: Subtask test_postgres_to_gcs
[2022-12-26, 16:04:37 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_postgres_to_gcs.test_postgres_to_gcs scheduled__2022-12-25T00:00:00+00:00 [running]> on host d4b52cabcc3d
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_postgres_to_gcs
AIRFLOW_CTX_TASK_ID=test_postgres_to_gcs
AIRFLOW_CTX_EXECUTION_DATE=2022-12-25T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=16
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-25T00:00:00+00:00
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:140} INFO - Executing query
[2022-12-26, 16:04:37 UTC] {base.py:73} INFO - Using connection ID 'postgres_default' for task execution.
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:159} INFO - Writing local data files
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:251} INFO - Logging row: ['test_os_patch_gcs_to_bigquery', '/files/dags/test.py', 28748778209882151, {'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}, None, 1672068095.636378, '276713bbfc190eed20dcb70780b5b47e', None]
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:252} INFO - Logging schema: ['dag_id', 'fileloc', 'fileloc_hash', 'data', 'data_compressed', 'last_updated', 'dag_hash', 'processor_subdir']
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:253} INFO - Logging row_pydic: {'dag_id': ['test_os_patch_gcs_to_bigquery'], 'fileloc': ['/files/dags/test.py'], 'fileloc_hash': [28748778209882151], 'data': [{'__version': 1, 'dag': {'max_active_runs': 1, 'default_args': {'__var': {'owner': 'gcp-data-platform', 'retries': 1, 'retry_delay': 10, 'start_date': {'__var': 1659312000.0, '__type': 'datetime'}}, '__type': 'dict'}, 'edge_info': {}, 'catchup': False, 'timezone': 'UTC', 'fileloc': '/files/dags/test.py', '_task_group': {'_group_id': None, 'prefix_group_id': True, 'tooltip': '', 'ui_color': 'CornflowerBlue', 'ui_fgcolor': '#000', 'children': {'test_gcs_to_bigquery': ['operator', 'test_gcs_to_bigquery']}, 'upstream_group_ids': [], 'downstream_group_ids': [], 'upstream_task_ids': [], 'downstream_task_ids': []}, 'dataset_triggers': [], 'schedule_interval': '@daily', '_max_active_tasks': 2, '_dag_id': 'test_os_patch_gcs_to_bigquery', 'start_date': 1659312000.0, '_processor_dags_folder': '/files/dags', 'tasks': [{'ui_fgcolor': '#000', 'template_ext': ['.sql'], 'downstream_task_ids': [], 'retry_delay': 10.0, 'template_fields': ['bucket', 'source_objects', 'schema_object', 'schema_object_bucket', 'destination_project_dataset_table', 'impersonation_chain'], 'retries': 1, 'owner': 'gcp-data-platform', 'pool': 'default_pool', 'task_id': 'test_gcs_to_bigquery', 'ui_color': '#f0eee4', 'template_fields_renderers': {}, '_task_type': 'GCSToBigQueryOperator', '_task_module': '***.providers.google.cloud.transfers.gcs_to_bigquery', '_is_empty': False, '_operator_extra_links': [{'***.providers.google.cloud.links.bigquery.BigQueryTableLink': {}}], 'bucket': 'my-bucket', 'source_objects': ['vchiapaikeo/file.csv'], 'schema_object_bucket': 'my-bucket', 'destination_project_dataset_table': 'my-project.vchiapaikeo.test1'}], 'dag_dependencies': [], 'params': {}}}], 'data_compressed': [None], 'last_updated': [1672068095.636378], 'dag_hash': ['276713bbfc190eed20dcb70780b5b47e'], 'processor_subdir': [None]}
[2022-12-26, 16:04:37 UTC] {sql_to_gcs.py:254} INFO - Logging parquet_schema: dag_id: string
fileloc: string
fileloc_hash: int64
data: string
data_compressed: string
last_updated: timestamp[s]
dag_hash: string
processor_subdir: string
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1782} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 160, in execute
for file_to_upload in self._write_local_data_files(cursor):
File "/opt/airflow/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 255, in _write_local_data_files
tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
File "pyarrow/table.pxi", line 3648, in pyarrow.lib.Table.from_pydict
File "pyarrow/table.pxi", line 5191, in pyarrow.lib._from_pydict
File "pyarrow/array.pxi", line 343, in pyarrow.lib.asarray
File "pyarrow/array.pxi", line 317, in pyarrow.lib.array
File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Expected bytes, got a 'dict' object
[2022-12-26, 16:04:37 UTC] {taskinstance.py:1326} INFO - Marking task as UP_FOR_RETRY. dag_id=test_os_postgres_to_gcs, task_id=test_postgres_to_gcs, execution_date=20221225T000000, start_date=20221226T160437, end_date=20221226T160437
[2022-12-26, 16:04:37 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 18 for task test_postgres_to_gcs (Expected bytes, got a 'dict' object; 1433)
[2022-12-26, 16:04:37 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2022-12-26, 16:04:37 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check
Thanks @vchiapaikeo! Closing then, as this can't be reproduced.
Just to close the loop on the JSON issue - with the stringify_dict option set to True, this works just fine. The following dag exports successfully to GCS:
from airflow import DAG
from airflow.providers.google.cloud.transfers.postgres_to_gcs import (
    PostgresToGCSOperator,
)

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_postgres_to_gcs",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    test_postgres_to_gcs = PostgresToGCSOperator(
        task_id="test_postgres_to_gcs",
        postgres_conn_id="postgres_default",
        sql="""
            SELECT
                dag_id,           -- varchar
                fileloc_hash,     -- bigint
                last_updated,     -- timestamptz
                data_compressed,  -- bytea
                data              -- json
            FROM serialized_dag
            LIMIT 20
        """,
        export_format="parquet",
        gcp_conn_id="google_cloud_default",
        bucket="my-bucket",
        filename="vchiapaikeo/sql-to-gcs/file.parquet",
        stringify_dict=True,
    )
And the output:
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ gsutil cp gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet .
Copying gs://my-bucket/vchiapaikeo/sql-to-gcs/file.parquet...
/ [1 files][ 25.0 KiB/ 25.0 KiB]
Operation completed over 1 objects/25.0 KiB.
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs cat file.parquet
##################
File: file.parquet
##################
{dag_id: "test_os_patch_gcs_to_bigquery", fileloc_hash: 28748778209882151, last_updated: 2022-12-27 14:33:10 +00:00, data_compressed: null, data: "{"__version": 1, "dag": {"fileloc": "/files/dags/test.py", "schedule_interval": "@daily", "timezone": "UTC", "catchup": false, "dataset_triggers": [], "default_args": {"__var": {"owner": "gcp-data-platform", "retries": 1, "retry_delay": 10, "start_date": {"__var": 1659312000.0, "__type": "datetime"}}, "__type": "dict"}, "_dag_id": "test_os_patch_gcs_to_bigquery", "_max_active_tasks": 2, "max_active_runs": 1, "_task_group": {"_group_id": null, "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"test_gcs_to_bigquery": ["operator", "test_gcs_to_bigquery"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "start_date": 1659312000.0, "edge_info": {}, "_processor_dags_folder": "/files/dags", "tasks": [{"retries": 1, "ui_color": "#f0eee4", "template_fields": ["bucket", "source_objects", "schema_object", "schema_object_bucket", "destination_project_dataset_table", "impersonation_chain"], "template_fields_renderers": {}, "pool": "default_pool", "template_ext": [".sql"], "retry_delay": 10.0, "owner": "gcp-data-platform", "ui_fgcolor": "#000", "task_id": "test_gcs_to_bigquery", "downstream_task_ids": [], "_task_type": "GCSToBigQueryOperator", "_task_module": "airflow.providers.google.cloud.transfers.gcs_to_bigquery", "_is_empty": false, "_operator_extra_links": [{"airflow.providers.google.cloud.links.bigquery.BigQueryTableLink": {}}], "bucket": "my-bucket", "source_objects": ["vchiapaikeo/file.csv"], "schema_object_bucket": "my-bucket", "destination_project_dataset_table": "my-project.vchiapaikeo.test1"}], "dag_dependencies": [], "params": {}}}"}
{dag_id: "test_os_postgres_to_gcs", fileloc_hash: 2152014207817635, last_updated: 2022-12-27 15:20:35 +00:00, data_compressed: null, data: "{"__version": 1, "dag": {"start_date": 1659312000.0, "_dag_id": "test_os_postgres_to_gcs", "schedule_interval": "@daily", "_max_active_tasks": 2, "max_active_runs": 1, "_task_group": {"_group_id": null, "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"test_postgres_to_gcs": ["operator", "test_postgres_to_gcs"]}, "upstream_group_ids": [], "downstream_group_ids": [], "upstream_task_ids": [], "downstream_task_ids": []}, "default_args": {"__var": {"owner": "gcp-data-platform", "retries": 1, "retry_delay": 10, "start_date": {"__var": 1659312000.0, "__type": "datetime"}}, "__type": "dict"}, "edge_info": {}, "timezone": "UTC", "catchup": false, "dataset_triggers": [], "fileloc": "/files/dags/test_postgres.py", "_processor_dags_folder": "/files/dags", "tasks": [{"retry_delay": 10.0, "task_id": "test_postgres_to_gcs", "template_ext": [".sql"], "retries": 1, "ui_color": "#a0e08c", "template_fields_renderers": {"sql": "sql"}, "downstream_task_ids": [], "template_fields": ["sql", "bucket", "filename", "schema_filename", "schema", "parameters", "impersonation_chain"], "owner": "gcp-data-platform", "pool": "default_pool", "ui_fgcolor": "#000", "_task_type": "PostgresToGCSOperator", "_task_module": "airflow.providers.google.cloud.transfers.postgres_to_gcs", "_is_empty": false, "sql": "\n SELECT dag_id, fileloc_hash, last_updated, data_compressed, data\n FROM serialized_dag\n LIMIT 20\n ", "bucket": "my-bucket", "filename": "vchiapaikeo/sql-to-gcs/file.parquet"}], "dag_dependencies": [], "params": {}}}"}
However, one drawback of this approach is that the JSON ends up as a string column rather than a structured representation, which might be more efficient to query in Parquet. A consumer can still recover the structure by parsing the string column; see the sketch after the schema dump below.
vchiapaikeo@7676:airflow-src (vchiapaikeo/basesqltogcs-parquet-v1)$ pqrs schema file.parquet
Metadata for file: file.parquet
version: 2
num of rows: 2
created by: parquet-cpp-arrow version 9.0.0
metadata:
ARROW:schema: /////0gBAAAQAAAAAAAKAAwABgAFAAgACgAAAAABBAAMAAAACAAIAAAABAAIAAAABAAAAAUAAADsAAAAmAAAAGQAAAAwAAAABAAAADj///8AAAEFEAAAABgAAAAEAAAAAAAAAAQAAABkYXRhAAAAACj///9g////AAABBRAAAAAgAAAABAAAAAAAAAAPAAAAZGF0YV9jb21wcmVzc2VkAFj///+Q////AAABChAAAAAgAAAABAAAAAAAAAAMAAAAbGFzdF91cGRhdGVkAAAAAIj////A////AAABAhAAAAAoAAAABAAAAAAAAAAMAAAAZmlsZWxvY19oYXNoAAAAAAgADAAIAAcACAAAAAAAAAFAAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAEFEAAAABwAAAAEAAAAAAAAAAYAAABkYWdfaWQAAAQABAAEAAAA
message schema {
OPTIONAL BYTE_ARRAY dag_id (STRING);
OPTIONAL INT64 fileloc_hash;
OPTIONAL INT64 last_updated (TIMESTAMP(MILLIS,false));
OPTIONAL BYTE_ARRAY data_compressed (STRING);
OPTIONAL BYTE_ARRAY data (STRING);
}
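As noted above, a consumer can recover the structure by parsing the string column after reading the file back. An illustrative sketch, assuming the data column shown in the schema above:

```python
import json

import pyarrow.parquet as pq

# Read the exported file back (the path is illustrative) and turn the
# stringified JSON column into Python dicts on the consumer side.
table = pq.read_table("file.parquet")
for record in table.to_pylist():
    if record["data"] is not None:
        record["data"] = json.loads(record["data"])
```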
Hi,
I'm running into this very same issue. I'm using MySQLToGCSOperator with export_format="parquet".
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 181, in execute
for file_to_upload in self._write_local_data_files(cursor):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 304, in _write_local_data_files
self._write_rows_to_parquet(parquet_writer, rows_buffer)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 227, in _write_rows_to_parquet
tbl = pa.Table.from_pydict(rows_pydic, parquet_writer.schema)
File "pyarrow/table.pxi", line 3725, in pyarrow.lib.Table.from_pydict
File "pyarrow/table.pxi", line 5271, in pyarrow.lib._from_pydict
File "pyarrow/array.pxi", line 350, in pyarrow.lib.asarray
File "pyarrow/array.pxi", line 320, in pyarrow.lib.array
File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 123, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int
I'm on Cloud Composer 2.4.6, Airflow 2.6.3. As a workaround, I think I might either try exporting to CSV instead or export the timestamp fields as Unix time directly in the query (roughly as sketched below).
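A rough sketch of the Unix-time workaround (the operator arguments and table/column names below are made up, not from my actual DAG): cast the timestamp inside the query so the Parquet writer only ever sees integers.

```python
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator

# To be placed inside a DAG definition; identifiers below are illustrative.
export_as_unix_time = MySQLToGCSOperator(
    task_id="export_as_unix_time",
    mysql_conn_id="mysql_default",
    sql="""
        SELECT
            id,
            UNIX_TIMESTAMP(updated_at) AS updated_at_unix  -- integer seconds
        FROM my_table
    """,
    export_format="parquet",
    bucket="my-bucket",
    filename="exports/my_table.parquet",
)
```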