dagster doesn't properly release locked duckdb with partitioned asset
Dagster version
1.5.14
What's the issue?
I use dagster_duckdb_polars for a partitioned asset with a time window partition. When I try to backfill the whole history, some partitions fail with "Could not set lock on file "db.db": Resource temporarily unavailable".
I suspect Dagster does not release the table lock properly, because:
- Not all partitions fail. I have around 120 partitions: around 20 fail when I backfill everything, around 3 fail when I backfill the failed partitions again, and after 3-4 retries all partitions succeed.
- Which partitions fail is quite random.
- I tried backfill_policy=BackfillPolicy(max_partitions_per_run=1) for the asset and, in the job definition, set {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}}. This does not relieve the issue.
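For reference, the two settings described above can be written out as follows. This is only a sketch of the configuration shape: the dict is plain Python, and the commented lines show where it would be passed, assuming a job built with `define_asset_job`. The names `my_partitioned_asset` and `backfill_job` are hypothetical.

```python
# Run config limiting the multiprocess executor to one step at a time.
run_config = {
    "execution": {
        "config": {
            "multiprocess": {
                "max_concurrent": 1,
            }
        }
    }
}

# Assumed usage (requires dagster; names are illustrative only):
#   from dagster import BackfillPolicy, asset, define_asset_job
#
#   @asset(backfill_policy=BackfillPolicy(max_partitions_per_run=1), ...)
#   def my_partitioned_asset(...): ...
#
#   backfill_job = define_asset_job("backfill_job", config=run_config)
```

Note that `max_concurrent` only limits steps within a single run. It does not, by itself, stop several runs launched by one backfill from executing at the same time.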
What did you expect to happen?
I should be able to write partitioned data to DuckDB without the table being locked, at least in sequential mode (max_concurrent = 1).
How to reproduce?
Create some large upstream data and write it to a DuckDB table partitioned by time. In my case I have 120 monthly partitions and the upstream data has around 100 million records.
Deployment type
Local
Deployment details
No response
Additional information
No response
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
I ran into the same issue in the sample project. If I rerun it, it passes.
2024-05-04 20:35:43 -0400 - dagster - ERROR - __ASSET_JOB - 2fb83681-ffb8-4cdb-bf98-7e43645a9525 - 1027389 - manhattan_stats - STEP_FAILURE - Execution of step "manhattan_stats" failed.
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "manhattan_stats"::
duckdb.duckdb.IOException: IO Error: Could not set lock on file "/home/michael/projects/dagster/dagster_university/data/staging/data.duckdb": Conflicting lock is held in /usr/bin/python3.11 (PID 1027395) by user michael. See also https://duckdb.org/docs/connect/concurrency
Stack Trace:
File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
yield
File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 463, in iterate_with_context
next_output = next(iterator)
^^^^^^^^^^^^^^
File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 131, in _coerce_op_compute_fn_to_iterator
result = invoke_compute_fn(
^^^^^^^^^^^^^^^^^^
File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 125, in invoke_compute_fn
return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
^^^^^^^^^^^^^^^^^^
File "/home/michael/projects/dagster/dagster_university/dagster_university/assets/metrics.py", line 29, in manhattan_stats
conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
I was able to work around this with a custom connect function wrapping retry logic:
import logging
import time

import duckdb


def connect_to_duckdb(database, max_retries=10, retry_delay=2):
    """
    Attempts to connect to a DuckDB database with a retry mechanism.

    Args:
        database (str): Path to the DuckDB database file.
        max_retries (int): Maximum number of retries before giving up.
        retry_delay (int): Time to wait between retries (in seconds).

    Returns:
        duckdb.DuckDBPyConnection or None: Returns a DuckDB connection if successful, None otherwise.
    """
    retry_count = 0
    connection = None
    while retry_count < max_retries:
        try:
            connection = duckdb.connect(database)
            logging.info("Connected to DuckDB successfully!")
            break  # Exit the loop on successful connection
        except Exception as e:
            logging.error(
                f"Connection attempt failed: {e}. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)  # Wait before retrying
            retry_count += 1  # Increment the retry count
    if connection is None:
        logging.error("Failed to connect to DuckDB after maximum retries.")
    return connection  # Return the established connection or None if failed
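One caveat with returning None on failure is that the caller then hits an AttributeError on `conn.execute(...)` instead of seeing the original lock error. A minimal sketch of a variant that raises instead is below. It is not from the thread: the name `connect_with_retry` is hypothetical, and the connect callable is passed in as a parameter so the sketch runs without DuckDB installed.

```python
import logging
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def connect_with_retry(
    connect: Callable[[str], T],
    database: str,
    max_retries: int = 10,
    retry_delay: float = 2.0,
) -> T:
    """Call connect(database) until it succeeds or max_retries attempts fail."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, max_retries + 1):
        try:
            return connect(database)
        except Exception as exc:
            last_error = exc
            logger.warning("Attempt %d/%d failed: %s", attempt, max_retries, exc)
            if attempt < max_retries:
                time.sleep(retry_delay)  # No sleep after the final attempt
    # Chain the last underlying error so the lock message stays visible.
    raise RuntimeError(
        f"Could not connect to {database!r} after {max_retries} attempts"
    ) from last_error
```

With DuckDB this would be called as `connect_with_retry(duckdb.connect, os.getenv("DUCKDB_DATABASE"))`. Like the function above, it only works around the lock conflict by waiting; it does not address why the lock is still held.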
It seems similar issues were reported in the past too. #18746.
@basaran I am relatively new to both duckdb and dagster. I was wondering if the code that you shared is a helper function or some sort of custom dagster resource? Also, are you using this code snippet instead of importing DuckDBResource? Thanks for any help!
I tried implementing your code snippet as a part of a custom duckdb resource using the ConfigurableResource class, but it didn't end up doing it for me. I still ran into the same problem with partitions. Is this something that will potentially be patched soon or should I look to use a different technology other than duckdb?
@nobelsmith There was an attempt in this PR https://github.com/dagster-io/dagster/pull/18873 but it seems to have stalled. If you'd like, you can try to pick up the thread there.
@nobelsmith, I just use the connect_to_duckdb function above in an asset, such as:
@asset(
    deps=["taxi_zones_file"]
)
def taxi_zones(context):
    sql_query = f"""
        create or replace table zones as (
            select
                LocationID as zone_id,
                zone,
                borough,
                the_geom as geometry
            from '{constants.TAXI_ZONES_FILE_PATH}'
        );
    """
    # conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    # connect_to_duckdb takes the database path as its first argument
    conn = connect_to_duckdb(os.getenv("DUCKDB_DATABASE"))
    conn.execute(sql_query)
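The asset above never closes `conn` explicitly, so the connection (and the file lock that comes with a read-write DuckDB connection) lives until the object is garbage-collected or the process exits. Closing it as soon as the query finishes may shorten the window in which another process can hit the lock error. Whether that resolves the issue in this thread is not confirmed. The helper name `short_lived_connection` is hypothetical, and the connect callable is a parameter so the sketch runs without DuckDB.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def short_lived_connection(
    connect: Callable[[str], Any], database: str
) -> Iterator[Any]:
    """Open a connection, yield it, and always close it afterwards."""
    conn = connect(database)
    try:
        yield conn
    finally:
        # Runs on success and on error, so the connection is not left open.
        conn.close()
```

Inside an asset this would look like `with short_lived_connection(duckdb.connect, os.getenv("DUCKDB_DATABASE")) as conn: conn.execute(sql_query)`. A retrying connect function can be passed in place of `duckdb.connect` if both behaviours are wanted.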