`daft.io.lance.merge_columns` not working with examples in docs
Describe the bug
I tried the sample code at https://docs.daft.ai/en/stable/connectors/lance/#data-evolution
but got an exception like this:
ValueError: Invalid user input: Column c already exists in the dataset, /home/runner/work/lance/lance/rust/lance/src/dataset/schema_evolution.rs:150:21
To Reproduce
daft version 0.6.10
import os
import shutil

import daft
import pyarrow as pa
from daft.io.lance import merge_columns


def test_schema_evolution():
    lance_path = "test_schema_evolution.lance"

    # Clear any previous Lance dataset at this path
    if os.path.exists(lance_path):
        shutil.rmtree(lance_path, ignore_errors=True)

    # Write demo data to a Lance dataset
    df = daft.from_pylist([{"c": 10, "d": 20}, {"c": 30, "d": 40}, {"c": 50, "d": 60}])
    df.write_lance(lance_path, mode="overwrite")
    print(f"Created demo Lance dataset with fixed schema at: {lance_path}")

    # Schema before the change
    df_before = daft.read_lance(lance_path)
    print("Schema before evolution:")
    print(df_before.schema())

    def schema_evolution_transform(batch: pa.RecordBatch) -> pa.RecordBatch:
        import pyarrow.compute as pc

        return batch.append_column("new_column", pc.multiply(batch["c"], 2))

    merge_columns(lance_path, transform=schema_evolution_transform, read_columns=["c"])

    # Schema after the change
    df_after = daft.read_lance(lance_path)
    print("Schema after evolution:")
    print(df_after.schema())
Error:
Error when running pipeline node UDF FragmentHandler
Traceback (most recent call last):
File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/.venv/lib/python3.12/site-packages/daft/io/lance/lance_merge_column.py", line 33, in __call__
fragment_meta, schema = fragment.merge_columns(self.transform, self.read_columns, None, self.reader_schema)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/.venv/lib/python3.12/site-packages/lance/fragment.py", line 774, in merge_columns
metadata, schema = self._fragment.add_columns(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid user input: Column c already exists in the dataset, /home/runner/work/lance/lance/rust/lance/src/dataset/schema_evolution.rs:150:21
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/sii-dataset-scripts/test_daft.py", line 574, in <module>
main()
File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/sii-dataset-scripts/test_daft.py", line 568, in main
test_schema_evolution()
File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/sii-dataset-scripts/test_daft.py", line 419, in test_schema_evolution
merge_columns(lance_path, transform=schema_evolution_transform, read_columns=["c"])
daft.errors.UDFException: User-defined function `<daft.io.lance.lance_merge_column.FragmentHandler object at 0x7f27d7da1460>` failed when executing on inputs:
- fragment_id (Int64, length=1)
Expected behavior
The Lance dataset should gain a new column, with no error raised.
Component(s)
Documentation, Other
Additional context
This may be related to Lance itself?
@JingYiJun I'll track this issue
@Jay-ju will you be investigating this issue and implementing a fix? If so, I will assign it to you
@kevinzwang Yes, I'll take it.
@JingYiJun @kevinzwang
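This looks like a usage issue rather than a bug. The transform should return only the new columns; it must not return the existing ones. In the reproduction, `batch.append_column(...)` returns `c` together with `new_column`, which is why Lance reports that column `c` already exists.
Something like this works: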
import pandas as pd
import pyarrow as pa


def double_score(x: pa.RecordBatch) -> pa.RecordBatch:
    # Return only the new column; the existing column "c" is left out
    df = x.to_pandas()
    return pa.RecordBatch.from_pandas(
        pd.DataFrame({"new_column": df["c"] * 2}),
        schema=pa.schema([pa.field("new_column", pa.float64())]),
    )
@JingYiJun does this fix work on your end? If so, feel free to close this issue
Closing this issue. Feel free to reopen if the above fix does not address your use case.