
Implement adaptive flush to disk during fetch stage (to avoid OOM) in all export scripts

Open · dhruv-anand-aintech opened this issue 1 year ago • 3 comments

dhruv-anand-aintech · Jan 17 '24 06:01

@greptile

dhruv-anand-aintech · May 01 '24 08:05

I was creating an implementation plan to modify the 'vespa_export.py' and 'azureai_export.py' scripts to implement the 'adaptive flush to disk' feature, but ran out of time before finishing and will need more time to complete this task. Please let me know if you have any questions or need further clarification.

Implementation proposal (work in progress)

Step 1: Implement adaptive flush to disk in 'vertexai_vector_search_export.py'

Modify the 'vertexai_vector_search_export.py' script to implement the 'adaptive flush to disk' feature:

  • In the 'get_data_for_index' method, fetch data in batches instead of all at once. You can do this by modifying the 'find_neighbors' call to fetch a limited number of vectors at a time.
  • After each batch is fetched, check the size of the fetched data. If the size exceeds a certain limit (you can use a constant for this limit), write the data to disk and clear the fetched data from memory.
  • Continue this process until all vectors have been fetched and written to disk.

You can use the 'astradb_export.py' script as a reference for how to implement this feature; a rough sketch of the loop is shown below.
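As a rough illustration of Step 1, the sketch below buffers fetched rows and flushes them to a Parquet part file once an approximate byte limit is crossed. Here `fetch_batch` and `write_parquet_part` are hypothetical helpers (not existing methods in the script), and the 512 MB limit and batch size are illustrative defaults, not values from the codebase.

import sys

import pandas as pd

FLUSH_LIMIT_BYTES = 512 * 1024 * 1024  # illustrative ~512 MB buffer limit
BATCH_SIZE = 1000  # illustrative page size

def get_data_for_index(self, index_name):
    buffered_rows, buffered_bytes = [], 0
    offset, part = 0, 0
    while True:
        # fetch_batch is a hypothetical helper wrapping the paginated fetch
        # (e.g. a limited neighbor/vector lookup); it returns a list of row dicts
        batch = self.fetch_batch(index_name, offset, BATCH_SIZE)
        if not batch:
            break
        buffered_rows.extend(batch)
        # rough in-memory size estimate; exact accounting is not required
        buffered_bytes += sum(sys.getsizeof(str(row)) for row in batch)
        if buffered_bytes >= FLUSH_LIMIT_BYTES:
            self.write_parquet_part(buffered_rows, index_name, part)
            buffered_rows, buffered_bytes = [], 0
            part += 1
        offset += BATCH_SIZE
    if buffered_rows:  # flush whatever is left after the last page
        self.write_parquet_part(buffered_rows, index_name, part)

def write_parquet_part(self, rows, index_name, part):
    # hypothetical helper: one Parquet part file per flush
    pd.DataFrame(rows).to_parquet(f"{index_name}_part{part}.parquet", index=False)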

Step 2: Implement adaptive flush to disk in 'milvus_export.py'

Modify the 'milvus_export.py' script to implement the 'adaptive flush to disk' feature:

  • In the 'get_data_for_collection' method, fetch data in batches as it currently does. However, instead of writing each batch to disk immediately, store the batches in memory until the size of the fetched data exceeds a certain limit.
  • When the size of the fetched data exceeds a certain limit (you can use a constant for this limit), write the data to disk and clear the fetched data from memory.
  • Continue this process until all vectors have been fetched and written to disk.

You can use the 'astradb_export.py' script as a reference for how to implement this feature; a sketch of this buffering approach follows below.
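A sketch of the buffering described in Step 2, assuming the script's existing per-batch fetch can be wrapped as an iterator of row dicts. `iterate_batches` is a hypothetical stand-in for that current batch loop, and the size proxy and threshold are illustrative only.

import pandas as pd

MAX_BUFFER_BYTES = 256 * 1024 * 1024  # illustrative threshold

def get_data_for_collection(self, collection_name):
    buffer, buffered_bytes, part = [], 0, 0
    # iterate_batches is a hypothetical stand-in for the batch fetch loop the
    # script already performs against the Milvus collection
    for batch in self.iterate_batches(collection_name):
        buffer.extend(batch)
        buffered_bytes += sum(len(str(row)) for row in batch)  # cheap size proxy
        if buffered_bytes >= MAX_BUFFER_BYTES:
            pd.DataFrame(buffer).to_parquet(
                f"{collection_name}_part{part}.parquet", index=False
            )
            buffer, buffered_bytes = [], 0
            part += 1
    if buffer:  # write the final partial buffer
        pd.DataFrame(buffer).to_parquet(f"{collection_name}_part{part}.parquet", index=False)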

Step 3: Implement adaptive flush to disk in 'vespa_export.py'

Modify the 'vespa_export.py' script to implement the 'adaptive flush to disk' feature:

  • In the 'get_data_for_index' method, fetch data in batches instead of all at once. You can do this by modifying the 'get_all_documents' call to fetch a limited number of documents at a time.
  • After each batch is fetched, check the size of the fetched data. If the size exceeds a certain limit (you can use a constant for this limit), write the data to disk and clear the fetched data from memory.
  • Continue this process until all documents have been fetched and written to disk.

You can use the 'astradb_export.py' script as a reference for how to implement this feature; a sketch follows below.
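A sketch of Step 3 under the assumption that the single `get_all_documents` fetch can be replaced by a paged fetch that returns a continuation token. `fetch_document_page` is a hypothetical wrapper, not part of the existing script, and the threshold is illustrative.

import pandas as pd

FLUSH_EVERY_BYTES = 256 * 1024 * 1024  # illustrative threshold

def get_data_for_index(self, schema_name):
    buffer, buffered_bytes, part = [], 0, 0
    continuation = None
    while True:
        # fetch_document_page is a hypothetical wrapper around a paged fetch;
        # it returns (documents, continuation_token), with token None when done
        docs, continuation = self.fetch_document_page(schema_name, continuation)
        buffer.extend(docs)
        buffered_bytes += sum(len(str(doc)) for doc in docs)
        if buffered_bytes >= FLUSH_EVERY_BYTES:
            pd.DataFrame(buffer).to_parquet(f"{schema_name}_part{part}.parquet", index=False)
            buffer, buffered_bytes, part = [], 0, part + 1
        if continuation is None:
            break
    if buffer:  # flush the final partial buffer
        pd.DataFrame(buffer).to_parquet(f"{schema_name}_part{part}.parquet", index=False)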


For more information about Ellipsis, check the documentation.

ellipsis-dev[bot] · May 01 '24 08:05

To implement adaptive flush to disk during the fetch stage across all export scripts, follow these steps tailored to the structure of each script in the vector-io codebase:

  1. General Approach:

    • Monitor memory usage during data fetch operations.
    • Implement a threshold for memory usage, beyond which data should be flushed to disk.
    • Flush data to disk in batches, ensuring that memory usage stays within safe limits.
  2. For ExportAstraDB, ExportChroma, ExportKDBAI, ExportLanceDB, ExportMilvus, ExportPinecone, ExportQdrant, ExportVertexAIVectorSearch, ExportVespa, and ExportWeaviate:

    • Modify the get_data method to include a memory check after each batch fetch. Use Python's psutil library to monitor memory usage.
    • If memory usage exceeds a predefined threshold, flush the current batch to disk as a Parquet file and clear the batch from memory before continuing with the next fetch.
  3. Example Implementation for ExportKDBAI:

import psutil
import pandas as pd
from datetime import datetime

def get_data(self):
    table_names = self.args['tables'].split(',')
    for table_name in table_names:
        data_batch = []
        for record in self.session.table(table_name).query():
            data_batch.append(record)
            # Flush when overall memory pressure crosses the threshold
            if psutil.virtual_memory().percent > 80:  # Example threshold
                self.flush_to_disk(data_batch, table_name)
                data_batch = []
        if data_batch:  # Flush remaining data
            self.flush_to_disk(data_batch, table_name)

def flush_to_disk(self, data_batch, table_name):
    # Convert data_batch to a DataFrame and write a timestamped Parquet part
    df = pd.DataFrame(data_batch)
    df.to_parquet(f'{table_name}_{datetime.now().timestamp()}.parquet')
  4. Adaptation for Other Scripts:
    • Apply a similar approach to other export scripts, adjusting for each script's specific data fetching and processing logic; a reusable helper sketch is shown below.
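One way to avoid duplicating this logic across exporters is a small shared buffer object. The class below is only a sketch under the assumptions used above (rows as dicts, Parquet output, a rough string-length size proxy); it is not an existing class in vdf_io, and the default threshold is illustrative.

import pandas as pd

class AdaptiveFlushBuffer:
    """Accumulate rows and flush them to a Parquet part file whenever an
    approximate size threshold is crossed, keeping memory usage bounded."""

    def __init__(self, prefix, max_bytes=256 * 1024 * 1024):
        self.prefix = prefix
        self.max_bytes = max_bytes
        self.rows, self.nbytes, self.part = [], 0, 0

    def add(self, rows):
        self.rows.extend(rows)
        self.nbytes += sum(len(str(r)) for r in rows)  # cheap size proxy
        if self.nbytes >= self.max_bytes:
            self.flush()

    def flush(self):
        if not self.rows:
            return
        pd.DataFrame(self.rows).to_parquet(f"{self.prefix}_part{self.part}.parquet", index=False)
        self.rows, self.nbytes, self.part = [], 0, self.part + 1

# usage inside any exporter's fetch loop (names hypothetical):
#   buf = AdaptiveFlushBuffer("my_collection")
#   for batch in fetch_batches():
#       buf.add(batch)
#   buf.flush()  # write any remaining rows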

This solution provides a scalable way to handle large data exports without running into OOM errors, ensuring data is safely written to disk in manageable batches.

References

ai-northstar-tech/vector-io/src/vdf_io/export_vdf/astradb_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/chroma_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/kdbai_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/lancedb_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/milvus_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/pinecone_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/qdrant_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/vertexai_vector_search_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/vespa_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/weaviate_export.py


greptile-apps[bot] · May 01 '24 16:05