
Mapped Tasks Not `cleanup`'d?


Describe the bug

It appears that mapped tasks aren't cleaned up via `cleanup`. Relevant logs (note: there were `TempTable` objects in the XCom view for the mapped tasks in this DAG):

[2022-09-29, 16:01:35 UTC] {cleanup.py:183} INFO - No tables provided, will delete all temporary tables
[2022-09-29, 16:01:35 UTC] {cleanup.py:78} INFO - Tables found for cleanup: 
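The empty "Tables found for cleanup:" line suggests the failure is in table discovery rather than deletion. As a minimal illustration (not the astro-sdk source; the attribute access is an assumption), a discovery pass that only reads a plain `output_table` attribute off each task would come back empty for mapped tasks, because a `MappedOperator` created via `.partial().expand_kwargs()` does not expose its kwargs as plain attributes:

# Minimal sketch of attribute-based temp-table discovery; not the real
# astro-sdk code. This loop finds temp tables on ordinary operators but
# collects nothing for mapped tasks.
def find_temp_tables(dag):
    tables = []
    for task in dag.tasks:
        output = getattr(task, "output_table", None)
        if output is not None and getattr(output, "temp", False):
            tables.append(output)
    return tables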

Version

  • astro-sdk-python[google,snowflake]==1.1.0
  • astro-runtime: 6.0.0

To Reproduce

The load was a Parquet file from GCS into Snowflake. There was another MergeOperator after it, but that shouldn't be relevant.

Steps to reproduce the behavior:

from datetime import datetime

from airflow.models.dag import DAG
from astro.files import File
from astro.sql import LoadFileOperator, Table, cleanup

with DAG("error", start_date=datetime(1970, 1, 1), schedule=None) as dag:
    # Dynamically mapped load: each expanded task writes to a temp table.
    LoadFileOperator.partial(
        task_id="foo",
        ...
    ).expand_kwargs([{
        "input_file": File(path=..., conn_id=...),
        "output_table": Table(temp=True, conn_id=..., metadata=..., columns=...),
    }])

    # Should drop every temp table produced in this DAG.
    cleanup()

Expected behavior

Temp tables should be cleaned up, even with mapped tasks.
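A possible interim workaround, hedged because it leans on an assumption: the "No tables provided, will delete all temporary tables" log line implies `cleanup()` accepts an explicit table list, so passing the temp table directly would sidestep auto-discovery of mapped-task outputs. A minimal sketch, reusing the imports and `...` placeholders from the repro above:

# Hedged workaround sketch: hold a reference to the temp table and hand it
# to cleanup() explicitly. The table-list argument is assumed from the
# "No tables provided" log line, not confirmed against the cleanup() signature.
temp_table = Table(temp=True, conn_id=..., metadata=..., columns=...)

LoadFileOperator.partial(
    task_id="foo",
    ...
).expand_kwargs([{
    "input_file": File(path=..., conn_id=...),
    "output_table": temp_table,
}])

cleanup([temp_table])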

fritz-astronomer avatar Sep 29 '22 17:09 fritz-astronomer

@feluelle This would be a good one for you to take on if you have some bandwidth this week.

kaxil avatar Oct 04 '22 21:10 kaxil

Yes, let me take it :)

feluelle avatar Oct 05 '22 11:10 feluelle

Moving to 1.3.0 as we are not sure that this will be completed by the deadline.

phanikumv avatar Oct 11 '22 16:10 phanikumv

@feluelle is this ready for review yet?

phanikumv avatar Oct 26 '22 11:10 phanikumv

The PR is still in draft, because the issue is not yet fixed.

feluelle avatar Oct 26 '22 18:10 feluelle

Short update: Good news. I can reproduce this.

The following gets cleaned up on Snowflake by using aql.cleanup():

LoadFileOperator(
    task_id="foo",
    input_file=File(path=(CWD.parent.parent / "data/sample.csv").as_posix()),
    output_table=temp_table,
)

whereas this does not:

LoadFileOperator.partial(
    task_id="foo",
).expand_kwargs([{
    "input_file": File(path=(CWD.parent.parent / "data/sample.csv").as_posix()),
    "output_table": temp_table,
}])

Both were tested in our local test environment using test_utils.run_dag(sample_dag).
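The split is consistent with how Airflow models mapped tasks: `.partial().expand_kwargs()` produces a `MappedOperator`, which keeps its kwargs in internal structures rather than as plain attributes, so attribute-based discovery sees nothing. A hypothetical discovery pass covering both cases could look like the sketch below; the attribute names (`expand_input.value` in particular) are taken from Airflow 2.4-era internals, and the actual fix in the PR may differ:

from airflow.models.dag import DAG
from airflow.models.mappedoperator import MappedOperator

# Hypothetical sketch, not the astro-sdk fix: collect temp tables from both
# plain and mapped operators.
def find_temp_tables(dag: DAG) -> list:
    tables = []
    for task in dag.tasks:
        if isinstance(task, MappedOperator):
            # expand_kwargs([...]) stores literal mapped kwargs on
            # expand_input.value; this covers literal lists only, not XComArgs.
            for kwargs in task.expand_input.value:
                output = kwargs.get("output_table")
                if output is not None and getattr(output, "temp", False):
                    tables.append(output)
        else:
            output = getattr(task, "output_table", None)
            if output is not None and getattr(output, "temp", False):
                tables.append(output)
    return tables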

I will continue to investigate and will let you know when I know more.

feluelle avatar Oct 27 '22 16:10 feluelle

I have good news. I was able to resolve the issue. PR is ready for review here.

feluelle avatar Oct 28 '22 15:10 feluelle