arrow_schema conversion missing nullability, string/large_string mismatch

Open everettVT opened this issue 6 months ago • 4 comments

Describe the bug

I initially discovered this when using write_lance(), but on deeper inspection it appears Daft isn't preserving all field metadata, such as nullability (my particular use case), when converting schemas.

Additionally, the automatic dtype cast from string to large_string appears to be an issue, as also demonstrated in issue 3605.

To Reproduce

import daft
import pyarrow as pa
import lance 

uri = "./tmp/"

# Define Schema
my_schema = pa.schema([
    pa.field("id", pa.uint32(), nullable=False),
    pa.field("content", pa.string(), nullable=True)
])

# Create an Arrow Table from a list of Dictionaries
table1 = pa.Table.from_pylist([{
    "id": 1,
    "content": "foo"
}], schema=my_schema)

table2 = pa.Table.from_pylist([{
    "id": 2,
    "content": "bar"
}], schema=my_schema)


# Write the table to lance dataset 
ds = lance.write_dataset(table1, uri, schema=my_schema, mode="overwrite")

# Read the Lance Dataset
table1_df = daft.read_lance(uri)

# Add some data
table2_df = daft.from_arrow(table2)

# Concatenate the two tables
combined_table_df = table1_df.concat(table2_df).collect()

# Write Back to lance dataset
try: 
    combined_table_df.write_lance(uri, mode="append")
except Exception as e: 
    print("Error: ", e)

# Both df.write_lance and LanceDataSink convert the Daft schema to a PyArrow schema via:
LanceDataSink_pyarrow_schema_conversion = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_table_df.schema())

# DaftNative converts daft schema to pyarrow schema by: 
DaftNative_pyarrow_schema_conversion = combined_table_df.schema().to_pyarrow_schema()

# Check if the pyarrow schema is the same as the original schema
print("\nDoes LanceDataSink_pyarrow_schema_conversion == my_schema: \n", LanceDataSink_pyarrow_schema_conversion == my_schema)
print("\nDoes DaftNative_pyarrow_schema_conversion == my_schema: \n", DaftNative_pyarrow_schema_conversion == my_schema)
print("\n")
print("my_schema: \n", my_schema)
print("\n")
print("LanceDataSink_pyarrow_schema_conversion: \n", LanceDataSink_pyarrow_schema_conversion)
print("\n")
print("DaftNative_pyarrow_schema_conversion: \n", DaftNative_pyarrow_schema_conversion)
print("\n")

# In order to successfully write to the Lance dataset,
# we have to convert back to an Arrow table and write with Lance directly.

# Our conversion to a table still fails the schema check: 
convert_table2_df_back_to_arrow = table2_df.to_arrow()
print("\nDoes convert_table2_df_back_to_arrow.schema == my_schema: \n", convert_table2_df_back_to_arrow.schema == my_schema)

# Here we can see that Lance won't write the table because the schema is not the same.
try: 
    lance.write_dataset(convert_table2_df_back_to_arrow, uri, schema=my_schema, mode="append")
except Exception as e: 
    print("Error: ", e)

# The only thing we can do is to coerce the table to the original schema: 
coerced_table = convert_table2_df_back_to_arrow.cast(my_schema)

# Check if the coerced table is the same as the original table
print("\nDoes coerced_table == table2: \n", coerced_table == table2)

# Write the coerced table to lance dataset
try: 
    lance.write_dataset(coerced_table, uri, schema=my_schema, mode="append")
    print("Successfully wrote the coerced table to lance dataset")
except Exception as e: 
    print("Error: ", e)

Expected behavior

Roundtrip schema conversion to and from Arrow should have exact equality across Apache Arrow native integrations.

Given

import daft
import pyarrow as pa
import lance 


uri = "./tmp/"

# Define Schema
orig_schema = pa.schema([
    pa.field("id", pa.uint32(), nullable=False),
    pa.field("content", pa.string(), nullable=True)
])

daft_orig_schema = daft.Schema.from_pyarrow_schema(orig_schema)

# Create an Arrow Table from a list of Dictionaries
pa_table= pa.Table.from_pylist([{
    "id": 1,
    "content": "foo"
}], schema=orig_schema)

# From Arrow to Daft DataFrame(Native)
daft_df = daft.from_arrow(pa_table)
daft_df_schema = daft_df.schema().to_pyarrow_schema()

# From Arrow to Lance Dataset(Native) 
lance_ds = lance.write_dataset(pa_table, uri, schema=orig_schema, mode="overwrite")
lance_ds_schema = lance_ds.schema

# From Lance Dataset to Daft DataFrame
lance_df = daft.read_lance(uri) 
lance_df_schema = lance_df.schema().to_pyarrow_schema()


We should expect all of the following to be TRUE

assert orig_schema == daft_orig_schema.to_pyarrow_schema()
assert orig_schema == daft_df_schema
assert orig_schema == lance_ds_schema
assert orig_schema == lance_df_schema

We should also expect that reading from and writing back to the same Lance dataset works.

# Let's say we add another record by first creating a new Arrow table
pa_table2 = pa.Table.from_pylist([{
    "id": 2,
    "content": "bar"
}], schema=orig_schema)

daft_df_2 = daft.from_arrow(pa_table2)

# Concat  should work
concat_df_from_arrow = daft_df.concat(daft_df_2).collect()
concat_df_from_lance = lance_df.concat(daft_df_2).collect()

# Write lance should work
try: 
    concat_df_from_arrow.write_lance(uri, mode="append")
except Exception as e: 
    print("Error: ", e)

try: 
    concat_df_from_lance.write_lance(uri, mode="append")
except Exception as e: 
    print("Error: ", e)

Component(s)

Other

Additional context

One important note

All pyarrow reconstruction should use Schema.to_pyarrow_schema(), not pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in df.schema()).

Long story short: the tuple-based reconstruction drops nullability and other field metadata, which then shows up as failures in schema equality checks.
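
To make the metadata loss concrete, here is a minimal PyArrow-only sketch (no Daft involved) of why the tuple-based reconstruction cannot round-trip nullability:

import pyarrow as pa

# Building a schema from (name, type) tuples, as LanceDataSink does, always
# produces nullable fields because pa.field defaults to nullable=True.
lossy = pa.schema([("id", pa.uint32()), ("content", pa.string())])
print(lossy.field("id").nullable)   # True -- the original nullable=False is gone

# A faithful reconstruction has to carry nullability explicitly, e.g. via
# pa.field(..., nullable=...) or Daft's Schema.to_pyarrow_schema().
faithful = pa.schema([
    pa.field("id", pa.uint32(), nullable=False),
    pa.field("content", pa.string(), nullable=True),
])
print(faithful.field("id").nullable)  # False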

everettVT commented on Jun 24 '25 05:06

import pytest
import tempfile
import shutil
import os
import daft
import pyarrow as pa
import lance


class TestSchemaNullabilityError:
    """Test suite for schema nullability issues in Daft's Lance integration."""
    
    @pytest.fixture
    def temp_lance_dir(self):
        """Create a temporary directory for Lance datasets."""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir, ignore_errors=True)
    
    @pytest.fixture
    def test_schema(self):
        """Define test schema with mixed nullability."""
        return pa.schema([
            pa.field("id", pa.uint32(), nullable=False),
            pa.field("content", pa.string(), nullable=True)
        ])
    
    @pytest.fixture
    def test_tables(self, test_schema):
        """Create test Arrow tables with the defined schema."""
        table1 = pa.Table.from_pylist([{
            "id": 1,
            "content": "foo"
        }], schema=test_schema)
        
        table2 = pa.Table.from_pylist([{
            "id": 2,
            "content": "bar"
        }], schema=test_schema)
        
        return table1, table2
    
    def test_lance_dataset_creation(self, temp_lance_dir, test_schema, test_tables):
        """Test that we can create a Lance dataset with the original schema."""
        table1, _ = test_tables
        
        # Should succeed - Lance can write with explicit schema
        ds = lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        assert ds is not None
        
        # Verify the dataset has the correct schema
        dataset = lance.dataset(temp_lance_dir)
        assert dataset.schema == test_schema
    
    def test_daft_read_lance_preserves_schema(self, temp_lance_dir, test_schema, test_tables):
        """Test that daft.read_lance() preserves the original schema."""
        table1, _ = test_tables
        
        # Create Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        
        # Read with Daft
        df = daft.read_lance(temp_lance_dir)
        
        # Check if Daft's native conversion preserves schema
        daft_native_schema = df.schema().to_pyarrow_schema()
        assert daft_native_schema == test_schema, (
            f"Daft's native schema conversion should preserve nullability. "
            f"Expected: {test_schema}, Got: {daft_native_schema}"
        )
    
    def test_lance_data_sink_schema_conversion_bug(self, temp_lance_dir, test_schema, test_tables):
        """Test that demonstrates the LanceDataSink schema conversion bug."""
        table1, table2 = test_tables
        
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        
        # Read and combine data
        table1_df = daft.read_lance(temp_lance_dir)
        table2_df = daft.from_arrow(table2)
        combined_df = table1_df.concat(table2_df).collect()
        
        # Test the buggy conversion used by LanceDataSink
        buggy_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_df.schema())
        
        # This should fail - the buggy conversion loses nullability info
        assert buggy_schema != test_schema, (
            "LanceDataSink's schema conversion should fail to preserve nullability "
            "(this demonstrates the bug)"
        )
        
        # Show what's different
        for orig_field, buggy_field in zip(test_schema, buggy_schema):
            if orig_field.nullable != buggy_field.nullable:
                print(f"Nullability mismatch for field '{orig_field.name}': "
                      f"original={orig_field.nullable}, buggy={buggy_field.nullable}")
    
    def test_write_lance_fails_due_to_schema_mismatch(self, temp_lance_dir, test_schema, test_tables):
        """Test that write_lance() fails due to schema mismatch."""
        table1, table2 = test_tables
        
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        
        # Read and combine data
        table1_df = daft.read_lance(temp_lance_dir)
        table2_df = daft.from_arrow(table2)
        combined_df = table1_df.concat(table2_df).collect()
        
        # This should fail due to schema mismatch
        with pytest.raises(Exception) as exc_info:
            combined_df.write_lance(temp_lance_dir, mode="append")
        
        # Verify it's a schema-related error
        error_msg = str(exc_info.value).lower()
        assert any(keyword in error_msg for keyword in ['schema', 'mismatch', 'nullable']), (
            f"Expected schema-related error, got: {exc_info.value}"
        )
    
    def test_direct_lance_write_also_fails(self, temp_lance_dir, test_schema, test_tables):
        """Test that even direct Lance writing fails without schema coercion."""
        table1, table2 = test_tables
        
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        
        # Convert table2 through Daft and back to Arrow
        table2_df = daft.from_arrow(table2)
        converted_table = table2_df.to_arrow()
        
        # Verify the converted table has different schema
        assert converted_table.schema != test_schema, (
            "Converted table should have different schema (demonstrates the issue)"
        )
        
        # Direct Lance write should fail
        with pytest.raises(Exception) as exc_info:
            lance.write_dataset(converted_table, temp_lance_dir, schema=test_schema, mode="append")
        
        error_msg = str(exc_info.value).lower()
        assert any(keyword in error_msg for keyword in ['schema', 'mismatch', 'type']), (
            f"Expected schema-related error, got: {exc_info.value}"
        )
    
    def test_schema_coercion_workaround(self, temp_lance_dir, test_schema, test_tables):
        """Test that schema coercion provides a workaround."""
        table1, table2 = test_tables
        
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        
        # Convert table2 through Daft
        table2_df = daft.from_arrow(table2)
        converted_table = table2_df.to_arrow()
        
        # Coerce to original schema
        coerced_table = converted_table.cast(test_schema)
        
        # Verify coercion worked
        assert coerced_table.schema == test_schema, (
            "Coerced table should match original schema"
        )
        
        # This should succeed
        lance.write_dataset(coerced_table, temp_lance_dir, schema=test_schema, mode="append")
        
        # Verify data was written correctly
        dataset = lance.dataset(temp_lance_dir)
        result_table = dataset.to_table()
        
        # Should have 2 rows (original + appended)
        assert len(result_table) == 2
        assert result_table.schema == test_schema
    
    def test_correct_schema_conversion_method(self, test_schema, test_tables):
        """Test that the correct schema conversion method preserves nullability."""
        _, table2 = test_tables
        
        # Convert to Daft and back using the correct method
        table2_df = daft.from_arrow(table2)
        correct_schema = table2_df.schema().to_pyarrow_schema()
        
        # This should preserve the original schema
        assert correct_schema == test_schema, (
            f"Correct conversion should preserve schema. "
            f"Expected: {test_schema}, Got: {correct_schema}"
        )
    
    @pytest.mark.parametrize("nullable_config", [
        [True, True],    # Both nullable
        [False, False],  # Both non-nullable  
        [True, False],   # Mixed nullability
        [False, True],   # Mixed nullability (reversed)
    ])
    def test_various_nullability_configurations(self, temp_lance_dir, nullable_config):
        """Test the bug with various nullability configurations."""
        # Create schema with specified nullability
        schema = pa.schema([
            pa.field("col1", pa.int64(), nullable=nullable_config[0]),
            pa.field("col2", pa.string(), nullable=nullable_config[1])
        ])
        
        # Create test table
        table = pa.Table.from_pylist([{
            "col1": 1,
            "col2": "test"
        }], schema=schema)
        
        # Write to Lance
        lance.write_dataset(table, temp_lance_dir, schema=schema, mode="overwrite")
        
        # Read with Daft
        df = daft.read_lance(temp_lance_dir)
        
        # Test both conversion methods
        buggy_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in df.schema())
        correct_schema = df.schema().to_pyarrow_schema()
        
        # Correct method should always work
        assert correct_schema == schema, (
            f"Correct method failed for nullability config {nullable_config}"
        )
        
        # Buggy method should fail when there are non-nullable fields
        if not all(nullable_config):
            assert buggy_schema != schema, (
                f"Buggy method should fail for nullability config {nullable_config}"
            )


@pytest.fixture
def temp_dir():
    """Create temporary directory for Lance datasets."""
    temp_dir = tempfile.mkdtemp()
    yield temp_dir
    shutil.rmtree(temp_dir, ignore_errors=True)


def test_schema_nullability_error(temp_dir):
    """Test that reproduces the schema nullability error in write_lance."""
    
    uri = temp_dir + "/"

    # Define Schema
    my_schema = pa.schema([
        pa.field("id", pa.uint32(), nullable=False),
        pa.field("content", pa.string(), nullable=True)
    ])

    # Create an Arrow Table from a list of Dictionaries
    table1 = pa.Table.from_pylist([{
        "id": 1,
        "content": "foo"
    }], schema=my_schema)

    table2 = pa.Table.from_pylist([{
        "id": 2,
        "content": "bar"
    }], schema=my_schema)

    # Write the table to lance dataset 
    ds = lance.write_dataset(table1, uri, schema=my_schema, mode="overwrite")

    # Read the Lance Dataset
    table1_df = daft.read_lance(uri)

    # Add some data
    table2_df = daft.from_arrow(table2)

    # Concatenate the two tables
    combined_table_df = table1_df.concat(table2_df).collect()

    # Write Back to lance dataset
    with pytest.raises(Exception) as exc_info:
        combined_table_df.write_lance(uri, mode="append")
    
    print("Error: ", exc_info.value)

    # Both df.write_lance and LanceDataSink convert the Daft schema to a PyArrow schema via:
    LanceDataSink_pyarrow_schema_conversion = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_table_df.schema())

    # DaftNative converts daft schema to pyarrow schema by: 
    DaftNative_pyarrow_schema_conversion = combined_table_df.schema().to_pyarrow_schema()

    # Check if the pyarrow schema is the same as the original schema
    print("\nDoes LanceDataSink_pyarrow_schema_conversion == my_schema: \n", LanceDataSink_pyarrow_schema_conversion == my_schema)
    print("\nDoes DaftNative_pyarrow_schema_conversion == my_schema: \n", DaftNative_pyarrow_schema_conversion == my_schema)
    print("\n")
    print("my_schema: \n", my_schema)
    print("\n")
    print("LanceDataSink_pyarrow_schema_conversion: \n", LanceDataSink_pyarrow_schema_conversion)
    print("\n")
    print("DaftNative_pyarrow_schema_conversion: \n", DaftNative_pyarrow_schema_conversion)
    print("\n")

    # In order to successfully write to the Lance dataset,
    # we have to convert back to an Arrow table and write with Lance directly.

    # Our conversion to a table still fails the schema check: 
    convert_table2_df_back_to_arrow = table2_df.to_arrow()
    print("\nDoes convert_table2_df_back_to_arrow.schema == my_schema: \n", convert_table2_df_back_to_arrow.schema == my_schema)

    # Here we can see that Lance won't write the table because the schema is not the same.
    with pytest.raises(Exception) as exc_info:
        lance.write_dataset(convert_table2_df_back_to_arrow, uri, schema=my_schema, mode="append")
    
    print("Error: ", exc_info.value)

    # The only thing we can do is to coerce the table to the original schema: 
    coerced_table = convert_table2_df_back_to_arrow.cast(my_schema)

    # Check if the coerced table is the same as the original table
    print("\nDoes coerced_table == table2: \n", coerced_table == table2)

    # Write the coerced table to lance dataset
    try: 
        lance.write_dataset(coerced_table, uri, schema=my_schema, mode="append")
        print("Successfully wrote the coerced table to lance dataset")
    except Exception as e: 
        print("Error: ", e)
        
    # Assertions to verify the bug exists
    assert LanceDataSink_pyarrow_schema_conversion != my_schema, "LanceDataSink conversion should fail (demonstrates bug)"
    assert DaftNative_pyarrow_schema_conversion == my_schema, "DaftNative conversion should work"
    assert convert_table2_df_back_to_arrow.schema != my_schema, "Converted table schema should differ"
    assert coerced_table.schema == my_schema, "Coerced table should match original schema"


if __name__ == "__main__":
    pytest.main([__file__, "-v"]) 

everettVT commented on Jun 24 '25 05:06

I think this is part of a more general problem: Daft schemas don't store nullability information at all, and the type checking doesn't know what to do with it. For example, take the concat you shared:

table1_df.concat(table2_df)

If, unlike in your case, the content column were nullable in table1_df but not nullable in table2_df, Daft would need to figure out that the output column is nullable.

Since it's a larger problem, I'm going to open a separate issue for storing and using nullability info in schemas and link this issue to that.
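
For illustration, a minimal sketch of that widening rule, written as a hypothetical helper over PyArrow schemas (not Daft's actual implementation): a column should be nullable in the concat output if it is nullable in either input.

import pyarrow as pa

def unify_nullability(left: pa.Schema, right: pa.Schema) -> pa.Schema:
    # Hypothetical helper: widen nullability field-by-field for a concat.
    fields = []
    for lf in left:
        rf = right.field(lf.name)
        assert lf.type == rf.type, f"type mismatch for column {lf.name!r}"
        fields.append(pa.field(lf.name, lf.type, nullable=lf.nullable or rf.nullable))
    return pa.schema(fields)

a = pa.schema([pa.field("content", pa.string(), nullable=True)])
b = pa.schema([pa.field("content", pa.string(), nullable=False)])
print(unify_nullability(a, b).field("content").nullable)  # True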

srilman commented on Jun 24 '25 17:06

@srilman excellent notes here.

Concerning your example of concatenating two dataframes whose fields share the same name and datatype but differ in nullability: this makes sense, though it may warrant a warning in df.explain(). I'm not sure how to make the call on that, tbh.

everettVT commented on Jun 25 '25 16:06

+1 for me as well. When I read Parquet with Daft, it upcasts PyArrow string types to large_string (and lists to large_list), while PyArrow reads them as string.

daft.read_parquet(str(path)).to_arrow().schema

output:

path: large_string
label_type: large_list<item: large_string>
  child 0, item: large_string
label: large_list<item: large_string>
  child 0, item: large_string
mask_type: large_string
mask_path: large_string
organ: large_string
cancer: large_string
image_type: large_string
image_size: large_list<item: int64>
  child 0, item: int64
microscope: large_string
microscope_magnification: large_string
subset: large_string
object_coordinates: large_string
split: large_string
query_id: int64
query: large_string
answer: large_string
objects: large_list<item: large_string>
  child 0, item: large_string
perceptual_hash: large_string
sha256_hash: large_string

When using PyArrow directly to read the same Parquet file:

pa.parquet.read_table(str(path)).schema

output:

path: string
label_type: list<element: string>
  child 0, element: string
label: list<element: string>
  child 0, element: string
mask_type: string
mask_path: string
organ: string
cancer: string
image_type: string
image_size: list<element: int64>
  child 0, element: int64
microscope: string
microscope_magnification: string
subset: string
object_coordinates: string
split: string
query_id: int64
query: string
answer: string
objects: list<element: string>
  child 0, element: string
perceptual_hash: string
sha256_hash: string
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2648
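
A possible workaround until this is fixed (a sketch, assuming the same path variable as above and that the columns only differ in string/large_string and list/large_list): cast the Daft-produced Arrow table back to the schema recorded in the Parquet file.

import daft
import pyarrow.parquet as pq

# Read the schema PyArrow sees in the Parquet file, then cast Daft's output to it.
file_schema = pq.read_schema(str(path))
tbl = daft.read_parquet(str(path)).to_arrow()
tbl = tbl.cast(file_schema)  # large_string -> string, large_list -> list, etc.
print(tbl.schema)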

daankaiko commented on Aug 19 '25 10:08