Improvement: Better write logic for Snowflake Cortex destination
Suggested improvements that become possible after the related refactoring. Related refactoring ticket: https://github.com/airbytehq/PyAirbyte/issues/225
- Currently, in `destination_snowflake_cortex/indexer.py`, we convert chunks into Airbyte messages and send them to PyAirbyte. See below:

```python
def _get_airbyte_messsages_from_chunks(
    self,
    document_chunks: Iterable[Any],
) -> Iterable[AirbyteMessage]:
    """Creates Airbyte messages from chunk records."""
```
A better approach would be to map a single source message to multiple chunks. Once the SQL processor is refactored, this logic can probably live inside the Cortex processor itself, handling each message by writing or replacing one or more chunks (rows).
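As a rough sketch of that direction (all names here, `ChunkRow`, `chunks_for_record`, `upsert_document`, and the `splitter`/`embedder` callables, are hypothetical stand-ins rather than the actual PyAirbyte API), a processor could fan one record out into chunk rows and write or replace the document's chunks in one step:

```python
from dataclasses import dataclass
from typing import Callable

@dataclass
class ChunkRow:
    """One destination row per chunk (hypothetical shape, mirroring the chunk columns)."""
    document_id: str
    chunk_id: str
    page_content: str
    embedding: list[float]

def chunks_for_record(
    record: dict,
    splitter: Callable[[str], list[str]],
    embedder: Callable[[str], list[float]],
) -> list[ChunkRow]:
    """Expand one source record into its chunk rows."""
    doc_id = str(record["id"])
    return [
        ChunkRow(
            document_id=doc_id,
            chunk_id=f"{doc_id}_{i}",
            page_content=text,
            embedding=embedder(text),
        )
        for i, text in enumerate(splitter(record["text"]))
    ]

def upsert_document(
    store: dict[str, list[ChunkRow]],
    record: dict,
    splitter: Callable[[str], list[str]],
    embedder: Callable[[str], list[float]],
) -> None:
    """Handle one message: write or replace all chunks (rows) for its document."""
    rows = chunks_for_record(record, splitter, embedder)
    store[str(record["id"])] = rows  # replaces any previously written chunks for this document
```

Here `store` stands in for the destination table; a real implementation would delete-then-insert (or merge) by `document_id` in SQL.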
- Currently, we update the configured catalog to make sure it contains all the columns being passed to PyAirbyte to be written to the database (see below):

```python
def _get_updated_catalog(self) -> ConfiguredAirbyteCatalog:
    """Adds the following columns to the catalog:
    document_id (primary key) -> unique per record/document
    chunk_id -> unique per chunk
    page_content -> text content of the page
    metadata -> metadata of the record
    embedding -> embedding of the page content
    """
```
Ideally, we should move this logic into the SQL processor class, where we can accept a source catalog without modifying it and handle the difference between what the catalog says and what the destination table should look like.
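One way to sketch that separation (the extra column names come from this issue; the types and vector dimension are illustrative assumptions, not the destination's actual DDL): the processor keeps the source catalog untouched and computes the destination-only columns itself.

```python
# Extra columns the chunk table needs beyond what the source catalog declares.
# Types here are illustrative; the embedding dimension (1536) is an assumption.
DESTINATION_EXTRAS: dict[str, str] = {
    "document_id": "VARCHAR",            # primary key -> unique per record/document
    "chunk_id": "VARCHAR",               # unique per chunk
    "page_content": "VARCHAR",           # text content of the page
    "metadata": "VARIANT",               # metadata of the record
    "embedding": "VECTOR(FLOAT, 1536)",  # embedding of the page content
}

def destination_columns(source_schema: dict[str, str]) -> dict[str, str]:
    """What the destination table should look like: source columns plus the
    chunk-table extras, without mutating the source catalog's schema."""
    return {**source_schema, **DESTINATION_EXTRAS}

def missing_columns(existing: set[str], source_schema: dict[str, str]) -> set[str]:
    """Columns the live table lacks compared to the desired shape, i.e. the
    difference the processor would reconcile (e.g. via ALTER TABLE)."""
    return set(destination_columns(source_schema)) - existing
```

This keeps the catalog a read-only input; the diff between "what the catalog says" and "what the table should look like" becomes an explicit, testable computation.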
This issue is dependent on #225