# UDF Checkpoints
Implements fine-grained checkpointing for UDF operations, enabling automatic recovery from failures with minimal recomputation.
## Incremental Progress Tracking

- Saves processed rows continuously during `.map()` and `.gen()` execution
- Automatically resumes from where execution failed
- Allows fixing bugs in UDF code and continuing without losing progress
- Creates checkpoints when UDF operations complete successfully
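To make the resume behavior concrete, here is a small pure-Python sketch of the idea (names like `resume_udf` and `processed_ids` are illustrative, not datachain's actual internals):

```python
def resume_udf(rows, processed_ids, udf):
    """Apply `udf` only to rows not yet processed, updating the checkpoint set."""
    results = {}
    for row_id, value in rows:
        if row_id in processed_ids:
            continue  # already computed in a previous (failed) run
        results[row_id] = udf(value)
        processed_ids.add(row_id)  # the real system persists this continuously
    return results

rows = [(1, "a"), (2, "b"), (3, "c")]
done = {1, 2}  # rows saved before the previous run failed
out = resume_udf(rows, done, str.upper)
# only row 3 is recomputed
```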
## Checkpoint Invalidation

- Completed UDF checkpoints are tied to the UDF's code
- In-progress UDFs allow code changes and continuation
- Completed UDFs are recomputed from scratch if their code changes
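The invalidation rules can be illustrated with a toy hashing scheme (the helper `h` and the key layout are hypothetical simplifications; the real hashes are computed in `hash_utils` and stored by the metastore):

```python
import hashlib

def h(*parts: str) -> str:
    # Toy stand-in for the real hashing in hash_utils.
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

inputs = "input-table-v1"                   # fingerprint of the UDF's inputs
code_before, code_after = "x * 2", "x * 3"  # UDF source before/after a bug fix

# Partial checkpoint (hash_input): depends on inputs only,
# so editing the UDF code does not invalidate it.
hash_input = h(inputs)

# Final checkpoint (hash_output): depends on inputs AND code,
# so editing the UDF invalidates it and forces a recompute.
hash_output_before = h(inputs, code_before)
hash_output_after = h(inputs, code_after)

assert hash_output_before != hash_output_after
```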
## New Environment Variable

- `DATACHAIN_UDF_RESET=1`: force UDF restart from scratch, ignoring partial progress
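A minimal sketch of how such a flag could be read (the exact truthiness parsing inside datachain may differ):

```python
import os

def udf_reset_requested() -> bool:
    # DATACHAIN_UDF_RESET=1 forces a restart from scratch;
    # unset or "0" means partial progress is reused.
    return os.environ.get("DATACHAIN_UDF_RESET", "") not in ("", "0")

os.environ["DATACHAIN_UDF_RESET"] = "1"
assert udf_reset_requested()
os.environ.pop("DATACHAIN_UDF_RESET")
assert not udf_reset_requested()
```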
## Implementation Highlights

### Job-Scoped Architecture

- UDF tables are scoped to the job ID: `udf_{job_id}_{hash}_{suffix}`
- Child jobs reuse parent tables via ancestor search
- Clean separation prevents conflicts
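A sketch of the naming scheme and ancestor search (the function names here are hypothetical, not the actual metastore/warehouse API):

```python
def udf_table_name(job_id: str, hash_: str, suffix: str) -> str:
    # Job-scoped naming: udf_{job_id}_{hash}_{suffix}
    return f"udf_{job_id}_{hash_}_{suffix}"

def find_reusable_table(job_id, ancestor_ids, existing_tables, hash_, suffix):
    """Walk the job ancestry (current job first) and return the first match."""
    for jid in [job_id, *ancestor_ids]:
        name = udf_table_name(jid, hash_, suffix)
        if name in existing_tables:
            return name
    return None

tables = {"udf_job1_abc_output"}
# A child job "job2" finds and reuses its parent job's output table.
reused = find_reusable_table("job2", ["job1"], tables, "abc", "output")
```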
### Checkpoint Types

- Partial checkpoints (`hash_input`): allow continuation with modified code
- Final checkpoints (`hash_output`): invalidated if the UDF changes
- Processed table tracking for RowGenerator (`.gen()`)
### Aggregations

- `.agg()` creates checkpoints on completion only
- No partial progress tracking (restart from scratch on failure)
## Files Changed

- `src/datachain/query/dataset.py`: core UDF checkpoint logic
- `src/datachain/data_storage/metastore.py`: checkpoint management
- `docs/guide/checkpoints.md`: updated documentation
- Comprehensive tests in `tests/func/test_checkpoints.py`
## Related
Fixes #1392
## Summary by Sourcery
Introduce a comprehensive checkpointing framework for UDF execution: track and manage intermediate UDF results per job hash, support skipping or resuming work based on stored checkpoints, and add cleanup APIs to remove stale checkpoints and temporary tables.
New Features:
- Implement UDF checkpointing with per-job hash tracking, resume/skip logic, and naming conventions for input/output/processed tables
- Add Catalog.cleanup_checkpoints and Catalog.remove_checkpoint_by_hash to prune old checkpoints and associated UDF tables based on TTL or creation time
- Enhance Metastore with idempotent create_checkpoint, optional job-scoped list_checkpoints, and remove_checkpoint APIs
- Expose session-backed .job property and integrate checkpoint context into DataChain.hash and .save workflows
Enhancements:
- Refactor UDFStep classes to use Session instead of Catalog and rename create_udf_table/process_input_query for clear input/output separation
- Propagate hash_before/hash_after through query generation pipeline to drive checkpoint decisions
- Handle callable object hashing in hash_utils and add database engine list_tables utility
Tests:
- Add functional tests for checkpoint creation, parallel failures, and cleanup behavior under various TTL and job filters
- Update unit tests to assert new checkpoint counts and hash_callable support for callable instances
## Summary by Sourcery
Implement UDF-level checkpointing to enable incremental progress tracking and automatic recovery for map, gen, and aggregation operations.
New Features:
- Introduce fine-grained UDF checkpointing for .map() and .gen() operations with partial and final checkpoints.
- Enable automatic resumption of failed UDFs across jobs by reusing job-scoped input, output, and processed tables via ancestor search.
- Add DATACHAIN_UDF_RESET environment variable to force UDF operations to restart from scratch.
- Expose session.job property and incorporate last checkpoint hash into chain.hash and save workflows for consistent checkpoint context.
- Extend metastore API with idempotent create_checkpoint, remove_checkpoint, list and query ancestor job checkpoints.
- Support hashing of callable objects in hash_utils to ensure accurate checkpoint invalidation.
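One way to hash a callable object, as opposed to a plain function, is to fall back to the bytecode of its class's `__call__`; the sketch below is a simplified stand-in for the behavior described above, not the actual `hash_utils` implementation:

```python
import hashlib

def hash_callable(fn) -> str:
    """Hash a function or a callable instance by its underlying bytecode."""
    if not hasattr(fn, "__code__"):
        # Callable object: instances have no __code__, so hash the
        # bytecode of the class's __call__ instead.
        fn = type(fn).__call__
    return hashlib.sha256(fn.__code__.co_code).hexdigest()

class Doubler:
    def __call__(self, x):
        return x * 2

def double(x):
    return x * 2

# Both plain functions and callable instances get stable hashes.
assert hash_callable(Doubler()) == hash_callable(Doubler())
assert len(hash_callable(double)) == 64
```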
Enhancements:
- Refactor UDFStep and dispatch pipeline to separate input, output, and processed tables, add batch callbacks, and flush partial results on errors.
- Improve warehouse and SQLite metastore: idempotent pre-UDF table creation, rename_table preserving SQL types, metadata cleanup on drop/rename, and bump SQLite schema version.
- Enhance insert_rows with optional batch_callback and tracking_field to correlate inputs and outputs during UDF dispatch.
Documentation:
- Revise docs/guide/checkpoints.md to explain UDF checkpointing model, usage examples, invalidation rules, and the DATACHAIN_UDF_RESET flag.
Tests:
- Add functional tests covering checkpoint creation, cross-job reuse, same-job re-execution, code-change invalidation, reset behavior, aggregator and generator continuation, and parallel execution recovery.
- Add unit tests for metastore.get_ancestor_job_ids and hash_callable for callable objects.
- Update existing tests and fixtures to reflect new UDF table naming conventions and adjusted checkpoint counts.
## Reviewer's Guide
This PR implements a comprehensive, fine-grained checkpointing framework for UDF operations (.map() and .gen()), introducing partial and final checkpoints tied to function code, job-scoped input/output/processed tables, and seamless recovery from failures with minimal recomputation. It extends metastore and warehouse layers to support idempotent checkpoint management, batch tracking callbacks, job-aware hashing, and integrates a DATACHAIN_UDF_RESET flag. The changes also refactor the save logic to include job context, update documentation and CLI messaging, and expand test coverage to validate varied UDF failure and recovery scenarios.
### Sequence diagram for UDF checkpointed execution and recovery

```mermaid
sequenceDiagram
    actor User
    participant DataChain
    participant UDFStep
    participant Metastore
    participant Warehouse
    User->>DataChain: Run .map()/.gen() with UDF
    DataChain->>UDFStep: Apply UDF step (with job context)
    UDFStep->>Metastore: Check for existing checkpoint (partial/final)
    alt Final checkpoint exists
        UDFStep->>Warehouse: Reuse output table, skip UDF execution
    else Partial checkpoint exists
        UDFStep->>Warehouse: Copy partial table, compute unprocessed rows
        UDFStep->>Warehouse: Resume UDF only for unprocessed rows
        Warehouse->>Metastore: Update processed table and checkpoint
    else No checkpoint
        UDFStep->>Warehouse: Create input table, run UDF from scratch
        Warehouse->>Metastore: Create partial checkpoint, update processed table
    end
    UDFStep->>Metastore: On success, promote to final checkpoint
    DataChain->>User: Return results, progress tracked
```
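The branches above boil down to a three-way decision, sketched here in plain Python (the function name and return values are illustrative, not the actual `UDFStep.apply` API):

```python
def udf_step_path(has_final_ckpt: bool, has_partial_ckpt: bool) -> str:
    """Return which path a UDF step takes, mirroring the diagram's branches."""
    if has_final_ckpt:
        return "reuse-output"        # skip UDF execution entirely
    if has_partial_ckpt:
        return "resume-unprocessed"  # copy partial table, run remaining rows
    return "run-from-scratch"        # create input table, run all rows

assert udf_step_path(True, False) == "reuse-output"
assert udf_step_path(False, True) == "resume-unprocessed"
assert udf_step_path(False, False) == "run-from-scratch"
```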
### ER diagram for job-scoped UDF checkpoint tables

```mermaid
erDiagram
    Job {
        string id PK
        string parent_job_id
    }
    Checkpoint {
        string id PK
        string job_id FK
        string hash
        boolean partial
        datetime created_at
    }
    Table {
        string name PK
        string type
    }
    Job ||--o{ Checkpoint : "has"
    Job ||--o{ Table : "owns"
    Checkpoint ||--o{ Table : "tracks"
```
### Class diagram for UDFStep and checkpoint management

```mermaid
classDiagram
    class UDFStep {
        +Session session
        +apply(query_generator, temp_tables, *args, **kwargs)
        +create_output_table(name)
        +get_input_query(input_table_name, original_query)
        +create_processed_table(checkpoint, copy_from_parent)
        +populate_udf_output_table(udf_table, query, processed_table)
        +get_or_create_input_table(query, hash)
        +_checkpoint_exist(hash, partial)
        +_skip_udf(checkpoint, hash_input, query)
        +_run_from_scratch(hash_input, hash_output, query)
        +_continue_udf(checkpoint, hash_output, query)
        +calculate_unprocessed_rows(input_table, partial_table, processed_table, original_query)
        +job
        +metastore
        +warehouse
    }
    class UDFSignal {
        +create_output_table(name)
        +create_result_query(udf_table, query)
    }
    class RowGenerator {
        +create_output_table(name)
        +create_processed_table(checkpoint, copy_from_parent)
        +create_result_query(udf_table, query)
    }
    class Metastore {
        +create_checkpoint(job_id, hash, partial)
        +remove_checkpoint(checkpoint)
        +get_ancestor_job_ids(job_id)
        +find_checkpoint(job_id, hash, partial)
        +get_last_checkpoint(job_id)
    }
    class Warehouse {
        +insert_rows(table, rows, batch_size, batch_callback, tracking_field)
        +rename_table(old_table, new_name)
        +create_pre_udf_table(query, name)
        +get_table(name)
        +drop_table(table, if_exists)
        +copy_table(target, query)
    }
    UDFSignal --|> UDFStep
    RowGenerator --|> UDFStep
```
## File-Level Changes

| Change | Details | Files |
|---|---|---|
| Refactor UDFStep to orchestrate checkpoint lifecycle | | `src/datachain/query/dataset.py` |
| Enhance metastore for ancestor lookup and idempotent checkpoints | | `src/datachain/data_storage/metastore.py` |
| Upgrade warehouse/SQLite for named UDF tables and batch tracking | | `src/datachain/data_storage/sqlite.py`, `src/datachain/data_storage/warehouse.py` |
| Make datachain save and hash job-aware | | `src/datachain/lib/dc/datachain.py` |
| Support hashing of callable objects in utility | | `src/datachain/hash_utils.py`, `tests/unit/test_hash_utils.py` |
| Introduce DATACHAIN_UDF_RESET for forced restart | | `src/datachain/query/dataset.py`, `src/datachain/lib/dc/datachain.py` |
| Update documentation and CLI for checkpoint features | | `docs/guide/checkpoints.md`, `src/datachain/cli/commands/misc.py`, `tests/test_cli_e2e.py`, `tests/test_query_e2e.py`, `tests/func/test_catalog.py` |
| Expand test coverage for UDF checkpoint scenarios | | `tests/func/test_checkpoints.py`, `tests/unit/lib/test_checkpoints.py`, `tests/conftest.py`, `tests/func/test_warehouse.py`, `tests/func/test_metastore.py` |
## Assessment against linked issues

| Issue | Objective | Addressed | Explanation |
|---|---|---|---|
| https://github.com/iterative/datachain/issues/1392 | Implement UDF-level checkpointing to save and reuse partial UDF data between job runs if a UDF fails, allowing users to fix bugs and continue without losing progress. | ✅ | |
| https://github.com/iterative/datachain/issues/1392 | Ensure that partial UDF checkpoints are not invalidated by changes to the UDF code until the UDF finishes successfully, so users can rerun with modified code and still reuse partial results. | ✅ | |
| https://github.com/iterative/datachain/issues/1392 | Create two types of UDF checkpoints: (1) a partial checkpoint before UDF calculation (hash includes only UDF inputs, not UDF code), and (2) a final checkpoint after the UDF is done (hash includes UDF inputs and UDF code), with correct invalidation and promotion logic. | ✅ | |
## Possibly linked issues

- #UDF checkpoints: The PR directly implements the two-tiered UDF checkpointing mechanism described in the issue, allowing partial results to be reused even after UDF code changes, and invalidating final checkpoints if UDF code changes.
- #issue: The PR implements UDF-level checkpointing, directly fulfilling the issue's request to resume `datachain query` execution.
- #1392: The PR implements the `Checkpoint` model, adding UDF-level checkpoints for recovery and progress tracking.
## Deploying datachain-documentation with Cloudflare Pages

| | |
|---|---|
| Latest commit | `f914b7f` |
| Status | ✅ Deploy successful! |
| Preview URL | https://3ee78108.datachain-documentation.pages.dev |
| Branch Preview URL | https://ilongin-1392-udf-checkpoints.datachain-documentation.pages.dev |
## Codecov Report

❌ Patch coverage is 91.72932% with 22 lines in your changes missing coverage. Please review.