
feat: Add memory-efficient embed_stream method for large datasets

Open fede-kamel opened this pull request 3 months ago • 4 comments

Summary

This PR introduces a memory-efficient streaming API for embeddings that enables processing of large datasets without loading all embeddings into memory at once. The new embed_stream() method yields embeddings one at a time, reducing memory usage from O(n) to O(1).

Motivation

When embedding large datasets (thousands or millions of texts), the current embed() method loads all results into memory, which can cause:

  • Out-of-memory errors for large datasets
  • Poor performance due to memory pressure
  • Inability to process results incrementally

This streaming approach solves these issues by processing and yielding embeddings individually.

Implementation

Core Components

  1. StreamingEmbedParser (src/cohere/streaming_utils.py)

    • Uses ijson for incremental JSON parsing when available
    • Falls back to regular JSON parsing if ijson not installed
    • Supports both embeddings_floats and embeddings_by_type formats
  2. embed_stream() method

    • Added to BaseCohere class for v1 API
    • Added to V2Client class for v2 API
    • Processes texts in configurable batches (default: 10)
    • Returns iterator of StreamedEmbedding objects
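The batch-and-yield pattern described above can be sketched as follows. This is a hypothetical, simplified illustration rather than the PR's actual code: `embed_fn` stands in for one batched API call, and the `StreamedEmbedding` fields mirror the `index`/`embedding` attributes used in the usage example.

```python
# Hypothetical sketch of the batch-and-yield pattern behind embed_stream();
# not the PR's actual implementation.
from dataclasses import dataclass
from typing import Callable, Iterator, List, Sequence


@dataclass
class StreamedEmbedding:
    index: int              # position of the text in the original input
    embedding: List[float]  # embedding vector for that text


def embed_stream(
    embed_fn: Callable[[Sequence[str]], List[List[float]]],
    texts: Sequence[str],
    batch_size: int = 10,
) -> Iterator[StreamedEmbedding]:
    """Yield one embedding at a time, calling embed_fn once per batch.

    Only a single batch of results is alive at any moment, so peak memory
    scales with batch_size rather than with len(texts).
    """
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        for offset, vector in enumerate(embed_fn(batch)):
            yield StreamedEmbedding(index=start + offset, embedding=vector)
```

Because the generator holds only the current batch, increasing the dataset size does not increase peak memory; it only increases the number of API calls.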

Usage Example

import cohere

client = cohere.Client()

# Process large dataset without loading all embeddings into memory
for embedding in client.embed_stream(
    texts=large_text_list,  # Can be thousands of texts
    model="embed-english-v3.0",
    input_type="classification",
    batch_size=20  # Process 20 texts per API call
):
    # Process each embedding individually
    save_to_database(embedding.index, embedding.embedding)
    # Memory usage remains constant regardless of dataset size

Testing

Comprehensive test suite added in tests/test_embed_streaming.py:

  • ✅ Fallback JSON parsing tests
  • ✅ Mock response tests for v1 and v2 clients
  • ✅ Empty input handling
  • ✅ Real API integration tests
  • ✅ Memory efficiency validation
  • ✅ All tests passing (6 passed)
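As a rough illustration of the kind of laziness check a memory-efficiency test can perform (the PR's actual test code is not shown here; `fake_embed_stream` is a hypothetical stand-in):

```python
# Illustrative laziness check: a streaming embed call should hand back a
# generator, so nothing is materialised until the caller consumes it.
# fake_embed_stream is a hypothetical stand-in, not the PR's test code.
from collections.abc import Iterator


def fake_embed_stream(texts, batch_size=10):
    # stand-in for client.embed_stream: yields one result per input text
    for i, _ in enumerate(texts):
        yield {"index": i, "embedding": [0.0]}


stream = fake_embed_stream(["text"] * 100_000)
# a generator is returned, so no embeddings exist yet
assert isinstance(stream, Iterator)
assert next(stream)["index"] == 0
```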

Performance

  • Memory Usage: O(1) instead of O(n)
  • Batch Processing: Configurable batch size for optimal API usage
  • Incremental Processing: Results can be processed as they arrive
  • No Breaking Changes: Existing embed() method unchanged

Quality Checks

  • Ruff linting: All checks passed
  • Mypy type checking: No issues found
  • Tests: Full test coverage with mocked and real API tests
  • Backward compatibility: No changes to existing APIs

Dependencies

  • Optional: ijson for efficient streaming (falls back gracefully if not installed)
  • All existing dependencies remain unchanged
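The graceful fallback can be sketched with the standard optional-import pattern. This is an assumed illustration, not the actual `streaming_utils.py` internals: `iter_float_embeddings` is a hypothetical helper name, and the `"embeddings.item"` prefix assumes the `embeddings_floats` response shape.

```python
# Illustrative optional-dependency pattern for ijson with a json fallback;
# the real StreamingEmbedParser internals may differ.
import io
import json

try:
    import ijson  # optional: incremental JSON parsing
except ImportError:
    ijson = None


def iter_float_embeddings(raw_body: bytes):
    """Yield embeddings from an embed response body one at a time.

    With ijson installed, array items under "embeddings" are parsed
    incrementally; otherwise the whole body is decoded up front with
    json.loads and iterated afterwards.
    """
    if ijson is not None:
        # note: ijson yields JSON numbers as decimal.Decimal by default
        yield from ijson.items(io.BytesIO(raw_body), "embeddings.item")
    else:
        yield from json.loads(raw_body).get("embeddings", [])
```

Either branch exposes the same iterator interface to callers, so the presence or absence of ijson changes memory behaviour during parsing but not the API.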

Future Enhancements

This streaming pattern could be extended to other endpoints that return large collections of data.

fede-kamel commented Sep 24 '25 19:09

Test Results with Real API

I've run the complete test suite with a real API key and all tests are passing successfully:

$ CO_API_KEY=<api key> python -m pytest tests/test_embed_streaming.py -v

============================= test session starts ==============================
platform linux -- Python 3.13.5, pytest-7.4.4, pluggy-1.6.0
rootdir: /home/fede/Projects/cohere-python
configfile: pyproject.toml
plugins: anyio-4.10.0, asyncio-0.23.8
collected 6 items

tests/test_embed_streaming.py::TestEmbedStreaming::test_embed_stream_empty_input PASSED [ 16%]
tests/test_embed_streaming.py::TestEmbedStreaming::test_embed_stream_memory_efficiency PASSED [ 33%]
tests/test_embed_streaming.py::TestEmbedStreaming::test_embed_stream_with_mock PASSED [ 50%]
tests/test_embed_streaming.py::TestEmbedStreaming::test_embed_stream_with_real_api PASSED [ 66%]
tests/test_embed_streaming.py::TestEmbedStreaming::test_streaming_embed_parser_fallback PASSED [ 83%]
tests/test_embed_streaming.py::TestEmbedStreaming::test_v2_embed_stream_with_mock PASSED [100%]

======================== 6 passed, 6 warnings in 0.97s =========================

Real API Integration Test Output

The test_embed_stream_with_real_api test successfully:

  • Connected to the Cohere API
  • Processed 3 texts in batches of 2
  • Received embeddings with 1024 dimensions each
  • Verified streaming functionality works correctly with real responses

Demo Run

I also ran a demo script processing 10 texts in batches of 3:

Testing memory-efficient embed streaming...
Processing 10 texts in batches of 3

✓ Processed embedding 0: 'The quick brown fox jumps over...' (dims: 1024)
✓ Processed embedding 1: 'Machine learning is transformi...' (dims: 1024)
✓ Processed embedding 2: 'Natural language processing en...' (dims: 1024)
✓ Processed embedding 3: 'Embeddings capture semantic me...' (dims: 1024)
✓ Processed embedding 4: 'Vector databases enable effici...' (dims: 1024)
✓ Processed embedding 5: 'Large language models understa...' (dims: 1024)
✓ Processed embedding 6: 'Streaming APIs reduce memory c...' (dims: 1024)
✓ Processed embedding 7: 'Batch processing improves thro...' (dims: 1024)
✓ Processed embedding 8: 'Python is great for data scien...' (dims: 1024)
✓ Processed embedding 9: 'Cohere provides powerful AI ca...' (dims: 1024)

✨ Successfully processed 10 embeddings in 0.75 seconds
Memory usage remains low as embeddings are yielded one at a time!

The streaming functionality is working perfectly with the production API! 🎉

fede-kamel commented Sep 24 '25 19:09

Comprehensive Test Results

1. Unit Tests - All Passing ✅

$ source venv/bin/activate && CO_API_KEY=<api key> python -m pytest tests/test_embed_streaming.py -v

======================== 6 passed, 6 warnings in 0.97s =========================

2. Code Quality - Ruff Linting ✅

$ ruff check src/cohere/streaming_utils.py src/cohere/base_client.py src/cohere/v2/client.py tests/test_embed_streaming.py
All checks passed!

3. Type Checking - Mypy ✅

$ mypy src/cohere/streaming_utils.py src/cohere/base_client.py src/cohere/v2/client.py --ignore-missing-imports
Success: no issues found in 3 source files

4. Integration Test with Real API ✅

Created and ran a demo script that processes 10 embeddings:

# Demo script output:
Testing memory-efficient embed streaming...
Processing 10 texts in batches of 3

✨ Successfully processed 10 embeddings in 0.75 seconds
Memory usage remains low as embeddings are yielded one at a time!

5. Test Coverage Summary

Test Case                             Status     Description
test_embed_stream_empty_input         ✅ PASSED  Handles empty/None input gracefully
test_embed_stream_memory_efficiency   ✅ PASSED  Validates O(1) memory usage
test_embed_stream_with_mock           ✅ PASSED  Tests v1 client with mocked responses
test_embed_stream_with_real_api       ✅ PASSED  Real API integration test
test_streaming_embed_parser_fallback  ✅ PASSED  JSON fallback when ijson unavailable
test_v2_embed_stream_with_mock        ✅ PASSED  Tests v2 client compatibility

6. Environment Details

  • Python 3.13.5
  • pytest 7.4.4
  • Dependencies installed via Poetry
  • Optional ijson library installed for optimal performance
  • Tested on Linux platform

7. Files Modified

modified:   src/cohere/base_client.py
modified:   src/cohere/streaming_utils.py
modified:   src/cohere/v2/client.py
modified:   tests/test_embed_streaming.py

All tests pass successfully and the implementation is ready for production use! 🚀

fede-kamel commented Sep 24 '25 19:09

🔄 PR Updated - Rebased on Latest Main

This PR has been rebased on the latest main branch and is ready for review.

Changes:

  • ✅ Rebased on upstream/main (no conflicts)
  • ✅ All 6 tests passing
  • ✅ Ruff linting passes
  • ✅ Mypy type checking passes

Requesting Review: @mkozakov @MusaTalluzi-cohere @andrewbcohere @daniel-cohere

This adds a memory-efficient streaming API for embeddings that enables processing of large datasets without loading all embeddings into memory at once. Would appreciate your review when you have a chance!

Key Features:

  • Memory usage: O(1) instead of O(n)
  • Configurable batch processing
  • Graceful fallback if ijson not installed
  • No breaking changes to existing APIs

fede-kamel commented Oct 28 '25 15:10

Hi @mkozakov, @billytrend-cohere, @daniel-cohere! 👋

Hope you're having a great week! I wanted to follow up on this PR that introduces memory-efficient streaming for embeddings.

Why this matters: When embedding large datasets (thousands or millions of texts), the current embed() method loads all results into memory, causing OOM errors and performance issues. This streaming approach reduces memory usage from O(n) to O(1).

What's been validated:

  • ✅ Full test suite passing (6 tests covering mock and real API calls)
  • ✅ Ruff linting and Mypy type checking passed
  • ✅ No merge conflicts - ready to merge
  • ✅ Backward compatible (new method, existing embed() unchanged)
  • ✅ Graceful fallback if optional ijson dependency not installed

Key features:

  • Process embeddings incrementally without memory pressure
  • Configurable batch size for optimal API usage
  • Works with both v1 and v2 clients

Usage example:

for embedding in client.embed_stream(texts=large_dataset, batch_size=20):
    save_to_database(embedding.index, embedding.embedding)
    # Memory stays constant regardless of dataset size

This enables processing of datasets that previously would have crashed due to memory constraints.

Would you be able to review this when you get a moment? Happy to address any feedback!

Thank you for all your work on this SDK! 🙏

fede-kamel commented Nov 12 '25 00:11