arrow_schema conversion missing nullability, string/large_string mismatch
Describe the bug
I initially discovered this when using write_lance(), but on deeper inspection it appears Daft isn't preserving all field metadata, such as nullability (my particular use case).
Additionally, the automatic dtype cast from string to large_string appears to be an issue, as also demonstrated in issue 3605.
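A condensed illustration of both symptoms (a minimal sketch of the full repro below; the exact round-trip behavior may vary by Daft version):

import daft
import pyarrow as pa

schema = pa.schema([
    pa.field("id", pa.uint32(), nullable=False),     # non-nullable flag is not preserved
    pa.field("content", pa.string(), nullable=True),  # string comes back as large_string
])
table = pa.Table.from_pylist([{"id": 1, "content": "foo"}], schema=schema)

# Round-trip through Daft and compare schemas
roundtripped = daft.from_arrow(table).to_arrow().schema
print(roundtripped.equals(schema))  # False on the version I tested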
To Reproduce
import daft
import pyarrow as pa
import lance
uri = "./tmp/"
# Define Schema
my_schema = pa.schema([
    pa.field("id", pa.uint32(), nullable=False),
    pa.field("content", pa.string(), nullable=True)
])
# Create Arrow Tables from lists of dictionaries
table1 = pa.Table.from_pylist([{
    "id": 1,
    "content": "foo"
}], schema=my_schema)
table2 = pa.Table.from_pylist([{
    "id": 2,
    "content": "bar"
}], schema=my_schema)
# Write the table to a Lance dataset
ds = lance.write_dataset(table1, uri, schema=my_schema, mode="overwrite")
# Read the Lance dataset
table1_df = daft.read_lance(uri)
# Add some data
table2_df = daft.from_arrow(table2)
# Concatenate the two tables
combined_table_df = table1_df.concat(table2_df).collect()
# Write back to the Lance dataset
try:
    combined_table_df.write_lance(uri, mode="append")
except Exception as e:
    print("Error: ", e)
# Both df.write_lance and LanceDataSink convert the Daft schema to a PyArrow schema via:
LanceDataSink_pyarrow_schema_conversion = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_table_df.schema())
# DaftNative converts daft schema to pyarrow schema by:
DaftNative_pyarrow_schema_conversion = combined_table_df.schema().to_pyarrow_schema()
# Check if the pyarrow schema is the same as the original schema
print("\nDoes LanceDataSink_pyarrow_schema_conversion == my_schema: \n", LanceDataSink_pyarrow_schema_conversion == my_schema)
print("\nDoes DaftNative_pyarrow_schema_conversion == my_schema: \n", DaftNative_pyarrow_schema_conversion == my_schema)
print("\n")
print("my_schema: \n", my_schema)
print("\n")
print("LanceDataSink_pyarrow_schema_conversion: \n", LanceDataSink_pyarrow_schema_conversion)
print("\n")
print("DaftNative_pyarrow_schema_conversion: \n", DaftNative_pyarrow_schema_conversion)
print("\n")
# In order to successfully write to Lance datasets,
# we have to convert back to an Arrow table and write with Lance directly.
# Even then, our converted table still fails the schema check:
convert_table2_df_back_to_arrow = table2_df.to_arrow()
print("\nDoes convert_table2_df_back_to_arrow.schema == my_schema: \n", convert_table2_df_back_to_arrow.schema == my_schema)
# Here we can see that Lance won't write the table because the schema is not the same.
try:
    lance.write_dataset(convert_table2_df_back_to_arrow, uri, schema=my_schema, mode="append")
except Exception as e:
    print("Error: ", e)
# The only thing we can do is to coerce the table to the original schema:
coerced_table = convert_table2_df_back_to_arrow.cast(my_schema)
# Check if the coerced table is the same as the original table
print("\nDoes coerced_table == table2: \n", coerced_table == table2)
# Write the coerced table to lance dataset
try:
    lance.write_dataset(coerced_table, uri, schema=my_schema, mode="append")
    print("Successfully wrote the coerced table to lance dataset")
except Exception as e:
    print("Error: ", e)
Expected behavior
Round-trip schema conversion to and from Arrow should yield exact equality across Apache Arrow-native integrations.
Given
import daft
import pyarrow as pa
import lance
uri = "./tmp/"
# Define Schema
orig_schema = pa.schema([
    pa.field("id", pa.uint32(), nullable=False),
    pa.field("content", pa.string(), nullable=True)
])
daft_orig_schema = daft.Schema.from_pyarrow_schema(orig_schema)
# Create an Arrow Table from a list of dictionaries
pa_table = pa.Table.from_pylist([{
    "id": 1,
    "content": "foo"
}], schema=orig_schema)
# From Arrow to Daft DataFrame (native)
daft_df = daft.from_arrow(pa_table)
daft_df_schema = daft_df.schema().to_pyarrow_schema()
# From Arrow to Lance Dataset(Native)
lance_ds = lance.write_dataset(pa_table, uri, schema = orig_schema, mode="overwrite")
lance_ds_schema = lance_ds.schema
# From Lance Dataset to Daft DataFrame
lance_df = daft.read_lance(uri)
lance_df_schema = lance_df.schema().to_pyarrow_schema()
We should expect all of the following to be TRUE:
assert orig_schema == daft_orig_schema.to_pyarrow_schema()
assert orig_schema == daft_df_schema
assert orig_schema == lance_ds_schema
assert orig_schema == lance_df_schema
We should also expect that reading from and writing to the same Lance table works.
# Let's say we add another record by first creating a new Arrow table
pa_table2 = pa.Table.from_pylist([{
    "id": 2,
    "content": "bar"
}], schema=orig_schema)
daft_df_2 = daft.from_arrow(pa_table2)
# Concat should work
concat_df_from_arrow = daft_df.concat(daft_df_2).collect()
concat_df_from_lance = lance_df.concat(daft_df_2).collect()
# write_lance should work
try:
    concat_df_from_arrow.write_lance(uri, mode="append")
except Exception as e:
    print("Error: ", e)
try:
    concat_df_from_lance.write_lance(uri, mode="append")
except Exception as e:
    print("Error: ", e)
Component(s)
Other
Additional context
One important note
All PyArrow schema reconstruction should use Schema.to_pyarrow_schema(), not pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in df.schema()).
Long story short: the latter drops nullability and other field metadata, which then surfaces in schema equality validation/checks.
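For illustration, the two conversions side by side (a minimal sketch; combined_table_df is the dataframe from the repro above):

import pyarrow as pa

# Conversion currently used by LanceDataSink: rebuilds fields from (name, dtype) pairs only,
# so every field defaults to nullable=True and field metadata is dropped.
sink_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_table_df.schema())

# Conversion that should be used instead: Daft's own schema export.
native_schema = combined_table_df.schema().to_pyarrow_schema()

The pytest suite below exercises both conversion paths end to end: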
import pytest
import tempfile
import shutil
import os
import daft
import pyarrow as pa
import lance


class TestSchemaNullabilityError:
    """Test suite for schema nullability issues in Daft's Lance integration."""

    @pytest.fixture
    def temp_lance_dir(self):
        """Create a temporary directory for Lance datasets."""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.fixture
    def test_schema(self):
        """Define test schema with mixed nullability."""
        return pa.schema([
            pa.field("id", pa.uint32(), nullable=False),
            pa.field("content", pa.string(), nullable=True)
        ])

    @pytest.fixture
    def test_tables(self, test_schema):
        """Create test Arrow tables with the defined schema."""
        table1 = pa.Table.from_pylist([{
            "id": 1,
            "content": "foo"
        }], schema=test_schema)
        table2 = pa.Table.from_pylist([{
            "id": 2,
            "content": "bar"
        }], schema=test_schema)
        return table1, table2
    def test_lance_dataset_creation(self, temp_lance_dir, test_schema, test_tables):
        """Test that we can create a Lance dataset with the original schema."""
        table1, _ = test_tables
        # Should succeed - Lance can write with an explicit schema
        ds = lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        assert ds is not None
        # Verify the dataset has the correct schema
        dataset = lance.dataset(temp_lance_dir)
        assert dataset.schema == test_schema

    def test_daft_read_lance_preserves_schema(self, temp_lance_dir, test_schema, test_tables):
        """Test that daft.read_lance() preserves the original schema."""
        table1, _ = test_tables
        # Create Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        # Read with Daft
        df = daft.read_lance(temp_lance_dir)
        # Check if Daft's native conversion preserves the schema
        daft_native_schema = df.schema().to_pyarrow_schema()
        assert daft_native_schema == test_schema, (
            f"Daft's native schema conversion should preserve nullability. "
            f"Expected: {test_schema}, Got: {daft_native_schema}"
        )
    def test_lance_data_sink_schema_conversion_bug(self, temp_lance_dir, test_schema, test_tables):
        """Test that demonstrates the LanceDataSink schema conversion bug."""
        table1, table2 = test_tables
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        # Read and combine data
        table1_df = daft.read_lance(temp_lance_dir)
        table2_df = daft.from_arrow(table2)
        combined_df = table1_df.concat(table2_df).collect()
        # Test the buggy conversion used by LanceDataSink
        buggy_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_df.schema())
        # This should fail - the buggy conversion loses nullability info
        assert buggy_schema != test_schema, (
            "LanceDataSink's schema conversion should fail to preserve nullability "
            "(this demonstrates the bug)"
        )
        # Show what's different
        for orig_field, buggy_field in zip(test_schema, buggy_schema):
            if orig_field.nullable != buggy_field.nullable:
                print(f"Nullability mismatch for field '{orig_field.name}': "
                      f"original={orig_field.nullable}, buggy={buggy_field.nullable}")

    def test_write_lance_fails_due_to_schema_mismatch(self, temp_lance_dir, test_schema, test_tables):
        """Test that write_lance() fails due to schema mismatch."""
        table1, table2 = test_tables
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        # Read and combine data
        table1_df = daft.read_lance(temp_lance_dir)
        table2_df = daft.from_arrow(table2)
        combined_df = table1_df.concat(table2_df).collect()
        # This should fail due to schema mismatch
        with pytest.raises(Exception) as exc_info:
            combined_df.write_lance(temp_lance_dir, mode="append")
        # Verify it's a schema-related error
        error_msg = str(exc_info.value).lower()
        assert any(keyword in error_msg for keyword in ['schema', 'mismatch', 'nullable']), (
            f"Expected schema-related error, got: {exc_info.value}"
        )
    def test_direct_lance_write_also_fails(self, temp_lance_dir, test_schema, test_tables):
        """Test that even direct Lance writing fails without schema coercion."""
        table1, table2 = test_tables
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        # Convert table2 through Daft and back to Arrow
        table2_df = daft.from_arrow(table2)
        converted_table = table2_df.to_arrow()
        # Verify the converted table has a different schema
        assert converted_table.schema != test_schema, (
            "Converted table should have different schema (demonstrates the issue)"
        )
        # Direct Lance write should fail
        with pytest.raises(Exception) as exc_info:
            lance.write_dataset(converted_table, temp_lance_dir, schema=test_schema, mode="append")
        error_msg = str(exc_info.value).lower()
        assert any(keyword in error_msg for keyword in ['schema', 'mismatch', 'type']), (
            f"Expected schema-related error, got: {exc_info.value}"
        )

    def test_schema_coercion_workaround(self, temp_lance_dir, test_schema, test_tables):
        """Test that schema coercion provides a workaround."""
        table1, table2 = test_tables
        # Create initial Lance dataset
        lance.write_dataset(table1, temp_lance_dir, schema=test_schema, mode="overwrite")
        # Convert table2 through Daft
        table2_df = daft.from_arrow(table2)
        converted_table = table2_df.to_arrow()
        # Coerce to the original schema
        coerced_table = converted_table.cast(test_schema)
        # Verify coercion worked
        assert coerced_table.schema == test_schema, (
            "Coerced table should match original schema"
        )
        # This should succeed
        lance.write_dataset(coerced_table, temp_lance_dir, schema=test_schema, mode="append")
        # Verify data was written correctly
        dataset = lance.dataset(temp_lance_dir)
        result_table = dataset.to_table()
        # Should have 2 rows (original + appended)
        assert len(result_table) == 2
        assert result_table.schema == test_schema
    def test_correct_schema_conversion_method(self, test_schema, test_tables):
        """Test that the correct schema conversion method preserves nullability."""
        _, table2 = test_tables
        # Convert to Daft and back using the correct method
        table2_df = daft.from_arrow(table2)
        correct_schema = table2_df.schema().to_pyarrow_schema()
        # This should preserve the original schema
        assert correct_schema == test_schema, (
            f"Correct conversion should preserve schema. "
            f"Expected: {test_schema}, Got: {correct_schema}"
        )

    @pytest.mark.parametrize("nullable_config", [
        [True, True],    # Both nullable
        [False, False],  # Both non-nullable
        [True, False],   # Mixed nullability
        [False, True],   # Mixed nullability (reversed)
    ])
    def test_various_nullability_configurations(self, temp_lance_dir, nullable_config):
        """Test the bug with various nullability configurations."""
        # Create schema with the specified nullability
        schema = pa.schema([
            pa.field("col1", pa.int64(), nullable=nullable_config[0]),
            pa.field("col2", pa.string(), nullable=nullable_config[1])
        ])
        # Create test table
        table = pa.Table.from_pylist([{
            "col1": 1,
            "col2": "test"
        }], schema=schema)
        # Write to Lance
        lance.write_dataset(table, temp_lance_dir, schema=schema, mode="overwrite")
        # Read with Daft
        df = daft.read_lance(temp_lance_dir)
        # Test both conversion methods
        buggy_schema = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in df.schema())
        correct_schema = df.schema().to_pyarrow_schema()
        # The correct method should always work
        assert correct_schema == schema, (
            f"Correct method failed for nullability config {nullable_config}"
        )
        # The buggy method should fail when there are non-nullable fields
        if not all(nullable_config):
            assert buggy_schema != schema, (
                f"Buggy method should fail for nullability config {nullable_config}"
            )
@pytest.fixture
def temp_dir():
    """Create temporary directory for Lance datasets."""
    temp_dir = tempfile.mkdtemp()
    yield temp_dir
    shutil.rmtree(temp_dir, ignore_errors=True)


def test_schema_nullability_error(temp_dir):
    """Test that reproduces the schema nullability error in write_lance."""
    uri = temp_dir + "/"
    # Define schema
    my_schema = pa.schema([
        pa.field("id", pa.uint32(), nullable=False),
        pa.field("content", pa.string(), nullable=True)
    ])
    # Create Arrow Tables from lists of dictionaries
    table1 = pa.Table.from_pylist([{
        "id": 1,
        "content": "foo"
    }], schema=my_schema)
    table2 = pa.Table.from_pylist([{
        "id": 2,
        "content": "bar"
    }], schema=my_schema)
    # Write the table to a Lance dataset
    ds = lance.write_dataset(table1, uri, schema=my_schema, mode="overwrite")
    # Read the Lance dataset
    table1_df = daft.read_lance(uri)
    # Add some data
    table2_df = daft.from_arrow(table2)
    # Concatenate the two tables
    combined_table_df = table1_df.concat(table2_df).collect()
    # Write back to the Lance dataset
    with pytest.raises(Exception) as exc_info:
        combined_table_df.write_lance(uri, mode="append")
    print("Error: ", exc_info.value)
    # Both df.write_lance and LanceDataSink convert the Daft schema to a PyArrow schema via:
    LanceDataSink_pyarrow_schema_conversion = pa.schema((f.name, f.dtype.to_arrow_dtype()) for f in combined_table_df.schema())
    # Daft's native conversion from a Daft schema to a PyArrow schema:
    DaftNative_pyarrow_schema_conversion = combined_table_df.schema().to_pyarrow_schema()
    # Check if the PyArrow schema is the same as the original schema
    print("\nDoes LanceDataSink_pyarrow_schema_conversion == my_schema: \n", LanceDataSink_pyarrow_schema_conversion == my_schema)
    print("\nDoes DaftNative_pyarrow_schema_conversion == my_schema: \n", DaftNative_pyarrow_schema_conversion == my_schema)
    print("\n")
    print("my_schema: \n", my_schema)
    print("\n")
    print("LanceDataSink_pyarrow_schema_conversion: \n", LanceDataSink_pyarrow_schema_conversion)
    print("\n")
    print("DaftNative_pyarrow_schema_conversion: \n", DaftNative_pyarrow_schema_conversion)
    print("\n")
    # In order to successfully write to Lance datasets,
    # we have to convert back to an Arrow table and write with Lance directly.
    # Even then, our converted table still fails the schema check:
    convert_table2_df_back_to_arrow = table2_df.to_arrow()
    print("\nDoes convert_table2_df_back_to_arrow.schema == my_schema: \n", convert_table2_df_back_to_arrow.schema == my_schema)
    # Here we can see that Lance won't write the table because the schema is not the same.
    with pytest.raises(Exception) as exc_info:
        lance.write_dataset(convert_table2_df_back_to_arrow, uri, schema=my_schema, mode="append")
    print("Error: ", exc_info.value)
    # The only thing we can do is coerce the table to the original schema:
    coerced_table = convert_table2_df_back_to_arrow.cast(my_schema)
    # Check if the coerced table is the same as the original table
    print("\nDoes coerced_table == table2: \n", coerced_table == table2)
    # Write the coerced table to the Lance dataset
    try:
        lance.write_dataset(coerced_table, uri, schema=my_schema, mode="append")
        print("Successfully wrote the coerced table to lance dataset")
    except Exception as e:
        print("Error: ", e)
    # Assertions to verify the bug exists
    assert LanceDataSink_pyarrow_schema_conversion != my_schema, "LanceDataSink conversion should fail (demonstrates bug)"
    assert DaftNative_pyarrow_schema_conversion == my_schema, "DaftNative conversion should work"
    assert convert_table2_df_back_to_arrow.schema != my_schema, "Converted table schema should differ"
    assert coerced_table.schema == my_schema, "Coerced table should match original schema"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
I think this is part of a more general problem: Daft schemas don't store nullability information at all, and the type checking doesn't know what to do with nullability info. For example, with the concat you shared:
table1_df.concat(table2_df)
If, unlike your case, column content were nullable in table1_df but not nullable in table2_df, Daft would need to figure out that the output column is nullable.
Since it's a larger problem, I'm going to open a separate issue for storing and using nullability info in schemas and link this issue to it.
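For reference, a rough sketch of that resolution rule in plain PyArrow terms (unify_nullability is just a hypothetical helper for illustration, not existing Daft API): the output field should be nullable if either input field is nullable.

import pyarrow as pa

def unify_nullability(left: pa.Schema, right: pa.Schema) -> pa.Schema:
    # Hypothetical helper: the concatenated column is nullable if either input is.
    fields = []
    for lf, rf in zip(left, right):
        assert lf.name == rf.name and lf.type == rf.type
        fields.append(pa.field(lf.name, lf.type, nullable=lf.nullable or rf.nullable))
    return pa.schema(fields)

left = pa.schema([pa.field("content", pa.string(), nullable=True)])
right = pa.schema([pa.field("content", pa.string(), nullable=False)])
print(unify_nullability(left, right))  # content: string, nullable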
@srilman excellent notes here.
Concerning your example of concatenating two dataframes whose fields share the same name and datatype but differ in nullability: this makes sense, but it may warrant a warning in df.explain(). I'm not sure how to make the call on that, to be honest.
+1 for me as well. When I read Parquet with Daft it upcasts PyArrow string types to large_string, while PyArrow reads them as string (a cast-based workaround is sketched after the two schema printouts below).
daft.read_parquet(str(path)).to_arrow().schema
output:
path: large_string
label_type: large_list<item: large_string>
child 0, item: large_string
label: large_list<item: large_string>
child 0, item: large_string
mask_type: large_string
mask_path: large_string
organ: large_string
cancer: large_string
image_type: large_string
image_size: large_list<item: int64>
child 0, item: int64
microscope: large_string
microscope_magnification: large_string
subset: large_string
object_coordinates: large_string
split: large_string
query_id: int64
query: large_string
answer: large_string
objects: large_list<item: large_string>
child 0, item: large_string
perceptual_hash: large_string
sha256_hash: large_string
When using PyArrow directly to read the Parquet file:
pa.parquet.read_table(str(path)).schema
output:
path: string
label_type: list<element: string>
child 0, element: string
label: list<element: string>
child 0, element: string
mask_type: string
mask_path: string
organ: string
cancer: string
image_type: string
image_size: list<element: int64>
child 0, element: int64
microscope: string
microscope_magnification: string
subset: string
object_coordinates: string
split: string
query_id: int64
query: string
answer: string
objects: list<element: string>
child 0, element: string
perceptual_hash: string
sha256_hash: string
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2648
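As a stopgap on the reading side (a minimal sketch assuming the Parquet file's own schema is the desired target; path is a placeholder), casting the round-tripped table back to the file's schema works with recent PyArrow, which supports large_string -> string and large_list -> list casts:

import daft
import pyarrow.parquet as pq

path = "data.parquet"  # placeholder path
target_schema = pq.read_schema(path)                     # string / list<string>, as written
roundtripped = daft.read_parquet(str(path)).to_arrow()   # large_string / large_list on the version tested
restored = roundtripped.cast(target_schema)              # cast back to the original types
print(restored.schema)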