
[Data] Add Checkpoint/Resume Support for Ray Data Pipelines

Open dragongu opened this issue 5 months ago • 11 comments

Summary

Ray Data currently lacks built-in checkpointing functionality, which makes it challenging to recover from failures in long-running data processing pipelines. This feature request proposes adding checkpoint and resume capabilities to Ray Data to improve fault tolerance and reduce the cost of restarting large-scale data processing jobs.

Motivation

Large Ray Data pipelines can take hours or days to complete. When failures occur due to unconfigured retryable exceptions or bugs, the entire pipeline must restart from the beginning, resulting in:

  1. High Costs
    • Significant GPU resource waste
    • Extended time-to-completion
  2. Operational Complexity
    • Users currently need to manually segment large jobs (e.g., splitting a single large job into 10 parts)
    • No built-in mechanism to preserve progress when jobs are interrupted
    • Cross-cluster job migration is not supported: jobs must sometimes be migrated to other clusters/data centers when high-priority workloads urgently require the resources

Requirements Overview

  1. Job State Persistence to External Storage
  2. Cross-Cluster Resume

Use case

No response

dragongu avatar Jul 29 '25 10:07 dragongu

Hey @dragongu , we're discussing this internally. We've heard this request a couple times. We will get back shortly.

richardliaw avatar Jul 30 '25 17:07 richardliaw

> Hey @dragongu , we're discussing this internally. We've heard this request a couple times. We will get back shortly.

@richardliaw Please keep me informed when you have plans

dragongu avatar Jul 31 '25 11:07 dragongu

Any update? Any suggestions for now?

shawn-augmentcode avatar Sep 17 '25 23:09 shawn-augmentcode

@shawn-augmentcode one workaround for now is https://github.com/ray-project/ray/issues/49438

We're planning to get to this in November.

richardliaw avatar Nov 02 '25 21:11 richardliaw

> @shawn-augmentcode one workaround for now is #49438
>
> We're planning to get to this in November.

May I ask if there's any progress? If you have a design, I'd be glad to help implement it.

wxwmd avatar Nov 10 '25 02:11 wxwmd

Any update? Any suggestions for now?

wenxueru avatar Nov 12 '25 01:11 wenxueru

Hi, thanks for reaching out. Yes, this is planned to be worked on this month. @srinathk10 and @raulchen already have an implementation plan and will post it here.

gvspraveen avatar Nov 12 '25 16:11 gvspraveen

Batch Inference Checkpointing - OSS design

Here is the design document.

Context

For long-running data pipelines deployed for batch inference, each time a non-recoverable failure occurs, the pipeline starts over from the beginning instead of resuming from a known-good checkpoint. This document proposes adding checkpoint and resume capabilities to improve fault tolerance.

Requirements

For the initial implementation we are considering:

  • Support data pipelines deployed for batch inference with map-like transformations only.
  • Operations like joins/group-bys are not supported.

Proposed Design

Checkpoint: row-wise tracking of the unique row IDs in the datasource.

  • As blocks of data are streamed through the pipeline, each row from the datasource is tracked with a unique row ID (no duplicates) through the pipeline.
  • In the final stage of the pipeline, durably record the fully processed rows by their unique row IDs in checkpoint metadata files.
  • If a non-recoverable failure occurs, then upon pipeline restart the checkpoint metadata files are read and fully processed rows are pruned at the read stage of the pipeline (see the sketch below).
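
As a minimal, Ray-free sketch of this checkpoint/prune cycle (the function names and JSON file layout here are illustrative assumptions, not the actual implementation):

import json
import os

def load_completed_ids(checkpoint_dir: str) -> set:
    # Union the row IDs from all checkpoint metadata files written so far.
    ids = set()
    if os.path.isdir(checkpoint_dir):
        for name in os.listdir(checkpoint_dir):
            with open(os.path.join(checkpoint_dir, name)) as f:
                ids.update(json.load(f))
    return ids

def record_completed_ids(checkpoint_dir: str, row_ids: list, part: int) -> None:
    # Durably record a batch of fully processed row IDs as one metadata file.
    os.makedirs(checkpoint_dir, exist_ok=True)
    with open(os.path.join(checkpoint_dir, f"ckpt-{part}.json"), "w") as f:
        json.dump(row_ids, f)

def run_pipeline(rows, transform, write, checkpoint_dir: str):
    completed = load_completed_ids(checkpoint_dir)   # empty on the first run
    for part, row in enumerate(rows):
        if row["id"] in completed:
            continue                                 # prune at the read stage
        write(transform(row))                        # map stages, then write
        record_completed_ids(checkpoint_dir, [row["id"]], part)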

Limitations

  • This feature supports only datasets with the following characteristics:
      • Start with a read operation.
      • End with a write operation.
      • Contain only map-based operators such as map, map_batches, filter, and flat_map.
      • Don't use aggregations, joins, or other operators that shuffle or combine data across partitions.
  • Requires a unique ID column from the datasource that's preserved through the data pipeline.
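
For example, a hedged illustration of a qualifying pipeline versus a non-qualifying one (the paths and the run_inference batch UDF are placeholders):

import ray

# Supported: read -> map-based operators -> write, with a stable "id" column
# carried through every stage.
ds = ray.data.read_parquet("s3://bucket/input")
ds = ds.map_batches(run_inference)   # placeholder user-defined batch UDF
ds.write_parquet("s3://bucket/output")

# Not supported: groupby combines rows across partitions, so per-row lineage
# from the datasource to the sink can no longer be tracked.
ds = ray.data.read_parquet("s3://bucket/input")
ds.groupby("key").count()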

Alternate Design Considerations

Checkpoint Individual Blocks

High-level idea: index each block from the read operator and track which blocks have finished.

Pros: Checkpoint data is more lightweight (O(num_blocks)) compared with row-level tracking.

Cons: Some operators may split, concatenate, or shuffle blocks, so it's hard to maintain the mapping between input and output blocks.
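
A hedged sketch of the bookkeeping this alternative implies, and where it breaks down (the helper names are hypothetical):

# Hypothetical block-level checkpoint: remember which read-stage block
# indices have been fully written. Checkpoint state is O(num_blocks).
completed_blocks: set = set()

def on_block_written(input_block_index: int) -> None:
    completed_blocks.add(input_block_index)

def should_skip_at_read(input_block_index: int) -> bool:
    return input_block_index in completed_blocks

# The difficulty: an operator that splits one input block into several output
# blocks, or concatenates several inputs into one, breaks the 1:1 mapping
# between read-stage indices and written blocks, so on_block_written() has no
# well-defined input_block_index to record.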

APIs

Add CheckpointConfig to DataContext to manage checkpointing for data pipelines.

@DeveloperAPI
@dataclass
class DataContext:
    ...
    # Configuration for Ray Data checkpointing.
    # If None, checkpointing is disabled.
    _checkpoint_config: Optional[CheckpointConfig] = None

@PublicAPI(stability="beta")
class CheckpointConfig:
    """Configuration for checkpointing.

    Args:
        id_column: Name of the ID column in the input dataset. ID values must be unique across all rows in the dataset and must be preserved through all operators.
        checkpoint_path: Path to store the checkpoint data. It can be a path to cloud object storage (e.g. `s3://bucket/path`) or a file system path. If the latter, the path must be a network-mounted file system (e.g. `/mnt/cluster_storage/`) that is accessible to the entire cluster.
        delete_checkpoint_on_success: If true, automatically delete checkpoint data when the dataset execution succeeds. Currently only supported for the batch-based backend.
        override_filesystem: Override the :class:`pyarrow.fs.FileSystem` object used to read/write checkpoint data. Use this when you want to use custom credentials.
        override_backend: Override the :class:`CheckpointBackend` object used to access the checkpoint backend storage.
    """

Example Usage

To enable checkpointing, do the following:

  • Set DataContext.checkpoint_config for the dataset.
  • Set an ID column with the id_column config to uniquely identify each row. This column must not change across the entire job.

import ray
from ray.data import DataContext
from ray.data.checkpoint import CheckpointConfig

DataContext.get_current().checkpoint_config = CheckpointConfig(
    id_column="id",
    checkpoint_path="s3://my_bucket/checkpoint",
)

ds = ray.data.read_parquet("...")
ds = ds.map(...)
ds.write_parquet("...")

Implementation Details

Recovery involves the following steps:

  • Preload the existing checkpoint if one is found at the checkpoint_path.
  • After the Read stage in the pipeline, insert a BlockMapTransformFn to prune checkpointed rows from the previous failed run.
  • At the Write stage in the pipeline, insert a BlockMapTransformFn to write out fully processed row IDs as checkpoint metadata files.
  • When the entire pipeline runs to completion successfully, delete the checkpoint metadata files if delete_checkpoint_on_success is True.

1. Preload existing checkpoint

Before executing the data pipeline, register an ExecutionCallback that loads the checkpoint metadata.

class LoadCheckpointCallback(ExecutionCallback):
    """ExecutionCallback that handles checkpoints."""

    def __init__(self, config: CheckpointConfig):
        assert config is not None
        self._config = config
        self._checkpoint_ref: Optional[ObjectRef[Block]] = None

    def before_execution_starts(self, executor: StreamingExecutor):
        # Load checkpoint data before execution starts.
        ...

    def after_execution_succeeds(self, executor: StreamingExecutor):
        # Remove the callback from the DataContext.
        # Delete checkpoint data.
        ...

    def after_execution_fails(self, executor: StreamingExecutor, error: Exception):
        # Remove the callback from the DataContext.
        ...

    def load_checkpoint(self) -> ObjectRef[Block]:
        return self._checkpoint_ref
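
As a hedged sketch, before_execution_starts could materialize the checkpointed row IDs into the object store roughly as follows (the Parquet file layout and the helper name are assumptions, not the actual implementation):

import pyarrow.dataset as pa_ds
import ray

def _load_checkpoint_block(checkpoint_path: str) -> "ray.ObjectRef":
    # Assumption: each checkpoint metadata file is a Parquet file holding a
    # single column of fully processed row IDs.
    table = pa_ds.dataset(checkpoint_path, format="parquet").to_table()
    # Put the Block in the object store once so every downstream filter task
    # can share it instead of re-reading external storage.
    return ray.put(table)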

2. Filter Checkpointed Rows

Once the checkpoint Block is loaded by LoadCheckpointCallback, we need to filter out the checkpointed row IDs found in the checkpoint Block. For this, insert a filter_checkpointed_rows_for_blocks BlockMapTransformFn into the Read op.

BlockMapTransformFn(
    functools.partial(
        filter_checkpointed_rows_for_blocks,
        checkpoint_config=data_context.checkpoint_config,
    ),
    output_block_size_option=OutputBlockSizeOption.of(
        target_max_block_size=data_context.target_max_block_size,
    ),
)
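
A hedged sketch of what filter_checkpointed_rows_for_blocks might do per block (the actual function takes checkpoint_config, as shown above; the Arrow-based filtering here is an assumption):

from typing import Iterable

import pyarrow as pa
import pyarrow.compute as pc

def filter_checkpointed_rows_for_blocks(
    blocks: Iterable[pa.Table],
    checkpoint_ids: pa.Array,  # row IDs loaded from the checkpoint Block
    id_column: str,
) -> Iterable[pa.Table]:
    for block in blocks:
        # Keep only the rows whose ID is NOT already checkpointed.
        done = pc.is_in(block[id_column], value_set=checkpoint_ids)
        yield block.filter(pc.invert(done))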

3. Write Checkpoint Metadata

To the Write op, add a write_checkpoint_for_block BlockMapTransformFn that writes out the checkpoint metadata files.


BlockMapTransformFn(
    write_checkpoint_for_block,
    is_udf=False,
    disable_block_shaping=True,
)
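
Again as a hedged sketch (the file naming and ID-only Parquet layout are assumptions), the write-side transform could persist each block's row IDs like this:

import uuid

import pyarrow as pa
import pyarrow.parquet as pq

def write_checkpoint_for_block(
    block: pa.Table,
    checkpoint_path: str,
    id_column: str,
) -> pa.Table:
    # Persist only the ID column: one small metadata file per written block,
    # named uniquely so concurrent write tasks never collide.
    ids = block.select([id_column])
    pq.write_table(ids, f"{checkpoint_path}/ckpt-{uuid.uuid4().hex}.parquet")
    return block  # pass the block through unchanged to the write operator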


srinathk10 avatar Nov 14 '25 21:11 srinathk10

We also need this feature very much. Is there a preliminary implementation?

neuyilan avatar Dec 02 '25 03:12 neuyilan

@neuyilan Thanks for checking. We can target getting a PR out next week.

srinathk10 avatar Dec 02 '25 05:12 srinathk10

Hi @srinathk10 , any progress?

umialpha avatar Dec 10 '25 09:12 umialpha

Hi @umialpha, I'll be taking over OSS checkpointing. Thanks for your patience.

owenowenisme avatar Dec 11 '25 15:12 owenowenisme