[add_columns] Implement add_columns task through Ray
Relevant information: https://github.com/ray-project/ray/pull/49070/files, https://github.com/ray-project/ray/issues/49063. I submitted a pull request in the Ray community for the distributed add_columns, delete_rows, and compaction operations. However, the Ray community does not want to implement non-read-write operators at this stage. Therefore, I want to confirm here whether you would be willing to host this part of the code in Lance for maintenance, since I see that the Ray datasink is also maintained in Lance.
@westonpace @wjones127 WDYT?
Also interested in this. I believe this would just require exposing add_columns_to_fragments in Python to parallelize the fragment updates and updating the LanceCommitter to handle the transaction type.
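To make that concrete, here is a rough sketch of the task shape using today's fragment-level merge_columns API and one Ray task per fragment. The URI, column names, and the derived column are illustrative, and the exact return values of merge_columns and the commit call should be checked against the Lance Python docs; the final commit is the piece a dedicated Ray committer/datasink would own.

```python
import lance
import pyarrow as pa
import pyarrow.compute as pc
import ray

URI = "s3://bucket/table.lance"  # illustrative dataset path


@ray.remote
def merge_one_fragment(uri: str, fragment_id: int, read_version: int):
    """Add the new column(s) to a single fragment and return its new metadata."""
    ds = lance.dataset(uri, version=read_version)
    frag = ds.get_fragment(fragment_id)

    def value_func(batch: pa.RecordBatch) -> pa.RecordBatch:
        # Derive the new column from existing data (illustrative computation).
        return pa.record_batch([pc.multiply(batch["x"], 2)], names=["x_doubled"])

    # Writes one new data file for this fragment; returns the updated fragment
    # metadata plus the merged schema.
    return frag.merge_columns(value_func, columns=["x"])


ds = lance.dataset(URI)
results = ray.get(
    [
        merge_one_fragment.remote(URI, f.fragment_id, ds.version)
        for f in ds.get_fragments()
    ]
)
fragments = [frag for frag, _ in results]
schema = results[0][1]

# One commit for the whole distributed update; this is the step a dedicated
# Ray committer/datasink would own.
lance.LanceDataset.commit(
    URI, lance.LanceOperation.Merge(fragments, schema), read_version=ds.version
)
```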
@westonpace Can we discuss in this issue how to implement this?
@Jay-ju sounds good. I'm back from break as of yesterday and a better API for distributed add_columns is one of my top priorities.
Adding columns is similar to a bulk write but there are two stages. Sometimes we can combine these two stages into a single pass. Sometimes we cannot. All of the APIs we have today assume we are doing this as a single pass.
The two stages are "initial write" and "aligned write".
After the "aligned write" pass we need to have one data file for each fragment. That data file contains our new columns. It must have the same number of physical rows as the other data files in the fragment. This means, for example, that it needs to have placeholder values for deleted rows. The rows must be in the same order as the other rows in the fragment.
Creating this aligned data can be very tricky for a number of reasons:
- Often we want to calculate the output in smaller batches. For example, a fragment may have one million rows. It may be very expensive to calculate a new embedding for one million rows. We might want to calculate one thousand rows at a time.
- Calculating these new values can sometimes take a long time. If the dataset is modified in any way while we are calculating these values (e.g. compaction, new rows added, etc.) then it is likely our output is no longer properly aligned, even if it was properly aligned to begin with.
- Calculating values in the exact same order can sometimes be difficult. For example, sometimes we want to calculate a new column by joining data from some other table based on some join key. We might end up calculating the data in a different order than it is currently stored. In addition, we might not have the same number of values between the two tables. Data in the table we are joining might have fewer rows.
As a result I would like to break the single-pass add columns workflow into a two-pass workflow. The first pass writes all of the new column data, in whatever order, with whatever batch size. The second pass then reads this new column data and aligns it to the dataset at a point in time. This addresses the concerns above:
- We don't have to write data in the first pass at fragment-level sizes. We will do a mini-compaction as we align the data.
- It's ok if the initial write takes a long time. If data is compacted in the meantime it doesn't matter because we will align to the compacted representation during the align pass. If data is deleted it doesn't matter. We will just drop those rows as we align. If data is added the user can have a choice. Either we insert nulls or we fail the operation. We essentially reduce our critical section so that it only covers the second (align) stage which is much faster.
- We can use a join key during the align phase to reorder the data between the first and second passes.
I mention this because this is something we will probably want to expose in any Ray solution as well. It should be a two-pass solution that first does the initial write (essentially a map reduce on a user-defined batch size) and then does the alignment (another map reduce but now the batch size must be the fragment size).
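To sketch what that two-pass flow could look like from Ray today (before the new API lands), here is one way to emulate it with staged files and a join key. Everything here is an assumption rather than the API from the PR below: compute_embedding is a hypothetical user UDF, the staging paths are placeholders, and the align pass reuses the existing fragment-level merge_columns rather than a dedicated alignment API.

```python
import lance
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import ray

# --- Pass 1: "initial write" -------------------------------------------------
# Compute the new column at whatever batch size is convenient and stage it,
# keyed by a join key ("id"), ignoring fragment boundaries and row order.
@ray.remote
def initial_write(batch: pa.RecordBatch, out_path: str) -> str:
    emb = compute_embedding(batch["text"])  # hypothetical, expensive UDF
    pq.write_table(pa.table({"id": batch["id"], "embedding": emb}), out_path)
    return out_path


# --- Pass 2: "aligned write" -------------------------------------------------
# One task per fragment: reorder the staged values to the fragment's current
# physical row order via the join key and write one aligned data file.
@ray.remote
def aligned_write(uri: str, fragment_id: int, staged: pa.Table):
    ds = lance.dataset(uri)
    frag = ds.get_fragment(fragment_id)
    lookup = staged.combine_chunks().to_batches()[0]  # single-batch lookup table

    def value_func(batch: pa.RecordBatch) -> pa.RecordBatch:
        # Rows with no staged value (e.g. added after pass 1) come out as null.
        idx = pc.index_in(batch["id"], value_set=lookup["id"])
        return pa.record_batch([lookup["embedding"].take(idx)], names=["embedding"])

    # Returns (fragment metadata, merged schema); committing all fragments in a
    # single LanceOperation.Merge is the short critical section.
    return frag.merge_columns(value_func, columns=["id"])
```

In practice you would only ship each fragment the slice of staged data that covers its keys rather than the whole staged table; only pass 2 and the final commit need to run against a pinned dataset version.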
I began this effort in November (https://github.com/lancedb/lance/pull/3180) but paused it during December for some high priority tasks. I am resuming this work now.
However, the Ray community does not want to implement non-read-write operators at this stage. Therefore, I want to confirm here whether you would be willing to host this part of the code in Lance for maintenance, since I see that the Ray datasink is also maintained in Lance.
Yes, having the features in Ray Data would be ideal for users, but if Ray isn't interested then we're happy to host it in our Python repo (similar to the Ray data sink). I'm not entirely sure what the API would look like, though. Is this just a set of tasks? Or is it a "dataset" object of some kind that wraps a Ray dataset and adds additional functionality? Or is it some other shape?
@westonpace I actually didn't fully understand this part. But generally speaking, is it meant to solve the granularity problem of split tasks and the consistency problem caused by concurrent reading and writing? Can we release a version based on fragments first and then optimize it? For large data volumes, processing add_columns fragment by fragment doesn't seem to have performance issues. Moreover, this kind of processing is generally offline, so consistency doesn't seem to be that high a priority?
I have a demo here: https://github.com/ray-project/ray/pull/49070
Can we release a version based on fragments first and then optimize it?
There is a fragment-level API for adding columns today (it has a different name, `merge_columns`, but it is mostly identical; the naming issue is because `add_columns` was taken by a legacy behavior): https://github.com/lancedb/lance/blob/1d40479c1d51f634233483aa04985a6d30bb8323/python/python/lance/fragment.py#L553
But generally speaking, is it meant to solve the granularity problem of split tasks
I'm not entirely sure what split tasks are but this sounds correct.
and the consistency problem caused by concurrent reading and writing?
Yes, that's part of it. Modifying the dataset while the new column is being added.
For large data volumes, processing add_columns fragment by fragment doesn't seem to have performance issues. Moreover, this kind of processing is generally offline, so consistency doesn't seem to be that high a priority?
The problem I was working on was adding a new vector embedding column for 1B rows. It took 1+ days of calendar time (and many more days of CPU time) to calculate the values for the new column.
One problem was that large tasks (calculating the embedding for 1M rows) were difficult to complete.
The second problem was that the database could be modified during the long span of time spent calculating the new values, and Lance would then reject the result as a consistency failure.
I have a demo here: https://github.com/ray-project/ray/pull/49070
I'll take a look tomorrow morning.
The demo looks good. I see you are already using the fragment-level `merge_columns`. Is there any change you need in Lance? Or were you just confirming that we'd be interested in hosting the tasks? I believe we would be happy to have the tasks here.
@westonpace Yes, this demo runs well. We just need to confirm whether it should be maintained here. If so, I will submit a pull request in the Lance community and you can help review it.
- I would be very happy to look at the finer-grained design with consistency considerations in the future, as well as the interface shape for Ray/Spark. Is there any relevant design document I can refer to first?
- Also, does it take a day for 1 billion rows mainly because the embedding (vectorization) logic is time-consuming? If it were just arithmetic calculation, it should be relatively fast, right?
I believe https://github.com/lancedb/lance/pull/3369 addresses this issue. Please feel free to reopen if I missed something.