Adding support for LanceDB Table & AsyncTable
Is your feature request related to a problem?
Looking to add support for LanceDB Tables, in both sync and async versions, so that the Daft catalog can manage references to LanceDB tables as well as Iceberg ones.
There are a couple of tricky things here upon deeper inspection.
My initial approach was going to be something along the lines of:
daft/daft/io/_lance.py

```python
"""WARNING! These APIs are internal; please use `Catalog.from_lance()` and `Table.from_lance()`."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any

# AsyncTable is imported for the planned async variant; only the sync Table is adapted below.
from lancedb.table import AsyncTable as AsyncInnerTable, Table as InnerTable

from daft.catalog import Catalog, Identifier, NotFoundError, Properties, Schema, Table
from daft.io._lance import read_lance

if TYPE_CHECKING:
    from daft.dataframe import DataFrame


class LanceTable(Table):
    _inner: InnerTable

    _read_options: set[str] = {"url"}
    _write_options: set[str] = set()

    def __init__(self, inner: InnerTable):
        """Please use `Table.from_lance`."""
        self._inner = inner

    @property
    def name(self) -> str:
        # lancedb's Table.name is a plain string property (not a namespaced tuple like PyIceberg's).
        return self._inner.name

    def schema(self) -> Schema:
        # lancedb's Table.schema is a pyarrow.Schema property.
        return Schema.from_pyarrow_schema(self._inner.schema)

    @staticmethod
    def _from_obj(obj: object) -> LanceTable:
        """Returns a LanceTable if the given object can be adapted so."""
        if isinstance(obj, InnerTable):
            t = LanceTable.__new__(LanceTable)
            t._inner = obj
            return t
        raise ValueError(f"Unsupported lance table type: {type(obj)}")

    def read(self, **options: Any) -> DataFrame:
        Table._validate_options("Lance read", options, LanceTable._read_options)
        return read_lance(options["url"])

    def append(self, df: DataFrame, **options: Any) -> None:
        self._validate_options("Lance write", options, LanceTable._write_options)
        # NOTE: write_lance expects a dataset URI today; passing the lancedb table handle
        # is exactly the gap discussed below.
        df.write_lance(self._inner, mode="append", **options)

    def overwrite(self, df: DataFrame, **options: Any) -> None:
        self._validate_options("Lance write", options, LanceTable._write_options)
        df.write_lance(self._inner, mode="overwrite", **options)


def _to_lance_ident(ident: Identifier | str) -> tuple[str, ...] | str:
    return tuple(ident) if isinstance(ident, Identifier) else ident
```
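For reference, here's roughly how I imagined exercising the adapter locally. A sketch only: it assumes the local LanceDB layout of `<db uri>/<table name>.lance` for the underlying dataset, and since `Table.from_lance` doesn't exist yet it goes through `_from_obj`:

```python
import lancedb

# Create (or overwrite) a small local LanceDB table with the sync API.
db = lancedb.connect("data/sample-lancedb")
tbl = db.create_table("my_table", data=[{"id": 1, "text": "hello"}], mode="overwrite")

# Wrap it with the adapter above and read it back through Daft.
lance_table = LanceTable._from_obj(tbl)
df = lance_table.read(url="data/sample-lancedb/my_table.lance")
df.show()
```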
But once I got around to ensuring the options are properly available, I quickly realized daft.read_lance() actually uses pylance to read Lance datasets directly, not the higher-level LanceDB table API.
This makes sense, and the near-term path may be to leave read_lance exactly as it is, but since Lance and LanceDB have undergone some larger upgrades over the past year or so, I figured I'd start a discussion about deepening the integration between Daft and LanceDB.
This discussion is particularly relevant to the lance-namespace project (https://github.com/lancedb/lance-namespace) for a future LanceCatalog.
CC: @jackye1995 @rchowell
Describe the solution you'd like
My goal is to support the first async Table on Daft, backed by Lance.
Describe alternatives you've considered
No response
Additional Context
Lance Dataset: https://github.com/lancedb/lance/blob/e30cd8297608c9a80b5dc6640c4e373860d1b31d/python/python/lance/dataset.py#L209
daft.read_lance() : https://github.com/Eventual-Inc/Daft/blob/8b0dea12287ecc5b511607fd5b5cb061fcad8688/daft/io/_lance.py#L33
df.write_lance(): https://github.com/Eventual-Inc/Daft/blob/8b0dea12287ecc5b511607fd5b5cb061fcad8688/daft/dataframe/dataframe.py#L1217
Would you like to implement a fix?
Yes
More context for LanceDB folks: Daft has a catalog and session abstraction for referencing and managing tables. While you can currently read and write Lance tables via pylance, there is currently no support for referencing a LanceTable in the catalog.
This may be where @jackye1995's IcebergRESTCatalog or LanceNamespace can also offer a catalog integration.
I don't know too much about Daft's catalog internals, but just looking at the documentation, it seems like Daft directly integrates with a catalog service like AWS Glue, Unity, S3 Tables, etc., where each catalog can hold different table formats (Iceberg/Delta/etc.): DaftCatalog -> Glue -> Iceberg/Delta tables in Glue.
If that is the paradigm, I don't think LanceNamespace is needed. LanceNamespace is for integrating with connectors that support one format per connector, like SparkCatalog (SparkCatalog -> LanceNamespaceSparkCatalog -> LanceNamespace -> GlueLanceNamespace -> Lance tables in Glue).
So as long as we agree upon a convention, we should be good on the catalog front. This is what the LanceNamespace specification can define, and we can agree upon it in https://github.com/lancedb/lance-namespace/tree/main/spec/impls. @everettVT could you share which catalog you would like initial support for? Maybe we can start from there.
For open source Lance tables, the main thing is probably to support reading and writing the tables. Given we are both using Rust, DataFusion, and Arrow, hopefully this is pretty straightforward.
For LanceDB remote tables, that requires a bit more thinking. That probably needs a dedicated integration of Daft with LanceNamespace, where LanceDB remote tables are exposed behind a LanceRestNamespace.
I think since the directory catalog is flat, it should match closely with the recent MemoryCatalog addition: https://github.com/Eventual-Inc/Daft/pull/4445, which is backed by Rust and fully available in Python.
REST would definitely be nice. I do plan on using storage buckets for production.
So this is something that has been on my mind lately as well. Ideally I'd also like Daft to support reading from LanceDB directly as well as Lance. I'm not exactly sure of the best design for that, as daft.read_lancedb seems odd in conjunction with the existing daft.read_lance.
Happy to collaborate and come up with a nice integration!
I've been mulling this over for the past several days, and am working up a response that aims to honor some of the more sophisticated methods that Lance datasets support while considering the nearest-term path for a Daft LanceTable. IMO, the long-term goal is to support async table access, which would be a key differentiator from Iceberg and other open table formats.
My use case is only concerned with concurrent writes on non-conflicting rows (assuming we are just appending). Updates, inserts, merge_inserts/upserts are probably more test intensive.
Just keeping this conversation alive while we collect resources on:
- LanceDB Table and AsyncTable API vs Daft Table API (I think we just need read, write, name, and schema; a rough mapping is sketched after this list)
- How AsyncTable supports async requests.
- Updating read_lance to support version / commit args. (Looks like read_lance is pretty bare bones)
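As a rough starting point for the first bullet, this is how I'd guess the APIs line up; the lancedb side reflects my reading of the current sync Table API, so treat the specifics as assumptions:

```python
# daft Table.name        <- lancedb Table.name     (plain string property)
# daft Table.schema()    <- lancedb Table.schema   (pyarrow.Schema property)
# daft Table.read()      <- daft.read_lance(<dataset uri>) or daft.from_arrow(tbl.to_arrow())
# daft Table.append()    <- lancedb Table.add(data, mode="append")
# daft Table.overwrite() <- lancedb Table.add(data, mode="overwrite")
```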
Looking to add a more focused comment soon.
I read through the codebase a bit more; here is my understanding. There are really two integration patterns that co-exist in Daft today:
Connector-based approach
Daft mainly connects with different formats/data sources through read_<format>(...) or write_<format>(...) and goes from there. For each format the parameters differ, and for Lance it takes a Lance dataset object directly. This is what the Lance namespace project is designed to integrate with. This is similar to the Iceberg read, which accepts either a path or a PyIceberg table. So the signature would be something like:
```python
def read_lance(
    table: Union[str, "lance.namespace.LanceTable"],
    io_config: Optional[IOConfig] = None,
) -> DataFrame:
    ...
```
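Usage would then look something like this (the namespace client below is a purely illustrative placeholder, not the actual lance-namespace Python API):

```python
# Purely illustrative; the namespace client calls are hypothetical placeholders.
# ns = lance_namespace_connect("glue")            # hypothetical helper
# lance_tbl = ns.load_table(["db", "my_table"])   # hypothetical: returns a Lance table handle
# df = daft.read_lance(lance_tbl)                 # the proposed overload above
```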
Catalog routing
There is also the Daft catalog integration, which is basically a router layer that routes a specific table instance in a specific catalog to a specific reader or writer function, depending on the table information stored in that catalog. The function's input parameter needs to be updated to support that specific type.
For that, we don't really need to use any library in the Lance namespace; we just need to agree upon the convention to be used for each catalog implementation (e.g. Unity, Glue, etc.), and then make sure read_lance and write_lance are updated to accept those inputs, something like what is done for Delta Lake:
```python
def read_lance(
    table: Union[str, DataCatalogTable, "UnityCatalogTable"],
    io_config: Optional[IOConfig] = None,
) -> DataFrame:
    ...
```
Connector approach + catalog routing
With catalog routing, I can see for example that in the GlueCatalog implementation for Iceberg, the PyIceberg table is loaded first before being fed into the read_iceberg function: https://github.com/Eventual-Inc/Daft/blob/fcb2a94d3b3ed149cf81af0920370f868699f6ec/daft/catalog/__glue.py#L456-L474. So something similar could be done for Lance.
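For illustration, the Lance equivalent inside a catalog implementation could be as simple as resolving the stored location and handing it to the reader. A sketch only; the `lance.uri` property key is an assumed convention, not something agreed upon yet:

```python
import daft
from daft.dataframe import DataFrame


def _read_lance_from_catalog(table_properties: dict) -> DataFrame:
    # Resolve the catalog's table entry to a Lance dataset URI.
    # The "lance.uri" key is an assumption, not an agreed-upon convention.
    uri = table_properties["lance.uri"]
    # Mirror the Glue/Iceberg flow: resolve first, then feed into the format reader.
    return daft.read_lance(uri)
```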
So for Lance, I think an integration workflow like Iceberg might make more sense. Would be curious what the Daft maintainers think: is that the recommended approach?
Regarding the LanceDB async table: it is still a Lance table. The term "async" refers more to the execution mode, where instead of computing everything locally, the execution plans (e.g. search, merge) are sent to the remote LanceDB service to be executed in a distributed fashion. From that perspective, async or not makes no difference for Daft, because you will leverage Daft compute instead of LanceDB compute to fulfill your read and write operations. We can expose a LanceDBNamespace implementation to make sure your LanceDB async table is accessible through other open engines.
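For context, opening an async table today looks roughly like this; either way, Daft would do the actual scan with its own compute over the underlying Lance data:

```python
import asyncio

import lancedb


async def main() -> None:
    # The async connection works against both local paths and remote LanceDB.
    db = await lancedb.connect_async("data/sample-lancedb")
    tbl = await db.open_table("my_table")
    # Still a regular Lance table underneath; metadata calls are simply awaited.
    print(await tbl.schema())


asyncio.run(main())
```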
@jackye1995 @everettVT
> I can see for example in GlueCatalog implementation of Iceberg, the Iceberg pyIceberg table is loaded first, before feeding into the read_iceberg function.
Jack is spot on. This would be the easiest path forward for implementing the Table ABC backed by a Lance table.
@rchowell @jackye1995 Thank you for the insights here.
This should be enough context for me to build a first draft. I'm making this a priority for this next week.
I'll post here and tag you guys once I've got something to review.