
Test migration path for Python SDK 1.1 to 1.2

kaxil opened this issue on Oct 11, 2022 • 3 comments

We should test that users who enabled pickling are able to upgrade from Python SDK 1.1 to Python SDK 1.2 with (a) pickling enabled and (b) the Astro SDK custom XCom backend built in https://github.com/astronomer/astro-sdk/pull/970.

A good test would be to install Astro SDK 1.1, run some example DAGs, then upgrade the Python SDK to 1.2 and run the same DAGs. Check whether you can still access the XCom view in the webserver too. The next test should be to disable pickling, use our custom XCom backend, and repeat the test.
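For reference, a sketch of the two configurations under test; the backend path comes from a comment below, and the pickling flag is the standard Airflow core option:

# Mode (a): XCom pickling enabled
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True

# Mode (b): pickling disabled, Astro SDK custom XCom backend enabled
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=False
AIRFLOW__CORE__XCOM_BACKEND=astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend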

kaxil (Oct 11, 2022)

Also, it looks like we don't have any docs about setting up a custom XCom backend in https://astro-sdk-python.readthedocs.io/en/latest/index.html, unless we add https://github.com/astronomer/astro-sdk/pull/1036?

kaxil (Oct 11, 2022)

I am getting the error below when I disable XCom pickling and execute my DAG using the latest code on the main branch. I did enable the custom XCom backend with AIRFLOW__CORE__XCOM_BACKEND: astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend. Am I missing something here?

cc @dimberman

[2022-10-12, 13:17:26 UTC] {xcom.py:599} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
[2022-10-12, 13:17:26 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2378, in xcom_push
    XCom.set(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 206, in set
    value = cls.serialize_value(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 597, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/usr/local/lib/python3.9/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.9/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Table is not JSON serializable
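Note that the traceback runs through the stock serialize_value in airflow/models/xcom.py (a plain json.dumps), with no astro frame in between, which suggests the custom backend may not have been loaded by the worker. For context, a custom XCom backend hooks in by overriding serialize_value. A minimal sketch of the pattern, assuming Airflow 2.x; this is not the actual AstroCustomXcomBackend implementation, and repr() stands in for real external storage:

import json

from airflow.models.xcom import BaseXCom


def _is_json_safe(value):
    # Crude check: can the stdlib encoder handle this value as-is?
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


class SketchXcomBackend(BaseXCom):
    @staticmethod
    def serialize_value(value, **kwargs):
        if not _is_json_safe(value):
            # A real backend would persist the object externally (e.g. as
            # parquet in S3, as the results below show) and store a
            # JSON-safe reference instead.
            value = repr(value)
        return BaseXCom.serialize_value(value)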

phanikumv (Oct 12, 2022)

Tested this again after https://github.com/astronomer/astro-sdk/pull/1060 was merged.

Scenario: Enabled pickling, used astro-sdk-python==1.1.1 (screenshots attached)

Task

# Imports for this snippet; S3_BUCKET_NAME_SOURCE and AWS_CONN_ID are defined elsewhere in the DAG.
from airflow.datasets import Dataset
from astro import sql as aql
from astro.constants import FileType
from astro.files import File

load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    # output_table=crx_data_table,
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)

Result

[2022-10-14, 17:31:47 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 17:31:47 UTC] {load_file.py:82} INFO - Loading s3://ingestionfeeds/crxbank1.csv into None ...
[2022-10-14, 17:31:47 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/astro/sql/operators/load_file.py", line 78, in execute
    return self.load_data(input_file=self.input_file)
  File "/usr/local/lib/python3.9/site-packages/astro/sql/operators/load_file.py", line 90, in load_data
    raise IllegalLoadToDatabaseException()
astro.exceptions.IllegalLoadToDatabaseException: Failing this task because you do not have a custom xcom backend set up. If you use the default XCOM backend to store large dataframes, this can significantly degrade Airflow DB performance. Please set up a custom XCOM backend (info here https://www.astronomer.io/guides/custom-xcom-backends) or set the environment variable AIRFLOW__ASTRO_SDK__DATAFRAME_ALLOW_UNSAFE_STORAGE to true if you wish to proceed while knowing the risks. 
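As the error message itself says, the guard can also be bypassed without a custom XCom backend, at the cost of storing large dataframes in the Airflow metadata DB:

AIRFLOW__ASTRO_SDK__DATAFRAME_ALLOW_UNSAFE_STORAGE=true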

Task

load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    output_table=crx_data_table,
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)

Result

[2022-10-14, 17:37:55 UTC] {sql.py:324} INFO - Rows affected: 1
[2022-10-14, 17:37:55 UTC] {snowflake.py:342} INFO - Rows affected: 1
[2022-10-14, 17:37:55 UTC] {snowflake.py:343} INFO - Snowflake query id: 01a7a081-0605-9efa-0000-682113b71a2e
[2022-10-14, 17:37:55 UTC] {connection.py:564} INFO - closed
[2022-10-14, 17:37:55 UTC] {connection.py:567} INFO - No async queries seem to be running, deleting session
[2022-10-14, 17:37:55 UTC] {load_file.py:119} INFO - Completed loading the data into Table(_name='temp_crx_data_bank1', conn_id='snowflake_default', metadata=Metadata(schema='PHANIKUMAR', database=None), columns=[], temp=False, uri='astro://snowflake_default@?table=temp_crx_data_bank1&schema=PHANIKUMAR', extra={}).
[2022-10-14, 17:37:56 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/baseoperator.py:1389: RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated.
  context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)

[2022-10-14, 17:37:56 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS.

Scenario: Enabled pickling, used the latest code on the main branch

Task

load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    output_table=crx_data_table,
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)

Result

[2022-10-15, 03:18:32 UTC] {snowflake.py:342} INFO - Rows affected: 1
[2022-10-15, 03:18:32 UTC] {snowflake.py:343} INFO - Snowflake query id: 01a7a2c6-0605-a3ae-0000-682113c1d31a
[2022-10-15, 03:18:32 UTC] {connection.py:564} INFO - closed
[2022-10-15, 03:18:32 UTC] {connection.py:567} INFO - No async queries seem to be running, deleting session
[2022-10-15, 03:18:32 UTC] {load_file.py:117} INFO - Completed loading the data into Table(_name='temp_crx_data_bank1', conn_id='snowflake_default', metadata=Metadata(schema='PHANIKUMAR', database=None), columns=[], temp=False, uri='astro://snowflake_default@?table=temp_crx_data_bank1&schema=PHANIKUMAR', extra={}).
[2022-10-15, 03:18:33 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/baseoperator.py:1389: RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated.
  context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)

[2022-10-15, 03:18:33 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS.

Scenario: Disabled pickling, used the latest code on the main branch, and tried to save the dataframe as an XCom in S3 (screenshots attached).

Task

load_table_with_data = aql.load_file(
    input_file=File(path=f"{S3_BUCKET_NAME_SOURCE}/crxbank1.csv",
                    filetype=FileType.CSV, conn_id=AWS_CONN_ID),
    task_id="load_csv_data_bank1",
    outlets=[Dataset("snowflake://crx_data_bank1")],
    do_xcom_push=True,
)

Result

The task ran successfully.

[2022-10-14, 16:46:57 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:46:57 UTC] {load_file.py:87} INFO - Loading s3://ingestionfeeds/crxbank1.csv into None ...
[2022-10-14, 16:46:57 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:46:57 UTC] {connection_wrapper.py:292} INFO - AWS Connection (conn_id='aws_s3_default', conn_type='aws') credentials retrieved from login and password.
[2022-10-14, 16:47:01 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:47:01 UTC] {connection_wrapper.py:292} INFO - AWS Connection (conn_id='aws_s3_default', conn_type='aws') credentials retrieved from login and password.
[2022-10-14, 16:47:05 UTC] {load_file.py:144} INFO - Completed loading the data into dataframe.
[2022-10-14, 16:47:05 UTC] {base.py:71} INFO - Using connection ID 'aws_s3_default' for task execution.
[2022-10-14, 16:47:05 UTC] {connection_wrapper.py:292} INFO - AWS Connection (conn_id='aws_s3_default', conn_type='aws') credentials retrieved from login and password.
[2022-10-14, 16:47:09 UTC] {s3.py:901} INFO - smart_open.s3.MultipartWriter('xcombucket', 'xcomfolder//spjd9l4y0psya2051ip4pvjls07kqfrnkpffd4xuiawe30etcu3x0zb1lhg9r2ekr.parquet'): uploading part_num: 1, 10186 bytes (total 0.000GB)
[2022-10-14, 16:47:11 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/xcom.py:649: RemovedInAirflow3Warning: Method `serialize_value` in XCom backend AstroCustomXcomBackend is using outdated signature and must be updated to accept all params in `BaseXCom.set` except `session`. Support will be removed in a future release.
  warnings.warn(

[2022-10-14, 16:47:11 UTC] {warnings.py:109} WARNING - /usr/local/lib/python3.9/site-packages/***/models/baseoperator.py:1389: RemovedInAirflow3Warning: Passing 'execution_date' to 'TaskInstance.xcom_push()' is deprecated.
  context['ti'].xcom_push(key=key, value=value, execution_date=execution_date)

[2022-10-14, 16:47:11 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=load_cleansed_data_to_snowflake_bank1, task_id=load_csv_data_bank1, execution_date=20221014T164653, start_date=20221014T164656, end_date=20221014T164711

The dataframe was saved successfully in S3.

(screenshot attached)
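For reference, this scenario was driven by roughly the following configuration. The AIRFLOW__ASTRO_SDK__XCOM_STORAGE_* names are an assumption about the SDK's settings; the bucket, prefix, and connection ID are taken from the log above:

AIRFLOW__CORE__ENABLE_XCOM_PICKLING=False
AIRFLOW__CORE__XCOM_BACKEND=astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend
# Assumed setting names; the values are from the log above.
AIRFLOW__ASTRO_SDK__XCOM_STORAGE_URL=s3://xcombucket/xcomfolder
AIRFLOW__ASTRO_SDK__XCOM_STORAGE_CONN_ID=aws_s3_default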

phanikumv (Oct 14, 2022)

Scenario:

  • Install SDK 1.1.1 and run example_dag with load_file; it should succeed (with XCom pickling)
  • Upgrade to the code on the main branch and clear the task that succeeded previously

Verify this by checking that the task run history is still visible on the UI.

My fear is that the Admin -> XCom page might blow up when you visit it. It will be good to test that out.
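Besides the UI, the stored entries can also be poked at through Airflow's stable REST API; a minimal sketch, assuming basic-auth API access is enabled, with the DAG and task IDs taken from the logs above and DAG_RUN_ID as a placeholder:

import requests

BASE = "http://localhost:8080/api/v1"
DAG_ID = "load_cleansed_data_to_snowflake_bank1"  # from the logs above
TASK_ID = "load_csv_data_bank1"
DAG_RUN_ID = "<dag_run_id>"  # placeholder: substitute a real run id

# List the task's XCom entries for the given run.
resp = requests.get(
    f"{BASE}/dags/{DAG_ID}/dagRuns/{DAG_RUN_ID}/taskInstances/{TASK_ID}/xcomEntries",
    auth=("admin", "admin"),  # assumes basic-auth API access
)
resp.raise_for_status()
print(resp.json())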

kaxil (Oct 17, 2022)

> Scenario:
>
>   • Install SDK 1.1.1 and run example_dag with load_file; it should succeed (with XCom pickling)
>   • Upgrade to the code on the main branch and clear the task that succeeded previously
>
> Verify this by checking that the task run history is still visible on the UI.
>
> My fear is that the Admin -> XCom page might blow up when you visit it. It will be good to test that out.

These scenarios work. Please find the results for each scenario below, @kaxil.

SDK 1.1.1 with XCom pickling enabled (screenshot attached)

Upgraded the code to the current main branch with XCom pickling enabled, restarted the containers (using make stop and make build-run), and cleared the task (screenshot attached)

Used the code from the current main branch with XCom pickling disabled and the custom XCom backend enabled, restarted the containers (using make stop and make build-run), and cleared the task (screenshot attached)

phanikumv (Oct 17, 2022)

Closing this as the upgrade scenarios have been tested.

phanikumv (Oct 18, 2022)