[data] Monotonically increasing id

Open rishic3 opened this issue 1 month ago • 4 comments

Description

Implements monotonically increasing ID expression. This closely follows the Spark implementation https://github.com/apache/spark/blob/9bbdc0743034b40a904ca87a08da4e0bf2b1386c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala.

Example usage:

import ray
from ray.data.expressions import monotonically_increasing_id

ds = ray.data.range(100)
ds = ds.with_column("uid", monotonically_increasing_id())
train, test = ds.streaming_train_test_split(test_size=0.25, split_type="hash", hash_column="uid")
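
For context, the linked Spark implementation packs the partition index into the upper 31 bits of a 64-bit integer and the row's position within that partition into the lower 33 bits. The sketch below illustrates that layout in plain Python. The function name `spark_style_ids` is hypothetical, and whether this PR uses the same bit widths is an assumption based only on the "closely follows" wording above.

```python
# Minimal sketch of the Spark-style ID layout (not the code in this PR).
# Upper 31 bits: partition index. Lower 33 bits: row offset in the partition.
ROW_BITS = 33
MAX_PARTITIONS = 1 << 31
MAX_ROWS_PER_PARTITION = 1 << ROW_BITS


def spark_style_ids(partition_index: int, num_rows: int) -> list[int]:
    """Return IDs for `num_rows` rows of one partition (hypothetical helper)."""
    if not 0 <= partition_index < MAX_PARTITIONS:
        raise ValueError("partition index does not fit in 31 bits")
    if not 0 <= num_rows <= MAX_ROWS_PER_PARTITION:
        raise ValueError("row count does not fit in 33 bits")
    base = partition_index << ROW_BITS
    return [base + offset for offset in range(num_rows)]


ids_p0 = spark_style_ids(0, 3)  # IDs for the first partition
ids_p1 = spark_style_ids(1, 3)  # IDs for the second partition
```

IDs produced this way are unique and increase with partition index and row offset, but they are not consecutive across partitions.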

Related issues

Closes #57806

rishic3 avatar Dec 09 '25 06:12 rishic3

awesome! thanks a bunch for this contribution. will let @gvspraveen find someone to shepherd this.

richardliaw avatar Dec 10 '25 00:12 richardliaw

Thanks for the contribution. @bveeramani to shepherd this.

gvspraveen avatar Dec 10 '25 01:12 gvspraveen

hey @rishic3, thanks a bunch for the contribution! if you're interested in chatting more with the contributors, feel free to join our community sync -- https://docs.google.com/forms/d/e/1FAIpQLSeYWjNExnr6gbhO5rpM0i6wm4TBTdsm3y5S0LR8Syzk_2gelQ/viewform

richardliaw avatar Dec 10 '25 03:12 richardliaw

The previous commit implemented this as a dataset method (see e0fc987). I pushed an update that implements it as an expression instead, as the original issue intended.

rishic3 avatar Dec 10 '25 06:12 rishic3

Hey @rishic3, thanks for opening the PR! I'll review first thing tomorrow.

bveeramani avatar Dec 12 '25 03:12 bveeramani

This expression is the first to depend on TaskContext, and it looks like using it with a shuffle fails:

ds = ray.data.range(10, override_num_blocks=2)
ds = ds.with_column("uid", monotonically_increasing_id())
ds = ds.random_shuffle()
ds.take_all()

This fails with AssertionError: TaskContext is required for monotonically_increasing_id()

Traceback:
2025-12-14 12:45:51,868	ERROR worker.py:433 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::map() (pid=443311, ip=10.110.47.100)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 781, in eval_projection
    names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 781, in <listcomp>
    names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 737, in eval_expr
    return evaluator.visit(expr)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/expressions.py", line 96, in visit
    return self.visit_alias(expr)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 657, in visit_alias
    return self.visit(expr.expr)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/expressions.py", line 104, in visit
    return self.visit_monotonically_increasing_id(expr)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 700, in visit_monotonically_increasing_id
    ctx is not None
AssertionError: TaskContext is required for monotonically_increasing_id()

The above exception was the direct cause of the following exception:

ray::map() (pid=443311, ip=10.110.47.100)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 63, in map
    mapped_block = next(upstream_map_iter)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
    yield from self._post_process(results)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 84, in _shape_blocks
    for result in results:
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 406, in _apply_transform
    yield from self._block_fn(blocks, ctx)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 608, in transform_fn
    out_block = fn(block)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 129, in _project_block
    _try_wrap_udf_exception(e)
  File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 427, in _try_wrap_udf_exception
    raise UserCodeException("UDF failed to process a data block.") from e
ray.exceptions.UserCodeException: UDF failed to process a data block.

This happens because the shuffle schedulers (e.g. https://github.com/ray-project/ray/blob/4f869c4ac72c627c27c66b19c5d962e23b095482/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py#L88) do not propagate the task context to ShuffleTaskSpec.map().

A quick fix could be to wrap execution of the upstream map fn in the shuffle task spec (https://github.com/ray-project/ray/blob/4f869c4ac72c627c27c66b19c5d962e23b095482/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py#L58) in a new task context. The more complete change, I suppose, is to create the task context at the scheduler level and change the downstream functions to accept it.

I'm wondering whether this path makes sense, or whether implementing this as a dedicated operator (like e0fc987) would be preferred, to keep expressions independent of the task context in the first place. @bveeramani @goutamvenkat-anyscale

rishic3 avatar Dec 14 '25 21:12 rishic3

Got it. Will defer to @goutamvenkat-anyscale since he's the resident expressions expert

bveeramani avatar Dec 17 '25 01:12 bveeramani