
`daft.io.lance.merge_columns` not working with examples in docs

Open JingYiJun opened this issue 5 months ago • 5 comments

Describe the bug

I tried the sample code from https://docs.daft.ai/en/stable/connectors/lance/#data-evolution, but it raises the following exception:

ValueError: Invalid user input: Column c already exists in the dataset, /home/runner/work/lance/lance/rust/lance/src/dataset/schema_evolution.rs:150:21

To Reproduce

daft version 0.6.10

import os
import shutil

import daft
import pyarrow as pa
from daft.io.lance import merge_columns


def test_schema_evolution():
    lance_path = "test_schema_evolution.lance"

    # clear lance path
    if os.path.exists(lance_path):
        shutil.rmtree(lance_path, ignore_errors=True)

    # write demo data to lance dataset
    df = daft.from_pylist([{"c": 10, "d": 20}, {"c": 30, "d": 40}, {"c": 50, "d": 60}])
    df.write_lance(lance_path, mode="overwrite")
    print(f"Created demo Lance dataset with fixed schema at: {lance_path}")

    # before change
    df_before = daft.read_lance(lance_path)
    print("Schema before evolution:")
    print(df_before.schema())

    def schema_evolution_transform(batch: pa.RecordBatch) -> pa.RecordBatch:
        import pyarrow.compute as pc

        return batch.append_column("new_column", pc.multiply(batch["c"], 2))

    merge_columns(lance_path, transform=schema_evolution_transform, read_columns=["c"])

    # after change
    df_after = daft.read_lance(lance_path)
    print("Schema after evolution:")
    print(df_after.schema())

Error:

Error when running pipeline node UDF FragmentHandler
Traceback (most recent call last):
  File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/.venv/lib/python3.12/site-packages/daft/io/lance/lance_merge_column.py", line 33, in __call__
    fragment_meta, schema = fragment.merge_columns(self.transform, self.read_columns, None, self.reader_schema)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/.venv/lib/python3.12/site-packages/lance/fragment.py", line 774, in merge_columns
    metadata, schema = self._fragment.add_columns(
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid user input: Column c already exists in the dataset, /home/runner/work/lance/lance/rust/lance/src/dataset/schema_evolution.rs:150:21

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/sii-dataset-scripts/test_daft.py", line 574, in <module>
    main()
  File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/sii-dataset-scripts/test_daft.py", line 568, in main
    test_schema_evolution()
  File "/inspire/ssd/project/embodied-multimodality/public/kchen/sii-python-sdk/sii-dataset-scripts/test_daft.py", line 419, in test_schema_evolution
    merge_columns(lance_path, transform=schema_evolution_transform, read_columns=["c"])
daft.errors.UDFException: User-defined function `<daft.io.lance.lance_merge_column.FragmentHandler object at 0x7f27d7da1460>` failed when executing on inputs:
  - fragment_id (Int64, length=1)

Expected behavior

The new column should be added to the Lance dataset without any error.

Component(s)

Documentation, Other

Additional context

This may be related to Lance?

JingYiJun avatar Nov 10 '25 07:11 JingYiJun

@JingYiJun I'll track this issue

Jay-ju avatar Nov 10 '25 07:11 Jay-ju

@Jay-ju will you be investigating this issue and implementing a fix? If so, I will assign it to you

kevinzwang avatar Nov 10 '25 10:11 kevinzwang

@kevinzwang Yes, I'll take it.

Jay-ju avatar Nov 11 '25 03:11 Jay-ju

@JingYiJun @kevinzwang
This looks like a usage issue rather than a bug. The batch returned by the transform should contain only the new columns; the existing columns (here `c`) must not be included, otherwise Lance reports that the column already exists. For example:

import pandas as pd
from daft.dependencies import pa


def double_score(x: pa.lib.RecordBatch) -> pa.lib.RecordBatch:
    # Return only the new column; do not echo back the existing column "c".
    df = x.to_pandas()
    return pa.lib.RecordBatch.from_pandas(
        pd.DataFrame({"new_column": df["c"] * 2}),
        schema=pa.schema([pa.field("new_column", pa.float64())]),
    )

Jay-ju avatar Nov 11 '25 05:11 Jay-ju

@JingYiJun does this fix work on your end? If so, feel free to close this issue

kevinzwang avatar Nov 11 '25 22:11 kevinzwang

Closing this issue. Feel free to reopen if the above fix does not address your use case.

universalmind303 avatar Dec 08 '25 19:12 universalmind303