vector-io
Implement adaptive flush to disk during fetch stage (to avoid OOM) in all export scripts
@greptile
I was in the process of creating an implementation plan to modify the 'vespa_export.py' and 'azureai_export.py' scripts to implement the 'adaptive flush to disk' feature, but I ran out of time before I could finish and will need more time to complete this task. Please let me know if you have any questions or need further clarification.
Implementation proposal (work in progress)
Step 1: Implement adaptive flush to disk in 'vertexai_vector_search_export.py'
Modify the 'vertexai_vector_search_export.py' script to implement the 'adaptive flush to disk' feature:
- In the 'get_data_for_index' method, fetch data in batches instead of all at once. You can do this by modifying the 'find_neighbors' call to fetch a limited number of vectors at a time.
- After each batch is fetched, check the size of the fetched data. If the size exceeds a certain limit (you can use a constant for this limit), write the data to disk and clear the fetched data from memory.
- Continue this process until all vectors have been fetched and written to disk.
You can use the 'astradb_export.py' script as a reference for how to implement this feature; a rough sketch of the fetch-and-flush loop follows below.
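As a rough illustration of Step 1, here is a minimal sketch of the batched fetch-and-flush loop under stated assumptions: fetch_neighbor_batches, flush_to_parquet, the size limit, and the output layout are hypothetical placeholders, not existing vector-io or Vertex AI SDK methods. The real pagination of the 'find_neighbors' call would go where the placeholder generator is used.

import os
import pandas as pd

MAX_BYTES_BEFORE_FLUSH = 256 * 1024 * 1024  # illustrative ~256 MB in-memory limit
BATCH_SIZE = 1000                            # illustrative number of vectors per fetch

def get_data_for_index(self, index_name):
    buffered, buffered_bytes, part = [], 0, 0
    # fetch_neighbor_batches is a hypothetical stand-in for a paginated
    # version of the current find_neighbors call
    for batch in self.fetch_neighbor_batches(index_name, BATCH_SIZE):
        buffered.extend(batch)
        buffered_bytes += sum(len(str(v)) for v in batch)  # rough size estimate
        if buffered_bytes >= MAX_BYTES_BEFORE_FLUSH:
            self.flush_to_parquet(buffered, index_name, part)
            buffered, buffered_bytes, part = [], 0, part + 1
    if buffered:  # write whatever remains after the last batch
        self.flush_to_parquet(buffered, index_name, part)

def flush_to_parquet(self, rows, index_name, part):
    # one Parquet file per flushed chunk; directory layout is illustrative
    out_dir = os.path.join("vdf_export", index_name)
    os.makedirs(out_dir, exist_ok=True)
    pd.DataFrame(rows).to_parquet(os.path.join(out_dir, f"part_{part}.parquet"))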
Step 2: Implement adaptive flush to disk in 'milvus_export.py'
Modify the 'milvus_export.py' script to implement the 'adaptive flush to disk' feature:
- In the 'get_data_for_collection' method, fetch data in batches as it currently does. However, instead of writing each batch to disk immediately, store the batches in memory until the size of the fetched data exceeds a certain limit.
- When the size of the fetched data exceeds a certain limit (you can use a constant for this limit), write the data to disk and clear the fetched data from memory.
- Continue this process until all vectors have been fetched and written to disk.
You can use the 'astradb_export.py' script as a reference for how to implement this feature; a rough sketch of the accumulate-and-flush loop follows below.
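A sketch of Step 2, assuming the script's existing batched fetch is exposed as a generator. iterate_collection_batches and write_parquet_part are hypothetical placeholders for the current pymilvus fetch loop and the Parquet writer; only the accumulate-then-flush logic is the point here.

import pandas as pd

MAX_ROWS_IN_MEMORY = 100_000  # illustrative limit; tune to available RAM

def get_data_for_collection(self, collection_name):
    pending, part = [], 0
    # iterate_collection_batches stands in for the existing batched fetch
    for batch in self.iterate_collection_batches(collection_name):
        pending.extend(batch)
        if len(pending) >= MAX_ROWS_IN_MEMORY:
            self.write_parquet_part(pending, collection_name, part)
            pending, part = [], part + 1
    if pending:  # flush the final partial chunk
        self.write_parquet_part(pending, collection_name, part)

def write_parquet_part(self, rows, collection_name, part):
    # DataFrame.to_parquet requires pyarrow or fastparquet
    pd.DataFrame(rows).to_parquet(f"{collection_name}_part_{part}.parquet")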
Step 3: Implement adaptive flush to disk in 'vespa_export.py'
Modify the 'vespa_export.py' script to implement the 'adaptive flush to disk' feature:
- In the 'get_data_for_index' method, fetch data in batches instead of all at once. You can do this by modifying the 'get_all_documents' call to fetch a limited number of documents at a time.
- After each batch is fetched, check the size of the fetched data. If the size exceeds a certain limit (you can use a constant for this limit), write the data to disk and clear the fetched data from memory.
- Continue this process until all documents have been fetched and written to disk.
You can use the 'astradb_export.py' script as a reference for how to implement this feature; a rough sketch of the paginated fetch-and-flush loop follows below.
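A sketch of Step 3 under the same caveats: fetch_document_pages is a hypothetical stand-in for a paginated version of the current 'get_all_documents' call (continuation handling omitted), and the flush limit and file naming are illustrative.

import pandas as pd

DOCS_PER_FLUSH = 50_000  # illustrative document-count limit
PAGE_SIZE = 1_000        # illustrative documents per request

def get_data_for_index(self, schema_name):
    buffered, part = [], 0
    # fetch_document_pages stands in for a paginated get_all_documents
    for page in self.fetch_document_pages(schema_name, PAGE_SIZE):
        buffered.extend(page)
        if len(buffered) >= DOCS_PER_FLUSH:
            pd.DataFrame(buffered).to_parquet(f"{schema_name}_part_{part}.parquet")
            buffered, part = [], part + 1
    if buffered:  # flush the remaining documents
        pd.DataFrame(buffered).to_parquet(f"{schema_name}_part_{part}.parquet")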
To implement adaptive flush to disk during the fetch stage across all export scripts, follow these steps tailored to the structure of each script in the vector-io codebase:
General Approach:
- Monitor memory usage during data fetch operations.
- Implement a threshold for memory usage, beyond which data should be flushed to disk.
- Flush data to disk in batches, ensuring that memory usage stays within safe limits; a minimal sketch of this memory guard follows below.
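To make the threshold concrete, here is a minimal sketch of the memory guard these bullets describe. psutil and pandas are real libraries; the 80% threshold and the helper names are illustrative, and DataFrame.to_parquet needs pyarrow or fastparquet installed.

import psutil
import pandas as pd

MEMORY_FLUSH_THRESHOLD_PERCENT = 80  # illustrative; tune per deployment

def should_flush():
    # psutil.virtual_memory().percent reports system-wide RAM usage as a percentage
    return psutil.virtual_memory().percent > MEMORY_FLUSH_THRESHOLD_PERCENT

def flush_batch(rows, path):
    # write the buffered rows to a Parquet file and let the caller clear its buffer
    pd.DataFrame(rows).to_parquet(path)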
For ExportAstraDB, ExportChroma, ExportKDBAI, ExportLanceDB, ExportMilvus, ExportPinecone, ExportQdrant, ExportVertexAIVectorSearch, ExportVespa, and ExportWeaviate:
- Modify the get_data method to include a memory check after each batch fetch. Use Python's psutil library to monitor memory usage.
- If memory usage exceeds a predefined threshold, flush the current batch to disk as a Parquet file and clear the batch from memory before continuing with the next fetch.
Example Implementation for ExportKDBAI:
import psutil
import pandas as pd
from datetime import datetime

def get_data(self):
    table_names = self.args['tables'].split(',')
    for table_name in table_names:
        data_batch = []
        # assumes the query result can be iterated record by record
        for record in self.session.table(table_name).query():
            data_batch.append(record)
            if psutil.virtual_memory().percent > 80:  # example threshold
                self.flush_to_disk(data_batch, table_name)
                data_batch = []
        if data_batch:  # flush remaining data
            self.flush_to_disk(data_batch, table_name)

def flush_to_disk(self, data_batch, table_name):
    # convert data_batch to a DataFrame and save it as a Parquet file
    df = pd.DataFrame(data_batch)
    df.to_parquet(f'{table_name}_{datetime.now().timestamp()}.parquet')
- Adaptation for Other Scripts:
- Apply a similar approach to other export scripts, adjusting for each script's specific data fetching and processing logic; a shared-helper sketch is shown below.
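One way to avoid re-implementing this loop in every exporter would be a small shared helper. The AdaptiveFlushMixin below is a hypothetical design sketch, not an existing class in vector-io, and its method and attribute names are assumptions.

import psutil
import pandas as pd

class AdaptiveFlushMixin:
    # hypothetical helper that each Export* class could inherit from
    memory_threshold_percent = 80  # illustrative default

    def collect_with_adaptive_flush(self, records, name, out_dir):
        batch, part = [], 0
        for record in records:
            batch.append(record)
            if psutil.virtual_memory().percent > self.memory_threshold_percent:
                self._flush(batch, name, out_dir, part)
                batch, part = [], part + 1
        if batch:  # flush whatever is left at the end
            self._flush(batch, name, out_dir, part)

    def _flush(self, batch, name, out_dir, part):
        pd.DataFrame(batch).to_parquet(f"{out_dir}/{name}_part_{part}.parquet")

Each get_data implementation could then pass its fetch iterator to collect_with_adaptive_flush instead of duplicating the threshold logic.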
This solution provides a scalable way to handle large data exports without running into OOM errors, ensuring data is safely written to disk in manageable batches.
References
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/astradb_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/chroma_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/kdbai_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/lancedb_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/milvus_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/pinecone_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/qdrant_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/vertexai_vector_search_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/vespa_export.py
ai-northstar-tech/vector-io/src/vdf_io/export_vdf/weaviate_export.py