
[Integration][Core] Ocean Smart Caching

Open · dennis-bilson-port opened this issue 1 month ago • 8 comments

User description

The current caching in Ocean core relies on basic disk and in-memory providers, which support simple get/set/clear operations but lack sophisticated controls. This leads to issues such as:

  • Risk of serving stale data without expiration mechanisms.
  • Potential memory or disk exhaustion due to unbounded growth.
  • No automated eviction when caches exceed limits.
  • Limited visibility into cache performance without statistics or monitoring.
  • Inefficient handling of cold starts without warming strategies.
  • Poor storage efficiency for large values without compression.

These limitations are particularly problematic for integrations handling large datasets or frequent API calls, where inefficient caching can degrade performance, increase costs, or cause failures.

Type of change

Please leave one option from the following and delete the rest:

  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [x] New feature (non-breaking change which adds functionality)
  • [ ] New Integration (non-breaking change which adds a new integration)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
  • [ ] Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • [ ] Integration able to create all default resources from scratch
  • [ ] Resync finishes successfully
  • [ ] Resync able to create entities
  • [ ] Resync able to update entities
  • [ ] Resync able to detect and delete entities
  • [ ] Scheduled resync able to abort existing resync and start a new one
  • [ ] Tested with at least 2 integrations from scratch
  • [ ] Tested with Kafka and Polling event listeners
  • [ ] Tested deletion of entities that don't pass the selector

Integration testing checklist

  • [ ] Integration able to create all default resources from scratch
  • [ ] Completed a full resync from a freshly installed integration and it completed successfully
  • [ ] Resync able to create entities
  • [ ] Resync able to update entities
  • [ ] Resync able to detect and delete entities
  • [ ] Resync finishes successfully
  • [ ] If a new resource kind is added or updated in the integration, add example raw data, mapping, and expected result to the examples folder in the integration directory.
  • [ ] If a resource kind is updated, run the integration with the example data and check that the expected result is achieved
  • [ ] If a new resource kind is added or updated, validate that live events for that resource are working as expected
  • [ ] Docs PR link here

Preflight checklist

  • [ ] Handled rate limiting
  • [ ] Handled pagination
  • [x] Implemented the code in async
  • [ ] Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Enhancement


Description

  • Implement hybrid cache provider with LRU eviction and TTL support

  • Add disk persistence using atomic pickle file operations

  • Support per-key TTL overriding the default TTL configuration (see the sketch after this list)

  • Integrate hybrid cache as default for multi-process execution mode
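
For readers who want a feel for the mechanics, below is a minimal, self-contained sketch of the OrderedDict-based LRU plus per-key-TTL idea described above. It is illustrative only, not the actual HybridCacheProvider: class and method names are simplified and disk persistence is omitted.

# Minimal sketch of an LRU + per-key-TTL cache in the spirit of this PR.
# Illustrative only: this is NOT the real HybridCacheProvider implementation.
import time
from collections import OrderedDict
from typing import Any, Optional


class LruTtlCache:
    def __init__(self, max_size: int = 100, default_ttl: float = 3600.0) -> None:
        self.max_size = max_size
        self.default_ttl = default_ttl
        # key -> (value, insertion timestamp, ttl); insertion order doubles as LRU order
        self._entries: "OrderedDict[str, tuple[Any, float, Optional[float]]]" = OrderedDict()

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        self._entries[key] = (value, time.time(), ttl if ttl is not None else self.default_ttl)
        self._entries.move_to_end(key)          # most recently used goes to the end
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)   # evict the least recently used entry

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, timestamp, ttl = entry
        if ttl is not None and timestamp + ttl < time.time():
            del self._entries[key]              # drop expired entries lazily on access
            return None
        self._entries.move_to_end(key)          # refresh LRU position on a hit
        return value


cache = LruTtlCache(max_size=2, default_ttl=60)
cache.set("a", 1)
cache.set("b", 2, ttl=0.01)    # per-key TTL overriding the default
time.sleep(0.02)
assert cache.get("b") is None  # expired via its per-key TTL
assert cache.get("a") == 1     # still within the default TTL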


Diagram Walkthrough

flowchart LR
  A["CachingStorageMode enum"] -->|"add hybrid option"| B["HybridCacheProvider"]
  B -->|"implements"| C["CacheProvider interface"]
  C -->|"supports"| D["LRU eviction"]
  C -->|"supports"| E["TTL expiration"]
  C -->|"supports"| F["Disk persistence"]
  G["Ocean config"] -->|"uses hybrid as default"| B
  H["Tests"] -->|"validate"| B

File Walkthrough

Relevant files
Enhancement
hybrid.py
Hybrid cache provider with LRU and persistence                     

port_ocean/cache/hybrid.py

  • New hybrid cache provider implementation with OrderedDict-based LRU
    eviction
  • Supports configurable max size, default TTL, and per-key TTL overrides
  • Implements disk persistence using atomic pickle file operations with
    temp file pattern
  • Automatic cleanup of expired entries on disk load and cache access
  • Custom exception classes for read/write error handling
+140/-0 
models.py
Add hybrid caching storage mode option                                     

port_ocean/core/models.py

  • Add hybrid option to the CachingStorageMode enum (sketched below)
+1/-0     
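
As a rough illustration of this one-line change, the enum could look like the sketch below. Only the hybrid member is confirmed by this PR; the (str, Enum) base and the other members are placeholders for whatever already exists in models.py.

from enum import Enum


# Illustrative only: the real enum lives in port_ocean/core/models.py and its
# base class and pre-existing members may differ from the placeholders here.
class CachingStorageMode(str, Enum):
    disk = "disk"      # assumed pre-existing storage mode
    memory = "memory"  # assumed pre-existing storage mode
    hybrid = "hybrid"  # the new option added by this PR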
ocean.py
Integrate hybrid cache provider into Ocean                             

port_ocean/ocean.py

  • Import HybridCacheProvider class
  • Register hybrid cache provider in caching type mapping
  • Change default cache provider for multi-process mode from
    DiskCacheProvider to HybridCacheProvider
+3/-1     
Tests
test_hybrid_cache.py
Comprehensive test suite for hybrid cache provider             

port_ocean/tests/cache/test_hybrid_cache.py

  • Comprehensive test suite with 30+ test cases covering basic operations,
    TTL handling, LRU eviction, disk persistence, and error scenarios (a
    TTL-expiration sketch follows this file entry)
  • Tests for data type support, cache expiration, LRU ordering on
    access/update, and atomic writes
  • Error handling tests for corrupted pickle files, invalid formats, and
    write permissions
  • Edge case tests for empty keys, large values, special characters, and
    boundary max sizes
+448/-0 
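
To make the flavour of the suite concrete, here is a hedged sketch of a per-key-TTL expiration test. It reuses the constructor signature visible in the zero-max-size test quoted later on this page, but the ttl keyword on set is an assumption drawn from the feature description and may not match the real API.

import asyncio

import pytest

from port_ocean.cache.hybrid import HybridCacheProvider


@pytest.mark.asyncio
async def test_per_key_ttl_expires_before_default(tmp_path) -> None:
    """Sketch: a per-key TTL shorter than the default should expire first."""
    cache = HybridCacheProvider(
        max_size=10, default_ttl=60, cache_file=str(tmp_path / "cache.pkl")
    )

    # The ttl keyword is assumed from the per-key TTL feature description.
    await cache.set("short_lived", "value", ttl=0.01)
    await cache.set("long_lived", "value")

    await asyncio.sleep(0.05)

    assert await cache.get("short_lived") is None   # expired via per-key TTL
    assert await cache.get("long_lived") == "value"  # still within default TTL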
Documentation
CHANGELOG.md
Update changelog with hybrid cache feature                             

CHANGELOG.md

  • Add version 0.28.19 release notes documenting hybrid cache improvement
+9/-0     

dennis-bilson-port · Oct 31 '25 08:10

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18967331699/artifacts/4427489722

Code Coverage Total Percentage: 87.96%

github-actions[bot] · Oct 31 '25 08:10

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18968633488/artifacts/4427948785

Code Coverage Total Percentage: 88.01%

github-actions[bot] · Oct 31 '25 09:10

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18968623446/artifacts/4427952315

Code Coverage Total Percentage: 88.01%

github-actions[bot] · Oct 31 '25 09:10

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18969069257/artifacts/4428104187

Code Coverage Total Percentage: 88.01%

github-actions[bot] · Oct 31 '25 10:10

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/18969184625/artifacts/4428162329

Code Coverage Total Percentage: 88.01%

github-actions[bot] · Oct 31 '25 10:10

Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/19025761087/artifacts/4445803525

Code Coverage Total Percentage: 87.95%

github-actions[bot] · Nov 03 '25 06:11

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Insecure deserialization

Description: The cache uses pickle for disk persistence, which can execute arbitrary code when
loading untrusted files. If an attacker can replace or modify the cache file, unpickling it on
startup may lead to code execution; use a safe format, or restrict the file's permissions and
location (a sketch of one mitigation follows the referred code below).
hybrid.py [86-112]

Referred Code
def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(temp_file, self.cache_file)
        logger.debug(
            f"Saved {len(self.cache)} records to disk cache: {self.cache_file}"
        )
    except (pickle.PicklingError, IOError) as e:
        logger.warning(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
        raise FailedToWriteHybridCacheError(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
    except Exception as e:


 ... (clipped 6 lines)
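
As one hedged illustration of the mitigation suggested above (a safe format plus restricted permissions and location), a JSON-based save could look roughly like the following. It only handles JSON-serialisable values, so it is a sketch of the idea rather than a drop-in replacement for the pickle-based code.

import json
import os
import tempfile


def save_cache_json(cache: dict, cache_file: str) -> None:
    """Sketch only: persist the cache as JSON instead of pickle.

    JSON cannot execute code on load, so a tampered file fails to parse rather
    than running attacker-controlled bytecode; the trade-off is that only
    JSON-serialisable values can be cached this way.
    """
    cache_dir = os.path.dirname(cache_file)
    # Owner-only directory instead of a shared, world-writable /tmp location.
    os.makedirs(cache_dir, mode=0o700, exist_ok=True)

    # mkstemp already creates the temp file with 0o600; write, then atomically replace.
    fd, temp_path = tempfile.mkstemp(dir=cache_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(cache, f)
    os.replace(temp_path, cache_file)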
Insecure file path

Description: The default cache file path '/tmp/ocean/.ocean_cache/smart_cache.pkl' under /tmp may be
world-accessible and susceptible to tampering or symlink attacks across users; ensure
directory creation with secure permissions and avoid shared /tmp for sensitive caches.
hybrid.py [21-23]

Referred Code
DEFAULT_CACHE_FILE = "/tmp/ocean/.ocean_cache/smart_cache.pkl"
DEFAULT_CACHE_TTL = 3600
DEFAULT_CACHE_MAX_SIZE = 100

Ticket Compliance
🎫 No ticket provided
  • [ ] Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Missing Audit Logs: New cache operations (load/save/eviction) are not emitting audit-grade logs for critical
actions, making it unclear who performed actions and their outcomes.

Referred Code
def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(temp_file, self.cache_file)
        logger.debug(
            f"Saved {len(self.cache)} records to disk cache: {self.cache_file}"
        )
    except (pickle.PicklingError, IOError) as e:
        logger.warning(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
        raise FailedToWriteHybridCacheError(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
    except Exception as e:


 ... (clipped 30 lines)
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Exception Handling: Broad exception captures rethrow custom errors but may lose actionable context (e.g.,
key/path/operation) and rely on warnings without ensuring upstream handling.

Referred Code
    except (pickle.UnpicklingError, EOFError, OSError) as e:
        logger.warning(
            f"Failed to load cache from disk: {e}. Starting with empty cache."
        )
        raise FailedToReadHybridCacheError(
            f"Failed to load cache from disk: {e}. Starting with empty cache."
        )
    except Exception as e:
        logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
        raise FailedToReadHybridCacheError(
            f"Unexpected error loading cache: {e}. Starting fresh."
        )

def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:


 ... (clipped 19 lines)
Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status:
Sensitive Path Leak: Error messages and logs include full filesystem paths and raw exception strings which may
expose internal details if surfaced to users.

Referred Code
        logger.debug(
            f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
        )
    else:
        logger.warning(
            "Invalid cache file format. Expected OrderedDict, got %s",
            type(disk_cache),
        )
except (pickle.UnpicklingError, EOFError, OSError) as e:
    logger.warning(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
    raise FailedToReadHybridCacheError(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
except Exception as e:
    logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
    raise FailedToReadHybridCacheError(
        f"Unexpected error loading cache: {e}. Starting fresh."
    )



 ... (clipped 27 lines)
Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status:
Unstructured Logging: Logging uses free-form strings via logger.debug/warning without structured fields, and may
log record counts and file paths, which could expose sensitive context.

Referred Code
        logger.debug(
            f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
        )
    else:
        logger.warning(
            "Invalid cache file format. Expected OrderedDict, got %s",
            type(disk_cache),
        )
except (pickle.UnpicklingError, EOFError, OSError) as e:
    logger.warning(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
    raise FailedToReadHybridCacheError(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
except Exception as e:
    logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
    raise FailedToReadHybridCacheError(
        f"Unexpected error loading cache: {e}. Starting fresh."
    )



 ... (clipped 27 lines)
Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Pickle Usage Risk: The cache persists and loads data with pickle, which can execute code on load; without integrity
checks or trust boundaries, this may introduce deserialization risks.

Referred Code
try:
    with open(self.cache_file, "rb") as f:
        disk_cache = pickle.load(f)
    if isinstance(disk_cache, OrderedDict):
        current_time = time.time()
        records_to_remove = []
        for key, (_, timestamp, ttl) in disk_cache.items():
            if ttl is not None and timestamp + ttl < current_time:
                records_to_remove.append(key)
        for key in records_to_remove:
            del disk_cache[key]
        self.cache = disk_cache
        logger.debug(
            f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
        )
    else:
        logger.warning(
            "Invalid cache file format. Expected OrderedDict, got %s",
            type(disk_cache),
        )
except (pickle.UnpicklingError, EOFError, OSError) as e:


 ... (clipped 6 lines)
Compliance status legend
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

qodo-code-review[bot] · Nov 04 '25 08:11

PR Code Suggestions ✨

Explore these optional code suggestions:

Category · Suggestion · Impact
High-level
Rethink the multi-process persistence strategy

The current multi-process cache implementation is unsafe due to a lack of file
locking, which can cause race conditions and data corruption. A file-locking
mechanism should be added to ensure atomic read-modify-write operations.

Examples:

port_ocean/cache/hybrid.py [86-112]
    def _save_to_disk(self) -> None:
        """Save cache to pickle file."""
        if not self.cache_file:
            return
        try:
            # writing to a temp file first for atomicity
            temp_file = self.cache_file + ".tmp"
            with open(temp_file, "wb") as f:
                pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
            os.replace(temp_file, self.cache_file)

 ... (clipped 17 lines)
port_ocean/ocean.py [133]
            return HybridCacheProvider()

Solution Walkthrough:

Before:

class HybridCacheProvider(CacheProvider):
    def __init__(self, ...):
        self.cache = OrderedDict()
        self._load_from_disk() # Reads file once on init

    def _save_to_disk(self):
        # No lock is acquired.
        # Multiple processes can enter here concurrently.
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f)
        os.replace(temp_file, self.cache_file)

    async def set(self, key, value, ...):
        # Modifies its own in-memory copy of the cache
        self.cache[key] = (value, ...)
        # Writes its entire cache to disk, overwriting
        # any changes made by other processes.
        self._save_to_disk()

After:

from filelock import FileLock

class HybridCacheProvider(CacheProvider):
    def __init__(self, ...):
        self.cache = OrderedDict()
        self.lock = FileLock(self.cache_file + ".lock")
        with self.lock:
            self._load_from_disk()

    def _locked_read_modify_write(self, modification_func):
        with self.lock:
            self._load_from_disk() # Load latest state
            modification_func(self.cache)
            self._save_to_disk() # Save modified state

    async def set(self, key, value, ...):
        def modify(cache):
            cache[key] = (value, ...)
            cache.move_to_end(key)
            if len(cache) > self.max_size:
                cache.popitem(last=False)
        self._locked_read_modify_write(modify)

Suggestion importance[1-10]: 10


Why: The suggestion correctly identifies a critical race condition in the multi-process cache implementation due to the absence of file locking, which would lead to data corruption and loss of writes.

High
Security
Fix security risk in file loading

Fix a critical security vulnerability by avoiding pickle deserialization from a
file in the shared /tmp directory. Also, replace the os.path.exists check with
FileNotFoundError handling to prevent a TOCTOU race condition.

port_ocean/cache/hybrid.py [48-84]

 def _load_from_disk(self) -> None:
     """Load cache from pickle file if it exists and is valid."""
-    if not self.cache_file or not os.path.exists(self.cache_file):
+    if not self.cache_file:
         return
 
     try:
         with open(self.cache_file, "rb") as f:
             disk_cache = pickle.load(f)
         if isinstance(disk_cache, OrderedDict):
             current_time = time.time()
             records_to_remove = []
             for key, (_, timestamp, ttl) in disk_cache.items():
                 if ttl is not None and timestamp + ttl < current_time:
                     records_to_remove.append(key)
             for key in records_to_remove:
                 del disk_cache[key]
             self.cache = disk_cache
             logger.debug(
                 f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
             )
         else:
             logger.warning(
                 "Invalid cache file format. Expected OrderedDict, got %s",
                 type(disk_cache),
             )
+    except FileNotFoundError:
+        logger.debug(f"Cache file not found: {self.cache_file}. Starting fresh.")
+        return
     except (pickle.UnpicklingError, EOFError, OSError) as e:
         logger.warning(
             f"Failed to load cache from disk: {e}. Starting with empty cache."
         )
         raise FailedToReadHybridCacheError(
             f"Failed to load cache from disk: {e}. Starting with empty cache."
         )
     except Exception as e:
         logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
         raise FailedToReadHybridCacheError(
             f"Unexpected error loading cache: {e}. Starting fresh."
         )
Suggestion importance[1-10]: 10


Why: The suggestion correctly identifies a critical security vulnerability (arbitrary code execution via pickle deserialization) due to using a world-writable default directory (/tmp). This is a high-severity issue that must be fixed.

High
Securely create cache directory before writing

Prevent write errors and enhance security by ensuring the cache directory exists
before writing to it. Create the directory with secure permissions (0o700) to
restrict access.

port_ocean/cache/hybrid.py [86-112]

 def _save_to_disk(self) -> None:
     """Save cache to pickle file."""
     if not self.cache_file:
         return
     try:
+        # Create the directory with secure permissions if it doesn't exist
+        cache_dir = os.path.dirname(self.cache_file)
+        os.makedirs(cache_dir, mode=0o700, exist_ok=True)
+
         # writing to a temp file first for atomicity
         temp_file = self.cache_file + ".tmp"
         with open(temp_file, "wb") as f:
             pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
         os.replace(temp_file, self.cache_file)
         logger.debug(
             f"Saved {len(self.cache)} records to disk cache: {self.cache_file}"
         )
     except (pickle.PicklingError, IOError) as e:
         logger.warning(
             f"Failed to save cache to disk: {e}. Cache will not be persisted."
         )
         raise FailedToWriteHybridCacheError(
             f"Failed to save cache to disk: {e}. Cache will not be persisted."
         )
     except Exception as e:
         logger.warning(
             f"Unexpected error saving cache: {e}. Cache will not be persisted."
         )
         raise FailedToWriteHybridCacheError(
             f"Unexpected error saving cache: {e}. Cache will not be persisted."
         )
Suggestion importance[1-10]: 10


Why: This suggestion addresses both a functional bug and a security flaw. The code would crash if the cache directory doesn't exist, and creating it with secure permissions (0o700) is essential to mitigate the pickle vulnerability.

High
General
Avoid blocking disk I/O in async method

Improve performance by removing the blocking, synchronous disk write
(_save_to_disk()) from the async def get() method. Defer saving to disk to avoid
blocking the event loop on read operations.

port_ocean/cache/hybrid.py [114-124]

 async def get(self, key: str) -> Optional[Any]:
     """Retrieve value if exists and not expired, else None. Move to end for LRU."""
     if key not in self.cache:
         return None
     value, timestamp, ttl = self.cache[key]
     if ttl is not None and timestamp + ttl < time.time():
         del self.cache[key]
-        self._save_to_disk()
+        # Defer saving to disk to avoid blocking on a get operation
         return None
     self.cache.move_to_end(key)
     return value
Suggestion importance[1-10]: 7


Why: The suggestion correctly points out that performing a synchronous, blocking I/O operation (_save_to_disk) inside an async method is a performance anti-pattern that blocks the event loop. Removing it improves performance.

Medium
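
One hedged way to act on this suggestion without losing the eviction write entirely is to push the blocking save onto a worker thread. The sketch below uses simplified stand-in functions rather than the real provider methods, and assumes Python 3.9+ for asyncio.to_thread; whether a fire-and-forget write is acceptable here is a design call for the PR author.

import asyncio
import pickle
import time


def blocking_save(cache: dict, path: str) -> None:
    # Stand-in for the provider's synchronous _save_to_disk().
    with open(path, "wb") as f:
        pickle.dump(cache, f)


async def get_with_deferred_save(cache: dict, key: str, path: str):
    """Sketch of the pattern: expire lazily, but persist off the event loop."""
    entry = cache.get(key)
    if entry is None:
        return None
    value, timestamp, ttl = entry
    if ttl is not None and timestamp + ttl < time.time():
        del cache[key]
        # Run the blocking write in a worker thread so the event loop keeps
        # serving other coroutines; a snapshot copy avoids racing with later
        # mutations. In real code, keep a reference to the task (or await it
        # elsewhere) so it is not garbage-collected mid-write.
        asyncio.create_task(asyncio.to_thread(blocking_save, dict(cache), path))
        return None
    return value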
Improve test for zero-sized cache

Improve the test for a zero-sized cache by adding an assertion to verify that
get() returns None for a key that was just set. This confirms the expected
external behavior of immediate eviction.

port_ocean/tests/cache/test_hybrid_cache.py [428-435]

 @pytest.mark.asyncio
 async def test_hybrid_cache_zero_max_size(temp_cache_file: str) -> None:
     """Test cache behavior with max_size=0 (immediate eviction)."""
     cache = HybridCacheProvider(max_size=0, default_ttl=60, cache_file=temp_cache_file)
 
     await cache.set("key1", "value1")
     # With max_size=0, nothing should stay in cache
     assert len(cache.cache) == 0
+    assert await cache.get("key1") is None
Suggestion importance[1-10]: 5


Why: The suggestion correctly identifies that the test for max_size=0 is incomplete. While the existing assertion is correct, adding a check for get returning None makes the test more robust by verifying the external behavior, not just the internal state.

Low

qodo-code-review[bot] · Nov 04 '25 08:11