Allow for "reduce" steps in LanceDB adapter
Feature description
Allow the LanceDB adapter (and other vector DB adapters) to specify a "contextualize" or rolling-window operation that joins partitioned text chunks before applying the embedding function.
Are you a dlt user?
Yes, I'm already a dlt user.
Use case
context
The constructs of @dlt.resource and @dlt.transformer are very convenient for document ingestion in NLP/LLM use cases. The @dlt.resource returns the full text and the @dlt.transformer can chunk it (into paragraphs, for example). The LanceDB and other vector DB adapters make it easy to embed the full-text and chunked-text columns. We get something like this:
@dlt.resource
def full_text():
    yield {"document_id": "foo", "text": ...}


@dlt.transformer(data_from=full_text)
def chunked_text(document):
    for idx, chunk in enumerate(_split_text(document["text"]), start=1):
        yield {
            "document_id": document["document_id"],
            "chunk_id": idx,
            "text": chunk,
        }
Full-text

| document id | text |
| --- | --- |
| "foo" | "The quick brown fox jumps over the lazy dog" |

Chunks (3 words)

| document id | chunk id | text |
| --- | --- | --- |
| "foo" | 1 | "The quick brown" |
| "foo" | 2 | "fox jumps over" |
| "foo" | 3 | "the lazy dog" |
limitations
However, embedding these "partitioned" chunks is often low value for RAG. A common operation is "contextualizing" chunks, which consists of a rolling window operation (with window size and stride / overlap parameters). For instance, LanceDB has contextualize(), but it requires converting the data to a pandas dataframe. Let's illustrate a "2-chunk window" based on the previous table:
Contexts

| document id | chunk ids | context id | text |
| --- | --- | --- | --- |
| "foo" | 1, 2 | 1 | "The quick brown fox jumps over" |
| "foo" | 2, 3 | 2 | "fox jumps over the lazy dog" |
AFAIK, dlt doesn't provide a clear API for normalizing the chunk_id and context_id columns. The "contextualize" operation could be implemented directly in a single @dlt.transformer, but it would only capture the document_id -> context_id relationship and miss the fact that "contextualized chunks" aren't independent; they share underlying chunks.
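For concreteness, the single-transformer version would look something like this sketch (hypothetical, and assuming the transformer receives all chunks of a document as one batch); it only yields document_id -> context_id rows and drops the chunk-to-context mapping:

@dlt.transformer(data_from=chunked_text, primary_key="context_id")
def contextualized(chunks: list[dict]):
    # rolling 2-chunk window with a 1-chunk stride, flattened into one table
    for idx in range(len(chunks) - 1):
        yield {
            "document_id": chunks[0]["document_id"],
            "context_id": idx,
            "text": " ".join(c["text"] for c in chunks[idx : idx + 2]),
        }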
Proposed solution
adding a "reducer" step
I was able to hack around this by receiving a batch of "chunks" and using dlt.mark.with_table_name to dispatch both a "context table" and a "relation table" from the same @dlt.transformer. Mock code:
import dlt


def _split_text(text: str):
    words = text.split()
    for i in range(0, len(words), 3):
        yield " ".join(words[i : i + 3])


def _contextualize(chunks: list, window=5, stride=3, min_window_size=2):
    # rolling window over the ordered chunks of a single document
    n_chunks = len(chunks)
    for start_i in range(0, n_chunks, stride):
        if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):
            yield chunks[start_i : min(start_i + window, n_chunks)]


@dlt.source
def document_source():
    @dlt.resource(primary_key="document_id")
    def document():
        yield {"document_id": "foo", "text": "The quick brown fox jumps over the lazy dog"}

    # this needs to accumulate a "batch" for the whole document before
    # starting the "reduce" / rolling operation step
    @dlt.transformer(data_from=document, primary_key="chunk_id")
    def chunks(item: dict):
        return [
            dict(
                document_id=item["document_id"],
                chunk_id=idx,
                text=text,
            )
            for idx, text in enumerate(_split_text(item["text"]), start=1)
        ]

    # order is important for the reduce / rolling step:
    # default to the order of the batch or specify a sorting key
    @dlt.transformer(data_from=chunks, primary_key="context_id")
    def contexts(items: list[dict]):
        # 2-chunk window with 1-chunk stride, as in the illustration above
        for window_chunks in _contextualize(items, window=2, stride=1):
            # first handle the m-to-n relationship:
            # the context key is a hash of the set of foreign keys (i.e., "chunk_id")
            chunk_id_set = set(chunk["chunk_id"] for chunk in window_chunks)
            context_id = hash_set(chunk_id_set)  # hash_set: placeholder for a deterministic hash of the key set
            # dispatch a table only containing the keys
            for chunk_id in chunk_id_set:
                yield dlt.mark.with_table_name(
                    {"chunk_id": chunk_id, "context_id": context_id},
                    "chunks_to_contexts_keys",
                )
            # main transformation logic: dispatch the joined, contextualized text
            yield dlt.mark.with_table_name(
                {
                    "context_id": context_id,
                    "text": " ".join(chunk["text"] for chunk in window_chunks),
                },
                "contexts",
            )

    return (document, chunks, contexts)
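Loading this source with an ordinary pipeline (a sketch; the pipeline name and destination are illustrative) would then produce the document and chunks tables plus the two dispatched tables below:

import dlt

pipeline = dlt.pipeline(pipeline_name="contextualize_demo", destination="lancedb")
load_info = pipeline.run(document_source())
print(load_info)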
Contexts

| context id | text |
| --- | --- |
| hash(1, 2) | "The quick brown fox jumps over" |
| hash(2, 3) | "fox jumps over the lazy dog" |

Chunks-to-contexts keys

| chunk id | context id |
| --- | --- |
| 1 | hash(1, 2) |
| 2 | hash(1, 2) |
| 2 | hash(2, 3) |
| 3 | hash(2, 3) |
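
The hash_set helper is left undefined in the mock above; one possible implementation, purely illustrative, is a deterministic digest over the sorted key set:

import hashlib


def hash_set(keys: set) -> str:
    # deterministic digest of the sorted keys, so {1, 2} and {2, 1} hash identically
    payload = ",".join(str(k) for k in sorted(keys))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]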
There's probably room for a generic @dlt.reducer that automatically manages the primary / foreign keys based on the other resources' metadata, handles the key-set hashing, and dispatches results to tables. Given that this could be a can of worms, it could be tested and refined while being hidden behind the lancedb_adapter. The API could be expanded to:
lancedb_adapter(
    chunks,
    embed="text",
    window=10,
    stride=3,  # could also be renamed to overlap by changing the algo
    min_window_size=3,
)
This would reproduce the above logic by creating the chunks table as defined by the user (the chunks resource) and creating the second table automatically.
Related issues
No response