Dagster doesn't properly release the DuckDB lock with a partitioned asset

Status: Open. Opened by Sage0614.

Dagster version

1.5.14

What's the issue?

I use dagster_duckdb_polars for a partitioned asset with a time-window partition. When I backfill the whole history, some partitions fail with "Could not set lock on file "db.db": Resource temporarily unavailable".

I suspect Dagster is not releasing the database lock properly, because:

  1. Not all partitions fail. I have around 120 partitions: about 20 fail when I backfill everything, about 3 fail when I backfill only the failed ones, and after 3 or 4 retries all partitions succeed.
  2. Which partitions fail appears to be random.
  3. Setting backfill_policy=BackfillPolicy(max_partitions_per_run=1) on the asset and {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}} in the job definition does not help.
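Written out as a Python dict, the execution config from item 3 nests plain single-brace dicts. Doubled braces such as `{{"max_concurrent": 1}}` would instead build a set whose only element is a dict, which Python rejects because dicts are unhashable. The dict would be passed as the job's run config, for example through the `config` argument of `define_asset_job`. Note that `max_concurrent` limits how many steps of a single run execute in parallel. With `max_partitions_per_run=1` a backfill launches one run per partition, and how many of those runs overlap is governed by the run queue (for example the `max_concurrent_runs` setting of the queued run coordinator), which may be why this setting did not help. `doubled_braces_rejected` is a hypothetical helper used only for the demonstration.

```python
# Execution config from item 3: allow at most one step of a run to execute at a time.
run_config = {
    "execution": {
        "config": {
            "multiprocess": {
                "max_concurrent": 1,
            },
        },
    },
}


def doubled_braces_rejected() -> bool:
    """Return True if the doubled-brace spelling fails to build."""
    try:
        # {{...}} is a set display whose only element is a dict; dicts are unhashable.
        _ = {"multiprocess": {{"max_concurrent": 1}}}
    except TypeError:
        return True
    return False


print(run_config["execution"]["config"]["multiprocess"]["max_concurrent"])  # 1
print(doubled_braces_rejected())  # True
```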

What did you expect to happen?

Writing partitioned data to DuckDB should work without the database getting locked, at least in sequential mode (max_concurrent=1).

How to reproduce?

Create some large upstream data and write it to a DuckDB table partitioned by time. In my case there are 120 monthly partitions, and the upstream data has around 100 million records.
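For scale: 120 monthly partitions cover ten years of history, and with `max_partitions_per_run=1` a full backfill corresponds to one run per partition key. The sketch below only enumerates such month-start keys with the standard library. The start date and the YYYY-MM-DD key format are assumptions (the issue states neither), and `monthly_partition_keys` is a hypothetical helper, not a Dagster API.

```python
from datetime import date


def monthly_partition_keys(start: date, count: int) -> list[str]:
    """Return `count` month-start keys (YYYY-MM-DD), beginning at `start`'s month."""
    keys = []
    year, month = start.year, start.month
    for _ in range(count):
        keys.append(date(year, month, 1).isoformat())
        # Advance one month, rolling over into the next year after December.
        month += 1
        if month > 12:
            month = 1
            year += 1
    return keys


# Hypothetical start date; the issue does not say when the history begins.
keys = monthly_partition_keys(date(2014, 1, 1), 120)
print(keys[0], keys[-1])  # 2014-01-01 2023-12-01
```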

Deployment type

Local

Deployment details

No response

Additional information

No response

Sage0614, Jan 08 '24

I ran into the same issue while working through the sample project. When I rerun, it passes.

2024-05-04 20:35:43 -0400 - dagster - ERROR - __ASSET_JOB - 2fb83681-ffb8-4cdb-bf98-7e43645a9525 - 1027389 - manhattan_stats - STEP_FAILURE - Execution of step "manhattan_stats" failed.

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "manhattan_stats"::

duckdb.duckdb.IOException: IO Error: Could not set lock on file "/home/michael/projects/dagster/dagster_university/data/staging/data.duckdb": Conflicting lock is held in /usr/bin/python3.11 (PID 1027395) by user michael. See also https://duckdb.org/docs/connect/concurrency

Stack Trace:
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 463, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 131, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
             ^^^^^^^^^^^^^^^^^^
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 125, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
                                                                    ^^^^^^^^^^^^^^^^^^
  File "/home/michael/projects/dagster/dagster_university/dagster_university/assets/metrics.py", line 29, in manhattan_stats
    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
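Both reports show the same failure: `duckdb.connect` raises an IOException saying "Could not set lock on file", and in the second log the lock is held by a different Python process (PID 1027395) than the one logging the failure (PID 1027389). Any retry workaround should fire only on this condition, so that a bad path or a SQL error is not retried pointlessly. A minimal sketch of such a check follows; `is_lock_conflict` is a hypothetical helper, and matching on message text is a heuristic assumption, since an IOException can also signal other I/O failures.

```python
LOCK_ERROR_MARKER = "Could not set lock on file"


def is_lock_conflict(exc: BaseException) -> bool:
    """Heuristic: does this exception message look like a DuckDB file-lock conflict?"""
    return LOCK_ERROR_MARKER in str(exc)


# Stand-in exceptions; real code would see duckdb.IOException with a message like this.
lock_error = OSError('Could not set lock on file "db.db": Resource temporarily unavailable')
other_error = ValueError("some unrelated failure")

print(is_lock_conflict(lock_error))   # True
print(is_lock_conflict(other_error))  # False
```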

basaran, May 05 '24

I was able to work around this with a custom connect function wrapping retry logic:

import logging
import time

import duckdb


def connect_to_duckdb(database, max_retries=10, retry_delay=2):
    """
    Attempts to connect to a DuckDB database with a retry mechanism.
    Args:
        database (str): Path to the DuckDB database file.
        max_retries (int): Maximum number of attempts before giving up.
        retry_delay (int): Time to wait between attempts (in seconds).

    Returns:
        duckdb.DuckDBPyConnection or None: Returns a DuckDB connection if successful, None otherwise.
    """
    for attempt in range(1, max_retries + 1):
        try:
            connection = duckdb.connect(database)
            logging.info("Connected to DuckDB successfully!")
            return connection  # Exit on successful connection
        except duckdb.IOException as e:
            # Lock conflicts ("Could not set lock on file ...") surface as IOException.
            logging.error(f"Connection attempt {attempt}/{max_retries} failed: {e}")
            if attempt < max_retries:
                logging.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)  # Wait before retrying

    logging.error("Failed to connect to DuckDB after maximum retries.")
    return None  # All attempts failed

Similar issues seem to have been reported in the past too, e.g. #18746.
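A variation on the connect_to_duckdb retry idea, sketched under assumptions: instead of a fixed delay it backs off exponentially with random ("full") jitter, so several processes waiting on the same lock do not retry in lockstep, and it retries only errors that a caller-supplied predicate accepts. It also re-raises the last error instead of returning None, so a failed connect fails the step with the real DuckDB message rather than a later `AttributeError` on `None.execute`. `retry_with_backoff` and its parameters are hypothetical names, and the demo uses a fake connect function in place of DuckDB.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    should_retry: Callable[[Exception], bool],
    max_attempts: int = 10,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying retryable errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            # Give up immediately on non-retryable errors or after the last attempt.
            if not should_retry(exc) or attempt == max_attempts - 1:
                raise
            # Delay doubles each attempt (0.5s, 1s, 2s, ...) up to max_delay; sleeping a
            # random fraction of it keeps waiting processes from retrying in lockstep.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")


# Demo: a fake connect that hits a lock conflict twice, then succeeds.
attempts = []


def flaky_connect() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise OSError('Could not set lock on file "db.db": Resource temporarily unavailable')
    return "connection"


conn = retry_with_backoff(
    flaky_connect,
    should_retry=lambda exc: "Could not set lock" in str(exc),
    sleep=lambda seconds: None,  # skip real waiting in the demo
)
print(conn, len(attempts))  # connection 3
```

With DuckDB this might be called as `retry_with_backoff(lambda: duckdb.connect(path), should_retry=...)` with a predicate that checks for the lock message. Retrying only narrows the window: if another process holds the file for longer than the total backoff, the step still fails.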

basaran, May 05 '24

@basaran I am relatively new to both duckdb and dagster. I was wondering if the code that you shared is a helper function or some sort of custom dagster resource? Also, are you using this code snippet instead of importing DuckDBResource? Thanks for any help!

nobelsmith, Jun 18 '24

I tried implementing your code snippet as part of a custom DuckDB resource using the ConfigurableResource class, but it didn't solve it for me; I still ran into the same problem with partitions. Is this something that might be patched soon, or should I look at a technology other than DuckDB?
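Retrying the connect call only helps if whichever process holds the lock eventually lets go. The connect calls in the traceback and in the asset example never close their connection, so the lock is released only when the connection object is garbage-collected or the process exits. A resource-style helper can guarantee release by closing the connection when the work is done, even if a query raises. The sketch below is a generic context manager; `scoped_connection`, the `connect` callable, and `FakeConnection` are hypothetical names, with the fake standing in for a DuckDB connection. Dagster's `DuckDBResource` already exposes a `get_connection()` context manager in this style, and a custom `ConfigurableResource` could follow the same pattern while wrapping its connect step in retries.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, Protocol


class Closeable(Protocol):
    """Anything with a close() method, such as a DuckDB connection."""

    def close(self) -> None: ...


@contextmanager
def scoped_connection(connect: Callable[[], Closeable]) -> Iterator[Closeable]:
    """Open a connection, yield it to the caller, and always close it afterwards."""
    conn = connect()
    try:
        yield conn
    finally:
        # Runs on success and on error; closing the last open connection to a
        # DuckDB file lets DuckDB release its lock on that file.
        conn.close()


# Demo with a fake connection standing in for duckdb.connect(...).
class FakeConnection:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


fake = FakeConnection()
try:
    with scoped_connection(lambda: fake) as conn:
        raise RuntimeError("query failed")  # simulate a failing query
except RuntimeError:
    pass
print(fake.closed)  # True
```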

nobelsmith, Jun 19 '24

@nobelsmith There was an attempt to fix this in https://github.com/dagster-io/dagster/pull/18873, but it seems to have stalled. If you'd like, you can try to pick up the thread there.

garethbrickman, Jun 19 '24

@nobelsmith, I just call the connect_to_duckdb function above inside an asset, for example:

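import os

from dagster import asset

from . import constants  # project module that defines TAXI_ZONES_FILE_PATH
# connect_to_duckdb is the helper function defined above


@asset(
    deps=["taxi_zones_file"]
)
def taxi_zones(context):
    sql_query = f"""
        create or replace table zones as (
            select
                LocationID as zone_id,
                zone,
                borough,
                the_geom as geometry
            from '{constants.TAXI_ZONES_FILE_PATH}'
        );
    """
    # conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    conn = connect_to_duckdb(os.getenv("DUCKDB_DATABASE"))
    if conn is None:
        raise RuntimeError("Could not connect to DuckDB after retries")
    try:
        conn.execute(sql_query)
    finally:
        # Close the connection so the file lock is released for other processes.
        conn.close()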

basaran, Jun 20 '24