
UDF Checkpoints

Open ilongin opened this pull request 1 month ago • 3 comments

Implements fine-grained checkpointing for UDF operations, enabling automatic recovery from failures with minimal recomputation.

Incremental Progress Tracking

  • Saves processed rows continuously during .map() and .gen() execution
  • Automatically resumes from where execution failed
  • Allows fixing bugs in UDF code and continuing without losing progress (see the sketch after this list)
  • Creates checkpoints when UDF operations complete successfully
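
A minimal sketch of that workflow, assuming the public read_values entry point and keyword-style .map() signals; the dataset name and function are illustrative:

```python
import datachain as dc

def double(num: int) -> int:
    # Suppose this raised partway through the first run; after fixing
    # the bug, re-running the same script resumes from the rows that
    # were already processed instead of starting over.
    return num * 2

chain = (
    dc.read_values(num=list(range(1_000)))
    .map(res=double)
    .save("doubled")
)
```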

Checkpoint Invalidation

  • Completed UDF checkpoints tied to function code
  • In-progress UDFs allow code changes and continuation
  • Completed UDFs recompute from scratch if code changes

New Environment Variable

  • DATACHAIN_UDF_RESET=1 - Force UDF restart from scratch, ignoring partial progress (see the sketch below)
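
For illustration, the flag can be exported in the shell or set from Python before the chain runs; a trivial sketch:

```python
import os

# Force the next run to ignore any partial UDF progress and recompute
# everything from scratch.
os.environ["DATACHAIN_UDF_RESET"] = "1"
```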

Implementation Highlights

Job-Scoped Architecture

  • UDF tables scoped to job ID: udf_{job_id}_{hash}_{suffix} (sketched after this list)
  • Child jobs reuse parent tables via ancestor search
  • Clean separation prevents conflicts
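
A hypothetical sketch of the naming scheme; the real construction lives in src/datachain/query/dataset.py and may differ in detail:

```python
def udf_table_name(job_id: str, hash_: str, suffix: str) -> str:
    # Job-scoped name: conflicts between jobs are impossible by
    # construction, and child jobs can look up parent tables by hash.
    return f"udf_{job_id}_{hash_}_{suffix}"

print(udf_table_name("7f3a", "9c1e42", "output"))  # udf_7f3a_9c1e42_output
```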

Checkpoint Types

  • Partial checkpoints (hash_input) - Allow continuation with modified code (see the sketch after this list)
  • Final checkpoints (hash_output) - Invalidated if UDF changes
  • Processed table tracking for RowGenerator (.gen())
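
A hedged sketch of the two-tier hashing idea behind these checkpoint types; the function names and hashing scheme are illustrative, not the actual datachain internals:

```python
import hashlib

def hash_input(input_query_hash: str) -> str:
    # Partial checkpoint: depends only on the UDF's input, so it
    # survives edits to the UDF code and allows continuation.
    return hashlib.sha256(input_query_hash.encode()).hexdigest()

def hash_output(input_query_hash: str, udf_code_hash: str) -> str:
    # Final checkpoint: also depends on the UDF code, so changing the
    # function invalidates it and forces a recompute from scratch.
    combined = input_query_hash + udf_code_hash
    return hashlib.sha256(combined.encode()).hexdigest()
```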

Aggregations

  • .agg() creates checkpoints on completion only
  • No partial progress tracking (restart from scratch on failure)

Files Changed

  • src/datachain/query/dataset.py - Core UDF checkpoint logic
  • src/datachain/data_storage/metastore.py - Checkpoint management
  • docs/guide/checkpoints.md - Updated documentation
  • tests/func/test_checkpoints.py - Comprehensive functional tests

Related

Fixes #1392

Summary by Sourcery

Introduce a comprehensive checkpointing framework for UDF execution: track and manage intermediate UDF results per job hash, support skipping or resuming work based on stored checkpoints, and add cleanup APIs to remove stale checkpoints and temporary tables.

New Features:

  • Implement UDF checkpointing with per-job hash tracking, resume/skip logic, and naming conventions for input/output/processed tables
  • Add Catalog.cleanup_checkpoints and Catalog.remove_checkpoint_by_hash to prune old checkpoints and associated UDF tables based on TTL or creation time
  • Enhance Metastore with idempotent create_checkpoint, optional job-scoped list_checkpoints, and remove_checkpoint APIs
  • Expose session-backed .job property and integrate checkpoint context into DataChain.hash and .save workflows

Enhancements:

  • Refactor UDFStep classes to use Session instead of Catalog and rename create_udf_table/process_input_query for clear input/output separation
  • Propagate hash_before/hash_after through query generation pipeline to drive checkpoint decisions
  • Handle callable object hashing in hash_utils and add database engine list_tables utility

Tests:

  • Add functional tests for checkpoint creation, parallel failures, and cleanup behavior under various TTL and job filters
  • Update unit tests to assert new checkpoint counts and hash_callable support for callable instances

Summary by Sourcery

Implement UDF-level checkpointing to enable incremental progress tracking and automatic recovery for map, gen, and aggregation operations.

New Features:

  • Introduce fine-grained UDF checkpointing for .map() and .gen() operations with partial and final checkpoints.
  • Enable automatic resumption of failed UDFs across jobs by reusing job-scoped input, output, and processed tables via ancestor search.
  • Add DATACHAIN_UDF_RESET environment variable to force UDF operations to restart from scratch.
  • Expose session.job property and incorporate last checkpoint hash into chain.hash and save workflows for consistent checkpoint context.
  • Extend metastore API with idempotent create_checkpoint, remove_checkpoint, list and query ancestor job checkpoints.
  • Support hashing of callable objects in hash_utils to ensure accurate checkpoint invalidation.

Enhancements:

  • Refactor UDFStep and dispatch pipeline to separate input, output, and processed tables, add batch callbacks, and flush partial results on errors.
  • Improve warehouse and SQLite metastore: idempotent pre-UDF table creation, rename_table preserving SQL types, metadata cleanup on drop/rename, and bump SQLite schema version.
  • Enhance insert_rows with optional batch_callback and tracking_field to correlate inputs and outputs during UDF dispatch.

Documentation:

  • Revise docs/guide/checkpoints.md to explain UDF checkpointing model, usage examples, invalidation rules, and the DATACHAIN_UDF_RESET flag.

Tests:

  • Add functional tests covering checkpoint creation, cross-job reuse, same-job re-execution, code-change invalidation, reset behavior, aggregator and generator continuation, and parallel execution recovery.
  • Add unit tests for metastore.get_ancestor_job_ids and hash_callable for callable objects.
  • Update existing tests and fixtures to reflect new UDF table naming conventions and adjusted checkpoint counts.

ilongin · Oct 22 '25 14:10

Reviewer's Guide

This PR implements a comprehensive, fine-grained checkpointing framework for UDF operations (.map() and .gen()), introducing partial and final checkpoints tied to function code, job-scoped input/output/processed tables, and seamless recovery from failures with minimal recomputation. It extends metastore and warehouse layers to support idempotent checkpoint management, batch tracking callbacks, job-aware hashing, and integrates a DATACHAIN_UDF_RESET flag. The changes also refactor the save logic to include job context, update documentation and CLI messaging, and expand test coverage to validate varied UDF failure and recovery scenarios.

Sequence diagram for UDF checkpointed execution and recovery

sequenceDiagram
  actor User
  participant DataChain
  participant UDFStep
  participant Metastore
  participant Warehouse

  User->>DataChain: Run .map()/.gen() with UDF
  DataChain->>UDFStep: Apply UDF step (with job context)
  UDFStep->>Metastore: Check for existing checkpoint (partial/final)
  alt Final checkpoint exists
    UDFStep->>Warehouse: Reuse output table, skip UDF execution
  else Partial checkpoint exists
    UDFStep->>Warehouse: Copy partial table, compute unprocessed rows
    UDFStep->>Warehouse: Resume UDF only for unprocessed rows
    Warehouse->>Metastore: Update processed table and checkpoint
  else No checkpoint
    UDFStep->>Warehouse: Create input table, run UDF from scratch
    Warehouse->>Metastore: Create partial checkpoint, update processed table
  end
  UDFStep->>Metastore: On success, promote to final checkpoint
  DataChain->>User: Return results, progress tracked

ER diagram for job-scoped UDF checkpoint tables

erDiagram
  Job {
    string id PK
    string parent_job_id
  }
  Checkpoint {
    string id PK
    string job_id FK
    string hash
    boolean partial
    datetime created_at
  }
  Table {
    string name PK
    string type
  }
  Job ||--o{ Checkpoint : "has"
  Job ||--o{ Table : "owns"
  Checkpoint ||--o{ Table : "tracks"

Class diagram for UDFStep and checkpoint management

classDiagram
  class UDFStep {
    +Session session
    +apply(query_generator, temp_tables, *args, **kwargs)
    +create_output_table(name)
    +get_input_query(input_table_name, original_query)
    +create_processed_table(checkpoint, copy_from_parent)
    +populate_udf_output_table(udf_table, query, processed_table)
    +get_or_create_input_table(query, hash)
    +_checkpoint_exist(hash, partial)
    +_skip_udf(checkpoint, hash_input, query)
    +_run_from_scratch(hash_input, hash_output, query)
    +_continue_udf(checkpoint, hash_output, query)
    +calculate_unprocessed_rows(input_table, partial_table, processed_table, original_query)
    +job
    +metastore
    +warehouse
  }
  class UDFSignal {
    +create_output_table(name)
    +create_result_query(udf_table, query)
  }
  class RowGenerator {
    +create_output_table(name)
    +create_processed_table(checkpoint, copy_from_parent)
    +create_result_query(udf_table, query)
  }
  class Metastore {
    +create_checkpoint(job_id, hash, partial)
    +remove_checkpoint(checkpoint)
    +get_ancestor_job_ids(job_id)
    +find_checkpoint(job_id, hash, partial)
    +get_last_checkpoint(job_id)
  }
  class Warehouse {
    +insert_rows(table, rows, batch_size, batch_callback, tracking_field)
    +rename_table(old_table, new_name)
    +create_pre_udf_table(query, name)
    +get_table(name)
    +drop_table(table, if_exists)
    +copy_table(target, query)
  }
  UDFSignal --|> UDFStep
  RowGenerator --|> UDFStep
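
At its core, calculate_unprocessed_rows is a set difference between input row ids and already-processed row ids; a toy illustration (the real code operates on warehouse tables, and using sys__id as the tracking field is an assumption):

```python
# Ids present in the UDF's input table.
input_ids = {1, 2, 3, 4, 5}
# Ids recorded in the processed table by an earlier, failed run.
processed_ids = {1, 2, 3}

# Only these rows need the UDF applied when the job resumes.
unprocessed_ids = input_ids - processed_ids
print(sorted(unprocessed_ids))  # [4, 5]
```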

File-Level Changes

Refactor UDFStep to orchestrate checkpoint lifecycle
  • Add methods _checkpoint_exist, _skip_udf, _run_from_scratch, _continue_udf to drive partial/final checkpoint flows
  • Switch from Catalog to Session for UDFStep context and introduce job/metastore/warehouse properties
  • Extend populate_udf_output_table with optional processed_table and batch callbacks to track processed sys__ids
  • Always flush insert buffers on exception to expose partial outputs for recovery (sketched below)
src/datachain/query/dataset.py
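
A minimal sketch of the flush-on-exception behavior, using toy stand-ins for the real buffer and UDF:

```python
class InsertBuffer:
    """Toy stand-in for the warehouse insert buffer."""

    def __init__(self) -> None:
        self.pending: list[int] = []
        self.flushed: list[int] = []

    def append(self, row: int) -> None:
        self.pending.append(row)

    def flush(self) -> None:
        self.flushed.extend(self.pending)
        self.pending.clear()

def run_udf(rows, fn, buffer: InsertBuffer) -> None:
    try:
        for row in rows:
            buffer.append(fn(row))
    finally:
        # Flush even when fn raises, so rows computed before the
        # failure are persisted and a later run can resume from them.
        buffer.flush()
```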
Enhance metastore for ancestor lookup and idempotent checkpoints
  • Implement get_ancestor_job_ids via recursive CTE to search parent jobs (illustrated below)
  • Adjust checkpoint schema unique constraint to include partial flag
  • Make create_checkpoint idempotent with on_conflict_do_nothing and return existing records
  • Add remove_checkpoint API to delete partial checkpoints after promotion
src/datachain/data_storage/metastore.py
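
The ancestor lookup can be expressed as a recursive CTE; an illustrative, self-contained version against SQLite (the jobs schema here is an assumption, not the actual metastore layout):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE jobs (id TEXT PRIMARY KEY, parent_job_id TEXT);
    INSERT INTO jobs VALUES ('a', NULL), ('b', 'a'), ('c', 'b');
""")
rows = con.execute(
    """
    WITH RECURSIVE ancestors(id) AS (
        SELECT parent_job_id FROM jobs WHERE id = ?
        UNION ALL
        SELECT j.parent_job_id
        FROM jobs j JOIN ancestors a ON j.id = a.id
        WHERE j.parent_job_id IS NOT NULL
    )
    SELECT id FROM ancestors WHERE id IS NOT NULL
    """,
    ("c",),
).fetchall()
print([r[0] for r in rows])  # ['b', 'a']
```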
Upgrade warehouse/SQLite for named UDF tables and batch tracking
  • Change create_pre_udf_table signature to accept explicit names and skip population if existing
  • Add batch_callback and tracking_field parameters to insert_rows to record processed rows (see the sketch below)
  • Improve rename_table to preserve metadata and handle errors
  • Clean up stale metadata entries when dropping or renaming tables
src/datachain/data_storage/sqlite.py
src/datachain/data_storage/warehouse.py
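
A toy illustration of the batch-callback idea: after each flushed batch, record which rows are done so a resumed run can skip them. Treating sys__id as the tracking field is an assumption based on the description above:

```python
processed_ids: set[int] = set()

def on_batch(batch: list[dict]) -> None:
    # Invoked after each batch is flushed to the output table.
    processed_ids.update(row["sys__id"] for row in batch)

rows = [{"sys__id": i, "value": i * 2} for i in range(5)]
batch_size = 2
for start in range(0, len(rows), batch_size):
    batch = rows[start : start + batch_size]
    # ... insert `batch` into the output table here ...
    on_batch(batch)

print(sorted(processed_ids))  # [0, 1, 2, 3, 4]
```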
Make datachain save and hash job-aware
  • Extend Chain.hash() to accept name and in_job to include last checkpoint hash
  • Refactor save to compute a job-scoped hash and resolve dataset checkpoints accordingly
  • Remove old _calculate_job_hash, unify _resolve_checkpoint logic
src/datachain/lib/dc/datachain.py
Support hashing of callable objects in utility
  • Detect non-function callables and hash their __call__ method (sketched below)
  • Gracefully handle missing __name__ attributes when determining lambdas
  • Add unit tests verifying consistent hashes for callable instances
src/datachain/hash_utils.py
tests/unit/test_hash_utils.py
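
A hedged sketch of the callable-hashing behavior; the shipped hash_utils may hash bytecode or other metadata rather than source text:

```python
import hashlib
import inspect

def hash_callable(fn) -> str:
    if callable(fn) and not inspect.isfunction(fn) and not inspect.ismethod(fn):
        # Callable instance: fall back to hashing its __call__ method.
        fn = type(fn).__call__
    source = inspect.getsource(fn)
    return hashlib.sha256(source.encode()).hexdigest()

class Doubler:
    def __call__(self, x: int) -> int:
        return x * 2

# Two instances of the same class hash identically.
assert hash_callable(Doubler()) == hash_callable(Doubler())
```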
Introduce DATACHAIN_UDF_RESET for forced restart
  • Import and use env2bool to read DATACHAIN_UDF_RESET in UDFStep logic (see the sketch below)
  • Bypass checkpoint reuse when reset flag is set, forcing full UDF recompute
  • Adjust save and query resolution to respect reset behavior
src/datachain/query/dataset.py
src/datachain/lib/dc/datachain.py
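
For context, env2bool-style parsing looks roughly like this; the generic re-implementation below is for illustration only, since datachain ships its own helper:

```python
import os

def env2bool(var: str, undefined: bool = False) -> bool:
    # Generic stand-in for datachain's helper: treat common truthy
    # strings as True, anything else as False.
    val = os.getenv(var)
    if val is None:
        return undefined
    return val.lower() in ("1", "true", "yes", "y", "on")

if env2bool("DATACHAIN_UDF_RESET"):
    print("Ignoring partial UDF progress; recomputing from scratch")
```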
Update documentation and CLI for checkpoint features
  • Revise guide/checkpoints.md to describe UDF checkpointing mechanics, invalidation rules, and reset flag
  • Change CLI gc messaging to reference temporary tables and updated behavior
  • Adjust end-to-end tests to expect new GC output
docs/guide/checkpoints.md
src/datachain/cli/commands/misc.py
tests/test_cli_e2e.py
tests/test_query_e2e.py
tests/func/test_catalog.py
Expand test coverage for UDF checkpoint scenarios
  • Add functional tests for map/gen checkpoint creation, continuation, cross-job reuse, code-change invalidation, parallel execution, and reset behavior
  • Modify unit tests to reflect new table naming, checkpoint counts, and cleanup expectations
  • Enhance conftest.py with cleanup utilities for UDF tables and ensure fresh job IDs between runs
tests/func/test_checkpoints.py
tests/unit/lib/test_checkpoints.py
tests/conftest.py
tests/func/test_warehouse.py
tests/func/test_metastore.py

Assessment against linked issues

Issue: https://github.com/iterative/datachain/issues/1392

Objectives:

  • Implement UDF-level checkpointing to save and reuse partial UDF data between job runs if a UDF fails, allowing users to fix bugs and continue without losing progress.
  • Ensure that partial UDF checkpoints are not invalidated by changes to the UDF code until the UDF finishes successfully, so users can rerun with modified code and still reuse partial results.
  • Create two types of UDF checkpoints: (1) a partial checkpoint before UDF calculation (hash includes only UDF inputs, not UDF code), and (2) a final checkpoint after the UDF is done (hash includes UDF inputs and UDF code), with correct invalidation and promotion logic.

Possibly linked issues

  • UDF checkpoints: The PR directly implements the two-tiered UDF checkpointing mechanism described in the issue, allowing partial results to be reused even after UDF code changes, and invalidating final checkpoints if UDF code changes.
  • #issue: The PR implements UDF-level checkpointing, directly fulfilling the issue's request to resume datachain query execution.
  • #1392: The PR implements the Checkpoint model, adding UDF-level checkpoints for recovery and progress tracking.


sourcery-ai[bot] · Oct 22 '25 14:10

Deploying datachain-documentation with Cloudflare Pages

Latest commit: f914b7f
Status: ✅  Deploy successful!
Preview URL: https://3ee78108.datachain-documentation.pages.dev
Branch Preview URL: https://ilongin-1392-udf-checkpoints.datachain-documentation.pages.dev
