
Adding support for LanceDB Table & AsyncTable

everettVT opened this issue 6 months ago · 9 comments

Is your feature request related to a problem?

Looking to add support for LanceDB tables in both sync and async versions, so that the Daft catalog can manage references to LanceDB tables as well as Iceberg ones.

There are a couple of tricky things here upon deeper inspection.

My initial approach was going to be something along the lines of the following (daft/daft/io/_lance.py):

"""WARNING! These APIs are internal; please use Catalog.from_iceberg() and Table.from_iceberg()."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any

from lancedb.table import Table as InnerTable, AsyncTable as AsyncInnerTable

from daft.catalog import Catalog, Identifier, NotFoundError, Properties, Schema, Table
from daft.io._lance import read_lance

if TYPE_CHECKING:
    from daft.dataframe import DataFrame


class LanceTable(Table):
    _inner: InnerTable

    _read_options = {"url"}
    _write_options: set[str] = set()

    def __init__(self, inner: InnerTable):
        """Please use `Table.from_lance`"""
        self._inner = inner

    @property
    def name(self) -> str:
        # lancedb exposes the table name as a plain string property, unlike
        # pyiceberg where name() returns an identifier tuple.
        return self._inner.name

    def schema(self) -> Schema:
        # lancedb tables expose their pyarrow schema directly.
        return Schema.from_pyarrow_schema(self._inner.schema)

    @staticmethod
    def _from_obj(obj: object) -> LanceTable:
        """Returns an LanceTable if the given object can be adapted so."""
        if isinstance(obj, InnerTable):
            t = LanceTable.__new__(LanceTable)
            t._inner = obj
            return t
        raise ValueError(f"Unsupported lance table type: {type(obj)}")

    def read(self, **options: Any) -> DataFrame:
        Table._validate_options("Lance read", options, LanceTable._read_options)
        return read_lance(url=options.get("url"))

    def append(self, df: DataFrame, **options: Any) -> None:
        self._validate_options("Lance write", options, LanceTable._write_options)

        df.write_lance(self._inner, mode="append", **options)

    def overwrite(self, df: DataFrame, **options: Any) -> None:
        self._validate_options("Lance write", options, LanceTable._write_options)

        df.write_lance(self._inner, mode="overwrite", **options)


def _to_lance_ident(ident: Identifier | str) -> tuple[str, ...] | str:
    return tuple(ident) if isinstance(ident, Identifier) else ident

But once I got around to ensuring the options were properly available, I quickly realized that daft.read_lance() actually uses pylance to read Lance datasets directly, rather than the higher-level LanceDB table API.
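
To make the distinction concrete, here is a minimal sketch of the two entry points (the URIs are placeholders):

import lance
import lancedb

# pylance: opens a Lance *dataset* directly from a URI; this is the layer
# daft.read_lance() builds on.
ds = lance.dataset("s3://bucket/my_table.lance")

# lancedb: connects to a database and opens a higher-level *table* handle,
# which wraps a Lance dataset but adds table-level APIs (search, merge, etc.).
db = lancedb.connect("s3://bucket/my_db")
tbl = db.open_table("my_table")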

This makes sense, and the near-term path may be to leave read_lance exactly as it is. But seeing as lance and lancedb have undergone some major upgrades in the past year or so, I figured I'd start a discussion about deepening the integration between Daft and LanceDB.

This discussion is particularly relevant to the lance-namespace project (https://github.com/lancedb/lance-namespace) for a future LanceCatalog.

CC: @jackye1995 @rchowell

Describe the solution you'd like

My goal is to be able to support the first AsyncTable on Daft, using Lance.

Describe alternatives you've considered

No response

Additional Context

Lance Dataset: https://github.com/lancedb/lance/blob/e30cd8297608c9a80b5dc6640c4e373860d1b31d/python/python/lance/dataset.py#L209

daft.read_lance() : https://github.com/Eventual-Inc/Daft/blob/8b0dea12287ecc5b511607fd5b5cb061fcad8688/daft/io/_lance.py#L33

df.write_lance(): https://github.com/Eventual-Inc/Daft/blob/8b0dea12287ecc5b511607fd5b5cb061fcad8688/daft/dataframe/dataframe.py#L1217

Would you like to implement a fix?

Yes

everettVT · Jun 05 '25

More context for LanceDB folks: Daft has a catalog and session abstraction for referencing and managing tables. While you can currently read and write Lance tables via pylance, there is currently no support for referencing a LanceTable in the catalog.
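
For illustration, here is a hedged sketch of the end state this would enable, assuming the session abstraction mentioned above; Table.from_lance and the attach_table registration are hypothetical here, since neither exists for Lance today:

import daft
import lancedb
from daft.catalog import Table

db = lancedb.connect("s3://bucket/my_db")
tbl = Table.from_lance(db.open_table("my_table"))  # hypothetical adapter

sess = daft.Session()
sess.attach_table(tbl, alias="my_table")  # register it like any catalog table
df = sess.read_table("my_table")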

This may be where @jackye1995's IcebergRESTCatalog or LanceNamespace can also offer a catalog integration.

everettVT · Jun 05 '25

I don't know too much about Daft's catalog structure internals, but just looking at the documentation, it seems like Daft directly integrates with catalog services like AWS Glue, Unity, S3 Tables, etc., where each catalog can hold different table formats, Iceberg/Delta/etc. (DaftCatalog -> Glue -> Iceberg/Delta tables in Glue)

If that is the paradigm, I don't think LanceNamespace is needed. LanceNamespace is for integrating with connectors that support one format per connector, like SparkCatalog (SparkCatalog -> LanceNamespaceSparkCatalog -> LanceNamespace -> GlueLanceNamespace -> Lance tables in Glue)

So as long as we agree upon a convention, we should be good on the catalog front. This is what the LanceNamespace specification can define, and we can agree upon it in https://github.com/lancedb/lance-namespace/tree/main/spec/impls. @everettVT, could you share which catalog you would like supported initially? Maybe we can start from there.

For open source Lance tables, the main thing is probably to support reading and writing the tables. Given we are both using Rust, DataFusion, and Arrow, hopefully this is pretty straightforward.

For LanceDB remote tables, that requires a bit more thinking. I think that probably needs a dedicated integration of Daft with LanceNamespace, where LanceDB remote tables are exposed behind a LanceRestNamespace.

jackye1995 · Jun 05 '25

I think since the directory catalog is flat, it should match closely with the recent MemoryCatalog addition (https://github.com/Eventual-Inc/Daft/pull/4445), which is backed by Rust and fully available in Python.

REST would definitely be nice. I do plan on using storage buckets for production.

everettVT · Jun 05 '25

So this is something that has been on my mind lately as well. Ideally I'd also like Daft to support reading from lancedb directly as well as lance. I'm not exactly sure of the best design for that, as daft.read_lancedb seems odd alongside the existing daft.read_lance.

Happy to collaborate and come up with a nice integration!

universalmind303 · Jun 06 '25

I've been mulling this over for the past several days, and am working up a response that aims to honor some of the more sophisticated methods that Lance datasets support while considering the nearest-term path for a Daft LanceTable. IMO, the long-term goal is to support async table access, which would be a key differentiator from Iceberg and other open table formats.

My use case is only concerned with concurrent writes on non-conflicting rows (assuming we are just appending). Updates, inserts, and merge_inserts/upserts are probably more test-intensive.

Just keeping this conversation alive while we collect resources on:

  1. The LanceDB Table and AsyncTable APIs vs. the Daft Table API (I think we just need read, write, name, and schema).
  2. How AsyncTable supports async requests.
  3. Updating read_lance to support version / commit args (read_lance looks pretty bare-bones today; see the sketch after this list).
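
For item 3, a hedged sketch of what a version-aware read_lance signature could look like; the version parameter is hypothetical, though pylance's lance.dataset(uri, version=...) already supports pinning, so threading it through would mostly be plumbing:

def read_lance(
    url: str,
    io_config: Optional[IOConfig] = None,
    version: Optional[Union[int, str]] = None,  # hypothetical: pin a dataset version or tag
) -> DataFrame:
    ...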

Looking to add a more focused comment soon.

everettVT · Jun 10 '25

I read through the codebase a bit more; here is my understanding. There are really two integration patterns that coexist in Daft today:

Connector based approach

Daft mainly connects with different formats/data sources through read_<format>(...) or write_<format>(...) and goes from there. The parameters differ for each format, and for Lance it directly takes a Lance dataset object. This is what the Lance namespace project is designed to integrate with. It is similar to the Iceberg read, which accepts either a path or a PyIceberg table. So the signature would be something like:

def read_lance(
    table: Union[str, "lance.namespace.LanceTable"],
    io_config: Optional[IOConfig] = None,
) -> DataFrame:
    ...

Catalog routing

There is also the Daft catalog integration, which is basically a router layer that routes a specific table instance in a specific catalog to a specific reader or writer function, depending on the table information stored in that catalog. The function's input parameter needs to be updated to support that specific type. For that, we don't really need any library from the Lance namespace project; we just need to agree on a convention specific to each catalog implementation (e.g. Unity, Glue, etc.), and then make sure read_lance and write_lance are updated to accept those inputs, something like what is done for Delta Lake:

def read_lance(
    table: Union[str, DataCatalogTable, "UnityCatalogTable"],
    io_config: Optional[IOConfig] = None,
) -> DataFrame:
    ...

Connector approach + catalog routing

With catalog routing, I can see for example in the GlueCatalog implementation of Iceberg that the PyIceberg table is loaded first before being fed into the read_iceberg function: https://github.com/Eventual-Inc/Daft/blob/fcb2a94d3b3ed149cf81af0920370f868699f6ec/daft/catalog/__glue.py#L456-L474. Something similar could be done for Lance.
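
Following that pattern, a hedged sketch of the analogous Lance path inside a catalog implementation; _read_lance_table is an illustrative helper, not an existing Daft API:

import daft
import lancedb

def _read_lance_table(db_uri: str, table_name: str) -> "daft.DataFrame":
    # Load the lancedb table handle first, mirroring how __glue.py loads a
    # pyiceberg table before handing it to read_iceberg...
    tbl = lancedb.connect(db_uri).open_table(table_name)
    # ...then hand the underlying Lance dataset's URI to the existing reader.
    # to_lance() returns the pylance dataset wrapped by the lancedb table.
    return daft.read_lance(tbl.to_lance().uri)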

So for Lance, I think an integration workflow like Iceberg's might make more sense. I'd be curious what the Daft maintainers think; is that the recommended approach?

jackye1995 · Jun 10 '25

Regarding the LanceDB async table: it is still a Lance table. The term "async" refers more to the execution mode, where instead of computing everything locally, the execution plan (e.g. search, merge) is sent to the remote LanceDB service to be executed in a distributed fashion. From that perspective, async or not makes no difference for Daft, because you will leverage Daft compute instead of LanceDB compute to fulfill your read and write operations. We can expose a LanceDBNamespace implementation to make sure your LanceDB async table is accessible through other open engines.
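
For reference, a minimal sketch of opening an async table through lancedb's connect_async entry point (the URI is a placeholder); the handle wraps the same underlying Lance table:

import asyncio
import lancedb

async def main() -> None:
    # AsyncTable is the same Lance table underneath; only the client-side
    # execution mode differs, so a Daft reader could treat both alike.
    db = await lancedb.connect_async("s3://bucket/my_db")
    tbl = await db.open_table("my_table")
    print(await tbl.schema())

asyncio.run(main())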

jackye1995 · Jun 10 '25

@jackye1995 @everettVT

I can see for example in the GlueCatalog implementation of Iceberg that the PyIceberg table is loaded first before being fed into the read_iceberg function.

Jack is spot on. This would be the easiest path forward for implementing the Table ABC backed by a Lance table.

rchowell · Jun 13 '25

@rchowell @jackye1995 Thank you for the insights here.

This should be enough context for me to build a first draft. I'm making this a priority for me this next week.

I'll post here and tag you guys once I've got something to review.

everettVT · Jun 14 '25