astro-sdk
Test Migration path for Python SDK 1.1 to 1.2
We should test that users who enabled pickling are able to upgrade from Python SDK 1.1 to Python SDK 1.2 with a) pickling enabled and b) the Astro SDK custom XCom backend built in https://github.com/astronomer/astro-sdk/pull/970.
A good test would be to install Astro SDK 1.1, run some example DAGs, then upgrade the Python SDK to 1.2 and run the same DAGs. Check whether you can still access the XCom view in the Webserver. The next test should be to disable pickling, use our custom XCom backend, and repeat.
Also, it looks like we don't have any docs about setting up the custom XCom backend in https://astro-sdk-python.readthedocs.io/en/latest/index.html unless we add https://github.com/astronomer/astro-sdk/pull/1036?
I am getting the error below when I disable XCom pickling and execute my DAG using the latest code base on the main branch. I did enable the custom XCom backend with:
AIRFLOW__CORE__XCOM_BACKEND: astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend
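Disabling pickling is the standard Airflow [core] toggle, so together the relevant settings look roughly like this (a sketch in environment-variable form; everything beyond the backend path above is how I understand the setup, not copied from my config):
AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "False"
AIRFLOW__CORE__XCOM_BACKEND: astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend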
Am I missing something here?
cc @dimberman
[2022-10-12, 13:17:26 UTC] {xcom.py:599} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
[2022-10-12, 13:17:26 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2378, in xcom_push
XCom.set(
File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 206, in set
value = cls.serialize_value(
File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 597, in serialize_value
return json.dumps(value).encode('UTF-8')
File "/usr/local/lib/python3.9/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.9/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Table is not JSON serializable
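For context on where this fails: Airflow routes every xcom_push through the configured backend's serialize_value, and the default implementation is plain json.dumps, which cannot handle the astro Table object. A custom backend is expected to turn such values into something JSON-friendly before falling back to the base serializer. Below is a minimal sketch of that general Airflow pattern, not the actual AstroCustomXcomBackend code; the to_json_safe_reference hook is hypothetical.

from typing import Any

from airflow.models.xcom import BaseXCom


class SketchXcomBackend(BaseXCom):
    """Illustration only: intercept non-JSON-serializable values before storage."""

    @staticmethod
    def serialize_value(value: Any, **kwargs) -> Any:
        # A real backend would detect Table/DataFrame values here and replace
        # them with a JSON-friendly reference (for example, an object-store URI).
        if hasattr(value, "to_json_safe_reference"):  # hypothetical hook
            value = value.to_json_safe_reference()
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        # The reverse: resolve the stored reference back into the original object.
        return BaseXCom.deserialize_value(result)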
Tested this again after https://github.com/astronomer/astro-sdk/pull/1060 was merged.
Scenario:
Enabled pickling, used astro-sdk-python==1.1.1
Task
load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    # output_table=crx_data_table,
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)
Result
[2022-10-14, 17:31:47 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 17:31:47 UTC] {load_file.py:82} INFO - Loading s3://ingestionfeeds/crxbank1.csv into None ...
[2022-10-14, 17:31:47 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/astro/sql/operators/load_file.py", line 78, in execute
return self.load_data(input_file=self.input_file)
File "/usr/local/lib/python3.9/site-packages/astro/sql/operators/load_file.py", line 90, in load_data
raise IllegalLoadToDatabaseException()
astro.exceptions.IllegalLoadToDatabaseException: Failing this task because you do not have a custom xcom backend set up. If you use the default XCOM backend to store large dataframes, this can significantly degrade Airflow DB performance. Please set up a custom XCOM backend (info here https://www.astronomer.io/guides/custom-xcom-backends) or set the environment variable AIRFLOW__ASTRO_SDK__DATAFRAME_ALLOW_UNSAFE_STORAGE to true if you wish to proceed while knowing the risks.
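The error message itself names the two ways forward: configure a custom XCom backend, or explicitly accept the risk of storing dataframes in the metadata DB. The latter is the single setting quoted in the message, shown here in the same environment-variable style as above (a sketch; I did not use it in these tests):
AIRFLOW__ASTRO_SDK__DATAFRAME_ALLOW_UNSAFE_STORAGE: "True"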
Task
load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    output_table=crx_data_table,
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)
Result
[2022-10-14, 17:37:55 UTC] {sql.py:324} INFO - Rows affected: 1
[2022-10-14, 17:37:55 UTC] {snowflake.py:342} INFO - Rows affected: 1
[2022-10-14, 17:37:55 UTC] {snowflake.py:343} INFO - Snowflake query id: 01a7a081-0605-9efa-0000-682113b71a2e
[2022-10-14, 17:37:55 UTC] {connection.py:564} INFO - closed
[2022-10-14, 17:37:55 UTC] {connection.py:567} INFO - No async queries seem to be running, deleting session
[2022-10-14, 17:37:55 UTC] {load_file.py:119} INFO - Completed loading the data into Table(_name='temp_crx_data_bank1', conn_id='snowflake_default', metadata=Metadata(schema='PHANIKUMAR', database=None), columns=[], temp=False, uri='astro://snowflake_default@?table=temp_crx_data_bank1&schema=PHANIKUMAR', extra={}).
[2022-10-14, 17:37:56 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/baseoperator.py:1389: RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated.
context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)
[2022-10-14, 17:37:56 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS.
Scenario:
Enabled pickling, used the latest code on the main branch
Task
load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    output_table=crx_data_table,
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)
Result
[2022-10-15, 03:18:32 UTC] {snowflake.py:342} INFO - Rows affected: 1
[2022-10-15, 03:18:32 UTC] {snowflake.py:343} INFO - Snowflake query id: 01a7a2c6-0605-a3ae-0000-682113c1d31a
[2022-10-15, 03:18:32 UTC] {connection.py:564} INFO - closed
[2022-10-15, 03:18:32 UTC] {connection.py:567} INFO - No async queries seem to be running, deleting session
[2022-10-15, 03:18:32 UTC] {load_file.py:117} INFO - Completed loading the data into Table(_name='temp_crx_data_bank1', conn_id='snowflake_default', metadata=Metadata(schema='PHANIKUMAR', database=None), columns=[], temp=False, uri='astro://snowflake_default@?table=temp_crx_data_bank1&schema=PHANIKUMAR', extra={}).
[2022-10-15, 03:18:33 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/baseoperator.py:1389: RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated.
context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)
[2022-10-15, 03:18:33 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS.
Scenario:
Disabled pickling, used the latest code on the main branch, and tried to save a dataframe as an XCom in S3
Task
load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)
Result
The task ran successfully.
[2022-10-14, 16:46:57 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:46:57 UTC] {load_file.py:87} INFO - Loading s3://ingestionfeeds/crxbank1.csv into None ...
[2022-10-14, 16:46:57 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:46:57 UTC] {connection_wrapper.py:292} INFO - AWS Connection (conn_id='aws_s3_default', conn_type='aws') credentials retrieved from login and password.
[2022-10-14, 16:47:01 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:47:01 UTC] {connection_wrapper.py:292} INFO - AWS Connection (conn_id='aws_s3_default', conn_type='aws') credentials retrieved from login and password.
[2022-10-14, 16:47:05 UTC] {load_file.py:144} INFO - Completed loading the data into dataframe.
[2022-10-14, 16:47:05 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:47:05 UTC] {connection_wrapper.py:292} INFO - AWS Connection (conn_id='aws_s3_default', conn_type='aws') credentials retrieved from login and password.
[2022-10-14, 16:47:09 UTC] {s3.py:901} INFO - smart_open.s3.MultipartWriter('xcombucket', 'xcomfolder//spjd9l4y0psya2051ip4pvjls07kqfrnkpffd4xuiawe30etcu3x0zb1lhg9r2ekr.parquet'): uploading part_num: 1, 10186 bytes (total 0.000GB)
[2022-10-14, 16:47:11 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/xcom.py:649: RemovedInAirflow3Warning: Method `serialize_value` in XCom backend AstroCustomXcomBackend is using outdated signature and must be updated to accept all params in `BaseXCom.set` except `session`. Support will be removed in a future release.
warnings.warn(
[2022-10-14, 16:47:11 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/baseoperator.py:1389: RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated.
context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)
[2022-10-14, 16:47:11 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=load_cleansed_data_to_snowflake_bank1, task_id=load_csv_data_bank1, execution_date=20221014T164653, start_date=20221014T164656, end_date=20221014T164711
The dataframe was saved successfully in S3.
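To confirm the S3-backed XCom round-trips, a downstream task can consume the pushed value directly. A minimal sketch using the SDK's dataframe decorator (the summarize task and its body are illustrative, not part of the tested DAG):

import pandas as pd

from astro import sql as aql


@aql.dataframe
def summarize(df: pd.DataFrame):
    # Receives the load_file output via XCom, which is now materialized in S3
    # by the custom backend rather than stored in the Airflow metadata DB.
    return df.describe()

summary = summarize(load_table_with_data)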

Scenario:
- Install SDK 1.1.1, run example_dag with load_file, and it should succeed (with XCom pickling enabled).
- Upgrade to the code in the main branch and clear the task that succeeded previously.
Verify this by checking that the Task Run history is visible in the UI.
My fear is that visiting the Admin -> XComs page might blow up. It will be good to test that out.
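Since the concern is that pre-upgrade pickled rows could break the XCom list view, a quick way to spot-check deserialization outside the UI is to iterate over the stored rows directly (a sketch run inside the Airflow environment; the dag_id is a placeholder):

from airflow.models import XCom
from airflow.utils.session import create_session

with create_session() as session:
    # Fetch XCom rows written before the upgrade and try to deserialize each one.
    for row in session.query(XCom).filter(XCom.dag_id == "example_dag"):
        print(row.task_id, row.key, type(XCom.deserialize_value(row)))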
These scenarios work. Please find the results for each scenario below, @kaxil.
- SDK 1.1.1 with XCom pickling enabled
- Upgraded the code to the current main branch with XCom pickling enabled, restarted the containers (using make stop and make build-run), and cleared the task
- Used the code from the current main branch with XCom pickling disabled and the custom XCom backend enabled, restarted the containers (using make stop and make build-run), and cleared the task
Closing this as the upgrade scenarios have been tested.