
Allow for "reduce" steps in LanceDB adapter

Open zilto opened this issue 6 months ago • 4 comments

Feature description

Allow the LanceDB and other vector DB adapters to specify a "contextualize" or rolling-window operation that joins partitioned text chunks before applying the embedding function.

Are you a dlt user?

Yes, I'm already a dlt user.

Use case

context

The @dlt.resource and @dlt.transformer constructs are very convenient for document ingestion in NLP/LLM use cases. The @dlt.resource returns the full text and the @dlt.transformer chunks it (into paragraphs, for example). The LanceDB and other vector DB adapters make it easy to embed the full-text and chunked-text columns. We get something like this:

@dlt.resource
def full_text():
  yield {"document_id": "foo", "text": "The quick brown fox jumps over the lazy dog"}

@dlt.transformer(data_from=full_text)
def chunked_text(document):
  # _split_text yields 3-word chunks (helper defined in the proposed solution below)
  for idx, chunk in enumerate(_split_text(document["text"]), start=1):
    yield {
      "document_id": document["document_id"],
      "chunk_id": idx,
      "text": chunk,
    }

Full-text

| document id | text                                          |
|-------------|-----------------------------------------------|
| "foo"       | "The quick brown fox jumps over the lazy dog" |

Chunks (3 words)

| document id | chunk id | text              |
|-------------|----------|-------------------|
| "foo"       | 1        | "The quick brown" |
| "foo"       | 2        | "fox jumps over"  |
| "foo"       | 3        | "the lazy dog"    |

limitations

However, embedding these "partitioned" chunks on their own is often of low value for RAG. A common operation is "contextualizing" chunks, which consists of a rolling-window operation (with window size and stride / overlap parameters). For instance, LanceDB has contextualize(), but it requires converting the data to a pandas DataFrame (a sketch of that route follows the next table). Let's illustrate a "2-chunk window" based on the previous table:

Contexts

| document id | chunk id | context id | text                             |
|-------------|----------|------------|----------------------------------|
| "foo"       | 1, 2     | 1          | "The quick brown fox jumps over" |
| "foo"       | 2, 3     | 2          | "fox jumps over the lazy dog"    |

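For reference, the pandas route implied by contextualize() today looks roughly like this (a sketch from memory, method names may differ between LanceDB versions; window=2 / stride=1 match the table above):

import pandas as pd
from lancedb.context import contextualize

chunks_df = pd.DataFrame(
    {
        "document_id": ["foo", "foo", "foo"],
        "chunk_id": [1, 2, 3],
        "text": ["The quick brown", "fox jumps over", "the lazy dog"],
    }
)

# rolling 2-chunk window with stride 1, grouped per document
contexts_df = (
    contextualize(chunks_df)
    .groupby("document_id")
    .text_col("text")
    .window(2)
    .stride(1)
    .to_pandas()
)

This works, but it forces the data out of the dlt resource flow and into pandas, which is exactly what this feature request would like to avoid.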
AFAIK, dlt doesn't provide a clear API for normalizing the chunk_id and context_id columns. The "contextualize" operation could be implemented directly in a single @dlt.transformer (see the sketch below), but it would only capture the document_id -> context_id relationship and miss the fact that "contextualized chunks" aren't independent: they share underlying chunks.
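To illustrate, a naive single-transformer version (hypothetical, assuming the transformer receives a document's chunks as one batch, like the mock code below) would look roughly like this; note that the context rows carry no record of which chunks they were built from:

@dlt.transformer(data_from=chunked_text)
def naive_contexts(chunks: list[dict]):
  # rolling 2-chunk window; only document_id -> context_id is preserved
  texts = [chunk["text"] for chunk in chunks]
  for i in range(len(texts) - 1):
    yield {
      "document_id": chunks[0]["document_id"],
      "context_id": i + 1,
      "text": " ".join(texts[i : i + 2]),
    }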

Proposed solution

adding a "reducer" step

I was able to hack around this by receiving a batch of "chunks" and using dlt.mark.with_table_name to dispatch both a "contexts" table and a "relation" table from the same @dlt.transformer. Mock code:

def _split_text(text: str):
    # naive chunking into 3-word chunks
    words = text.split()
    for i in range(0, len(words), 3):
        yield " ".join(words[i : i + 3])

def _contextualize(chunks: list, window=5, stride=3, min_window_size=2):
    # rolling window over an ordered list of chunks; yields each window as a list,
    # e.g. window=2 / stride=1 over [c1, c2, c3] yields [c1, c2] then [c2, c3]
    n_chunks = len(chunks)
    for start_i in range(0, n_chunks, stride):
        if (start_i + window <= n_chunks) or (n_chunks - start_i >= min_window_size):
            yield chunks[start_i : min(start_i + window, n_chunks)]
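# hash_set is not defined in the issue; a minimal, hypothetical sketch could be:
import hashlib

def hash_set(ids: set) -> str:
    # deterministic surrogate key for a set of chunk ids
    return hashlib.md5(",".join(map(str, sorted(ids))).encode()).hexdigest()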

@dlt.source
def document_source():
  @dlt.resource(primary_key="document_id")
  def document():
    yield {"document_id": "foo", "text": "The quick brown fox jumps over the lazy dog"}

  # this needs to accumulate a "batch" of all chunks of a document before
  # the "reduce" / rolling operation step can start
  @dlt.transformer(data_from=document, primary_key="chunk_id")
  def chunks(item: dict):
    return [
      dict(
        document_id=item["document_id"],
        chunk_id=idx,
        text=text,
      )
      for idx, text in enumerate(_split_text(item["text"]), start=1)
    ]
 
 
  # order is important for the reduce / rolling step;
  # default to the order of the batch, or allow specifying a sorting key
  @dlt.transformer(data_from=chunks, primary_key="context_id")
  def contexts(items: list[dict]):
    # each rolling window becomes one "context" row plus one key row per
    # underlying chunk; window=2 / stride=1 reproduces the 2-chunk example above
    for window_items in _contextualize(items, window=2, stride=1):
      # handle the m-to-n relationship: a context is identified by the set
      # of foreign keys (i.e., "chunk_id") of the chunks it was built from
      chunk_id_set = {item["chunk_id"] for item in window_items}
      context_id = hash_set(chunk_id_set)

      # dispatch a relation table only containing the keys
      for chunk_id in chunk_id_set:
        yield dlt.mark.with_table_name(
          {"chunk_id": chunk_id, "context_id": context_id},
          "chunks_to_contexts_keys",
        )

      # main transformation logic: join the chunk texts into the context text
      yield dlt.mark.with_table_name(
        {"context_id": context_id, "text": " ".join(item["text"] for item in window_items)},
        "contexts",
      )
      
  return (document, chunks, contexts)

Contexts

| context id | text                             |
|------------|----------------------------------|
| hash(1, 2) | "The quick brown fox jumps over" |
| hash(2, 3) | "fox jumps over the lazy dog"    |

Chunks-to-contexts keys

| chunk id | context id |
|----------|------------|
| 1        | hash(1, 2) |
| 2        | hash(1, 2) |
| 2        | hash(2, 3) |
| 3        | hash(2, 3) |
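For completeness, running the mock source end to end would look roughly like this (a sketch; the pipeline and dataset names are made up, and the load to the lancedb destination is untested here):

import dlt

pipeline = dlt.pipeline(
    pipeline_name="rag_documents",  # hypothetical name
    destination="lancedb",
    dataset_name="docs",            # hypothetical name
)
load_info = pipeline.run(document_source())
print(load_info)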

There's probably room for a generic @dlt.reducer that automatically manages the primary / foreign keys based on the other resources' metadata, handles the key-set hashing, and dispatches the results to tables. Given that this could be a can of worms, it could be tested and refined while hidden behind the lancedb_adapter. The API could be expanded to:

lancedb_adapter(
  chunks,
  embed="text",
  window=10,
  stride=3,  # could also be renamed to overlap by changing the algo
  min_window_size=3,
)

This would reproduce the logic above: the chunks table is created as defined by the user (the chunks resource), and the additional tables are created automatically.
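For comparison, today's adapter call only carries the embedding hint; roughly (assuming lancedb_adapter is imported from dlt's destination adapters):

from dlt.destinations.adapters import lancedb_adapter

# today: only marks which columns to embed; no windowing over the chunks
pipeline.run(lancedb_adapter(chunks, embed="text"))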

Related issues

No response

zilto · Aug 15 '24 13:08