[data] Monotonically increasing id
Description
Implements a monotonically increasing ID expression. It closely follows the Spark implementation: https://github.com/apache/spark/blob/9bbdc0743034b40a904ca87a08da4e0bf2b1386c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala.
Example usage:
import ray
from ray.data.expressions import monotonically_increasing_id

ds = ray.data.range(100)
# Attach a unique 64-bit ID to every row.
ds = ds.with_column("uid", monotonically_increasing_id())
# Split deterministically by hashing the new ID column.
train, test = ds.streaming_train_test_split(test_size=0.25, split_type="hash", hash_column="uid")
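For context, here is a minimal sketch of the ID layout used by the Spark expression linked above: the partition ID goes in the upper 31 bits and the row's position within the partition in the lower 33 bits. That this PR uses exactly the same bit split is an assumption, and `monotonic_ids` is a hypothetical helper for illustration, not part of Ray Data.

```python
from typing import List

# Spark's layout: 31 bits for the partition ID, 33 bits for the row offset.
ROW_BITS = 33


def monotonic_ids(partition_index: int, num_rows: int) -> List[int]:
    """Return IDs for `num_rows` rows of one partition (hypothetical helper)."""
    # Shift the partition index into the upper bits; the row offset fills the rest.
    base = partition_index << ROW_BITS
    return [base + row_offset for row_offset in range(num_rows)]


if __name__ == "__main__":
    print(monotonic_ids(0, 3))
    print(monotonic_ids(1, 2))
```

IDs built this way are unique and increase within each partition, but they are not consecutive across partitions, so they suit hashing or deduplication better than row counting.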
Related issues
Closes #57806
awesome! thanks a bunch for this contribution. will let @gvspraveen find someone to shepherd this.
Thanks for the contribution. @bveeramani to shepherd this.
hey @rishic3, thanks a bunch for the contribution! if you're interested in chatting more with the contributors, feel free to join our community sync -- https://docs.google.com/forms/d/e/1FAIpQLSeYWjNExnr6gbhO5rpM0i6wm4TBTdsm3y5S0LR8Syzk_2gelQ/viewform
The previous commit implemented this as a dataset method (see e0fc987). Pushed an update that implements it as an expression instead, as the original issue intended.
Hey @rishic3 , thanks for opening the PR! I'll review first thing tomorrow
This expr is the first to depend on TaskContext. Looks like using it with shuffle will fail
ds = ray.data.range(10, override_num_blocks=2)
ds = ds.with_column("uid", monotonically_increasing_id())
ds = ds.random_shuffle()
ds.take_all()
With AssertionError: TaskContext is required for monotonically_increasing_id()
(traceback)
2025-12-14 12:45:51,868 ERROR worker.py:433 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::map() (pid=443311, ip=10.110.47.100)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 781, in eval_projection
names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 781, in <listcomp>
names, output_cols = zip(*[(e.name, eval_expr(e, block)) for e in projection_exprs])
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 737, in eval_expr
return evaluator.visit(expr)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/expressions.py", line 96, in visit
return self.visit_alias(expr)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 657, in visit_alias
return self.visit(expr.expr)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/expressions.py", line 104, in visit
return self.visit_monotonically_increasing_id(expr)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_expression/expression_evaluator.py", line 700, in visit_monotonically_increasing_id
ctx is not None
AssertionError: TaskContext is required for monotonically_increasing_id()
The above exception was the direct cause of the following exception:
ray::map() (pid=443311, ip=10.110.47.100)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/shuffle_task_spec.py", line 63, in map
mapped_block = next(upstream_map_iter)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
yield from self._post_process(results)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 84, in _shape_blocks
for result in results:
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 406, in _apply_transform
yield from self._block_fn(blocks, ctx)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 608, in transform_fn
out_block = fn(block)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 129, in _project_block
_try_wrap_udf_exception(e)
File "/home/rishic/anaconda3/envs/ray-dev/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 427, in _try_wrap_udf_exception
raise UserCodeException("UDF failed to process a data block.") from e
ray.exceptions.UserCodeException: UDF failed to process a data block.
...as the shuffle schedulers (e.g. https://github.com/ray-project/ray/blob/4f869c4ac72c627c27c66b19c5d962e23b095482/python/ray/data/_internal/planner/exchange/pull_based_shuffle_task_scheduler.py#L88) do not propagate the task ctx to ShuffleTaskSpec.map().
A quick fix could be to wrap execution of the upstream map fn in the shuffle task spec in a new task context (https://github.com/ray-project/ray/blob/4f869c4ac72c627c27c66b19c5d962e23b095482/python/ray/data/_internal/planner/exchange/shuffle_task_spec.py#L58). I suppose the more complete change is to create the task ctx at the scheduler level and change the downstream functions to accept it.
Wondering if this path makes sense or if implementing this as a dedicated operator (like e0fc987) would be preferred to keep expressions independent of task context in the first place. @bveeramani @goutamvenkat-anyscale
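To make the quick-fix option concrete, here is a rough standalone sketch of wrapping the upstream map fn in a freshly created task context. Everything in it is a stand-in: `FakeTaskContext`, `task_context`, `get_current_ctx`, `upstream_map`, and `shuffle_map` are hypothetical names, not Ray Data's internals, and using the task index as the partition index is an assumption.

```python
import threading
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Optional

_local = threading.local()


@dataclass
class FakeTaskContext:
    """Stand-in for a per-task context (hypothetical, not Ray's class)."""
    task_idx: int


def get_current_ctx() -> Optional[FakeTaskContext]:
    return getattr(_local, "ctx", None)


@contextmanager
def task_context(ctx: FakeTaskContext) -> Iterator[FakeTaskContext]:
    # Install the context for the duration of the block, then restore the old one.
    prev = get_current_ctx()
    _local.ctx = ctx
    try:
        yield ctx
    finally:
        _local.ctx = prev


def upstream_map(rows: Iterable[Dict]) -> Iterator[Dict]:
    # Mirrors the failing assertion: the expression needs a context to read.
    ctx = get_current_ctx()
    assert ctx is not None, "TaskContext is required"
    base = ctx.task_idx << 33
    for offset, row in enumerate(rows):
        yield {**row, "uid": base + offset}


def shuffle_map(task_idx: int, rows: Iterable[Dict]) -> List[Dict]:
    # The quick fix: create a context around the upstream fn. The generator
    # must be fully consumed inside the block, since it reads the context lazily.
    with task_context(FakeTaskContext(task_idx)):
        return list(upstream_map(rows))
```

The trade-off against the scheduler-level change is that each shuffle map task builds its own context here, so nothing upstream of the task spec can influence what that context contains.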
Got it. Will defer to @goutamvenkat-anyscale since he's the resident expressions expert