User description
The current caching in Ocean core relies on basic disk and in-memory providers, which support simple get/set/clear operations but lack sophisticated controls. This leads to issues such as:
- Risk of serving stale data without expiration mechanisms.
- Potential memory or disk exhaustion due to unbounded growth.
- No automated eviction when caches exceed limits.
- Limited visibility into cache performance without statistics or monitoring.
- Inefficient handling of cold starts without warming strategies.
- Poor storage efficiency for large values without compression.
These limitations are particularly problematic for integrations handling large datasets or frequent API calls, where inefficient caching can degrade performance, increase costs, or cause failures.
Type of change
- [x] New feature (non-breaking change which adds functionality)
All tests should be run against the port production environment (using a testing org).
Core testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
Integration testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Ran a full resync on a freshly installed integration and it completed successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If a new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
- [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected
- [ ] Docs PR link here
Preflight checklist
- [ ] Handled rate limiting
- [ ] Handled pagination
- [x] Implemented the code in async
- [ ] Support multi-account
Screenshots
Include screenshots from your environment showing how the resources of the integration will look.
API Documentation
Provide links to the API documentation used for this integration.
PR Type
Enhancement
Description
- Implement hybrid cache provider with LRU eviction and TTL support
- Add disk persistence using atomic pickle file operations
- Support per-key TTL overriding default TTL configuration
- Integrate hybrid cache as default for multi-process execution mode (a usage sketch follows below)
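For orientation, here is a usage sketch assembled from the constructor and `await cache.set(...)` / `await cache.get(...)` calls that appear in the snippets further down; note that the `ttl=` keyword on `set` is an assumption based on the per-key TTL bullet above, not confirmed by the diff:

```python
import asyncio

from port_ocean.cache.hybrid import HybridCacheProvider

async def main() -> None:
    # Bounded LRU with a default TTL, persisted to a pickle file on disk.
    cache = HybridCacheProvider(
        max_size=100, default_ttl=3600, cache_file="/tmp/demo_cache.pkl"
    )
    await cache.set("key1", "value1")           # stored with the default TTL
    await cache.set("key2", "value2", ttl=5)    # assumed per-key TTL override
    assert await cache.get("key1") == "value1"  # returns None once expired/evicted

asyncio.run(main())
```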
Diagram Walkthrough

```mermaid
flowchart LR
  A["CachingStorageMode enum"] -->|"add hybrid option"| B["HybridCacheProvider"]
  B -->|"implements"| C["CacheProvider interface"]
  C -->|"supports"| D["LRU eviction"]
  C -->|"supports"| E["TTL expiration"]
  C -->|"supports"| F["Disk persistence"]
  G["Ocean config"] -->|"uses hybrid as default"| B
  H["Tests"] -->|"validate"| B
```
File Walkthrough

| Category | File | Change summary | Lines |
|---|---|---|---|
| Enhancement | `port_ocean/cache/hybrid.py` | New hybrid cache provider with OrderedDict-based LRU eviction; configurable max size, default TTL, and per-key TTL overrides; disk persistence using atomic pickle file operations with a temp-file pattern; automatic cleanup of expired entries on disk load and cache access; custom exception classes for read/write error handling | +140/-0 |
| Enhancement | `port_ocean/core/models.py` | Add `hybrid` option to the `CachingStorageMode` enum | +1/-0 |
| Enhancement | `port_ocean/ocean.py` | Import `HybridCacheProvider`, register it in the caching type mapping, and change the default cache provider for multi-process mode from `DiskCacheProvider` to `HybridCacheProvider` | +3/-1 |
| Tests | `port_ocean/tests/cache/test_hybrid_cache.py` | Test suite with 30+ cases covering basic operations, TTL handling, LRU eviction, disk persistence, and error scenarios; data type support, cache expiration, LRU ordering on access/update, and atomic writes; error handling for corrupted pickle files, invalid formats, and write permissions; edge cases for empty keys, large values, special characters, and boundary max sizes | +448/-0 |
| Documentation | `CHANGELOG.md` | Add version 0.28.19 release notes documenting the hybrid cache improvement | +9/-0 |
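For reference, the OrderedDict-based LRU-with-TTL pattern that the hybrid.py walkthrough describes typically looks like the sketch below. This is an illustration consistent with the `(value, timestamp, ttl)` tuples and the `move_to_end` / `popitem(last=False)` calls visible in the referred code later in this PR, not the provider's actual source:

```python
import time
from collections import OrderedDict
from typing import Any, Optional, Tuple

class LruTtlCache:
    """Illustrative LRU + TTL core, not the actual HybridCacheProvider."""

    def __init__(self, max_size: int, default_ttl: Optional[float]) -> None:
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: "OrderedDict[str, Tuple[Any, float, Optional[float]]]" = OrderedDict()

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        # Store (value, insertion time, effective TTL); re-inserting refreshes recency.
        self.cache[key] = (value, time.time(), ttl if ttl is not None else self.default_ttl)
        self.cache.move_to_end(key)
        while len(self.cache) > self.max_size:
            self.cache.popitem(last=False)  # evict the least recently used entry

    def get(self, key: str) -> Optional[Any]:
        if key not in self.cache:
            return None
        value, timestamp, ttl = self.cache[key]
        if ttl is not None and timestamp + ttl < time.time():
            del self.cache[key]  # expired entries are dropped on access
            return None
        self.cache.move_to_end(key)  # mark as most recently used
        return value
```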
PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance ⚪

**Insecure deserialization**

Description: The cache uses pickle for disk persistence, which can execute arbitrary code when loading untrusted files; if an attacker can replace or modify the cache file, unpickling on startup may lead to code execution. Use a safe format, or restrict file permissions and location. hybrid.py [86-112]

Referred code:

```python
def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(temp_file, self.cache_file)
        logger.debug(
            f"Saved {len(self.cache)} records to disk cache: {self.cache_file}"
        )
    except (pickle.PicklingError, IOError) as e:
        logger.warning(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
        raise FailedToWriteHybridCacheError(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
    except Exception as e:
        ... (clipped 6 lines)
```

**Insecure file path**

Description: The default cache file path '/tmp/ocean/.ocean_cache/smart_cache.pkl' under /tmp may be world-accessible and susceptible to tampering or symlink attacks across users; ensure directory creation with secure permissions and avoid shared /tmp for sensitive caches. hybrid.py [21-23]

Referred code:

```python
DEFAULT_CACHE_FILE = "/tmp/ocean/.ocean_cache/smart_cache.pkl"
DEFAULT_CACHE_TTL = 3600
DEFAULT_CACHE_MAX_SIZE = 100
```
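One way to address both findings, sketched here as an illustration rather than a prescribed fix: persist in a format that cannot execute code on load (JSON, at the cost of only caching JSON-serializable values) and create the cache location with owner-only permissions. The function name below is hypothetical:

```python
import json
import os
import tempfile

def save_cache_json(cache: dict, cache_file: str) -> None:
    """Sketch: atomic JSON persistence with owner-only permissions."""
    cache_dir = os.path.dirname(cache_file)
    os.makedirs(cache_dir, exist_ok=True)
    os.chmod(cache_dir, 0o700)  # owner-only directory, regardless of umask
    # mkstemp creates the temp file with mode 0o600 in the target directory
    fd, temp_path = tempfile.mkstemp(dir=cache_dir)
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(cache, f)  # json.load cannot execute code on read
        os.replace(temp_path, cache_file)  # atomic rename, as in the PR
    except BaseException:
        os.unlink(temp_path)
        raise
```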
Ticket Compliance ⚪

🎫 No ticket provided

Codebase Duplication Compliance ⚪

Codebase context is not defined. Follow the guide to enable codebase context checks.
Custom Compliance

🟢 Generic: Meaningful Naming and Self-Documenting Code
Objective: Ensure all identifiers clearly express their purpose and intent, making code self-documenting.
Status: Passed
⚪ Generic: Comprehensive Audit Trails
Objective: To create a detailed and reliable record of critical system actions for security analysis and compliance.
Status: Missing audit logs. New cache operations (load/save/eviction) do not emit audit-grade logs for critical actions, making it unclear who performed each action and with what outcome.
Referred code:

```python
def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(temp_file, self.cache_file)
        logger.debug(
            f"Saved {len(self.cache)} records to disk cache: {self.cache_file}"
        )
    except (pickle.PicklingError, IOError) as e:
        logger.warning(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
        raise FailedToWriteHybridCacheError(
            f"Failed to save cache to disk: {e}. Cache will not be persisted."
        )
    except Exception as e:
        ... (clipped 30 lines)
```
Generic: Robust Error Handling and Edge Case Management
Objective: Ensure comprehensive error handling that provides meaningful context and graceful degradation.
Status: Exception handling. Broad exception handlers rethrow custom errors but may lose actionable context (e.g., key/path/operation) and rely on warnings without guaranteeing upstream handling.
Referred code:

```python
    except (pickle.UnpicklingError, EOFError, OSError) as e:
        logger.warning(
            f"Failed to load cache from disk: {e}. Starting with empty cache."
        )
        raise FailedToReadHybridCacheError(
            f"Failed to load cache from disk: {e}. Starting with empty cache."
        )
    except Exception as e:
        logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
        raise FailedToReadHybridCacheError(
            f"Unexpected error loading cache: {e}. Starting fresh."
        )

def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            ... (clipped 19 lines)
```
Generic: Secure Error Handling
Objective: To prevent the leakage of sensitive system information through error messages while providing sufficient detail for internal debugging.
Status: Sensitive path leak. Error messages and logs include full filesystem paths and raw exception strings, which may expose internal details if surfaced to users.
Referred code:

```python
        logger.debug(
            f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
        )
    else:
        logger.warning(
            "Invalid cache file format. Expected OrderedDict, got %s",
            type(disk_cache),
        )
except (pickle.UnpicklingError, EOFError, OSError) as e:
    logger.warning(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
    raise FailedToReadHybridCacheError(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
except Exception as e:
    logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
    raise FailedToReadHybridCacheError(
        f"Unexpected error loading cache: {e}. Starting fresh."
    )
... (clipped 27 lines)
```
Generic: Secure Logging Practices
Objective: To ensure logs are useful for debugging and auditing without exposing sensitive information like PII, PHI, or cardholder data.
Status: Unstructured logging. Logging uses free-form strings via logger.debug/warning without structured fields, and may log record counts and file paths, which could expose sensitive context.
Referred code:

```python
        logger.debug(
            f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
        )
    else:
        logger.warning(
            "Invalid cache file format. Expected OrderedDict, got %s",
            type(disk_cache),
        )
except (pickle.UnpicklingError, EOFError, OSError) as e:
    logger.warning(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
    raise FailedToReadHybridCacheError(
        f"Failed to load cache from disk: {e}. Starting with empty cache."
    )
except Exception as e:
    logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
    raise FailedToReadHybridCacheError(
        f"Unexpected error loading cache: {e}. Starting fresh."
    )
... (clipped 27 lines)
```
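For comparison, a structured version of the load log above; this assumes the `logger` in hybrid.py is loguru (as used elsewhere in Ocean core), and the field names are illustrative:

```python
from loguru import logger

def log_cache_load(record_count: int, cache_file: str) -> None:
    # Bound fields travel in record["extra"], so sinks can filter or redact
    # them instead of parsing interpolated message strings.
    logger.bind(operation="cache_load", records=record_count, path=cache_file).debug(
        "Loaded records from disk cache"
    )

log_cache_load(12, "/tmp/demo_cache.pkl")
```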
Generic: Security-First Input Validation and Data Handling
Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent vulnerabilities.
Status: Pickle usage risk. The cache persists and loads with pickle, which executes code on load; without integrity checks or trust boundaries, this may introduce deserialization risks.
Referred code:

```python
try:
    with open(self.cache_file, "rb") as f:
        disk_cache = pickle.load(f)
    if isinstance(disk_cache, OrderedDict):
        current_time = time.time()
        records_to_remove = []
        for key, (_, timestamp, ttl) in disk_cache.items():
            if ttl is not None and timestamp + ttl < current_time:
                records_to_remove.append(key)
        for key in records_to_remove:
            del disk_cache[key]
        self.cache = disk_cache
        logger.debug(
            f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
        )
    else:
        logger.warning(
            "Invalid cache file format. Expected OrderedDict, got %s",
            type(disk_cache),
        )
except (pickle.UnpicklingError, EOFError, OSError) as e:
    ... (clipped 6 lines)
```
Compliance status legend
🟢 - Fully compliant
🟡 - Partially compliant
🔴 - Not compliant
⚪ - Requires further human verification
🏷️ - Compliance label
PR Code Suggestions ✨

Explore these optional code suggestions:
**Rethink the multi-process persistence strategy** (Category: High-level, Impact: High)

The current multi-process cache implementation is unsafe due to the lack of file locking, which can cause race conditions and data corruption. A file-locking mechanism should be added to ensure atomic read-modify-write operations.

Examples:

port_ocean/cache/hybrid.py [86-112]

```python
def _save_to_disk(self) -> None:
    """Save cache to pickle file."""
    if not self.cache_file:
        return
    try:
        # writing to a temp file first for atomicity
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(temp_file, self.cache_file)
        ... (clipped 17 lines)
```

port_ocean/ocean.py [133]

```python
return HybridCacheProvider()
```

Solution walkthrough:

Before:

```python
class HybridCacheProvider(CacheProvider):
    def __init__(self, ...):
        self.cache = OrderedDict()
        self._load_from_disk()  # Reads file once on init

    def _save_to_disk(self):
        # No lock is acquired.
        # Multiple processes can enter here concurrently.
        temp_file = self.cache_file + ".tmp"
        with open(temp_file, "wb") as f:
            pickle.dump(self.cache, f)
        os.replace(temp_file, self.cache_file)

    async def set(self, key, value, ...):
        # Modifies its own in-memory copy of the cache
        self.cache[key] = (value, ...)
        # Writes its entire cache to disk, overwriting
        # any changes made by other processes.
        self._save_to_disk()
```

After:

```python
from filelock import FileLock

class HybridCacheProvider(CacheProvider):
    def __init__(self, ...):
        self.cache = OrderedDict()
        self.lock = FileLock(self.cache_file + ".lock")
        with self.lock:
            self._load_from_disk()

    def _locked_read_modify_write(self, modification_func):
        with self.lock:
            self._load_from_disk()  # Load latest state
            modification_func(self.cache)
            self._save_to_disk()  # Save modified state

    async def set(self, key, value, ...):
        def modify(cache):
            cache[key] = (value, ...)
            cache.move_to_end(key)
            if len(cache) > self.max_size:
                cache.popitem(last=False)
        self._locked_read_modify_write(modify)
```

Suggestion importance [1-10]: 10

Why: The suggestion correctly identifies a critical race condition in the multi-process cache implementation due to the absence of file locking, which would lead to data corruption and loss of writes.
**Fix security risk in file loading** (Category: Security, Impact: High)

Fix a critical security vulnerability by avoiding pickle deserialization from a file in the shared /tmp directory. Also, replace the os.path.exists check with FileNotFoundError handling to prevent a TOCTOU race condition.

port_ocean/cache/hybrid.py [48-84]

```diff
 def _load_from_disk(self) -> None:
     """Load cache from pickle file if it exists and is valid."""
-    if not self.cache_file or not os.path.exists(self.cache_file):
+    if not self.cache_file:
         return
     try:
         with open(self.cache_file, "rb") as f:
             disk_cache = pickle.load(f)
         if isinstance(disk_cache, OrderedDict):
             current_time = time.time()
             records_to_remove = []
             for key, (_, timestamp, ttl) in disk_cache.items():
                 if ttl is not None and timestamp + ttl < current_time:
                     records_to_remove.append(key)
             for key in records_to_remove:
                 del disk_cache[key]
             self.cache = disk_cache
             logger.debug(
                 f"Loaded {len(self.cache)} records from disk cache: {self.cache_file}"
             )
         else:
             logger.warning(
                 "Invalid cache file format. Expected OrderedDict, got %s",
                 type(disk_cache),
             )
+    except FileNotFoundError:
+        logger.debug(f"Cache file not found: {self.cache_file}. Starting fresh.")
+        return
     except (pickle.UnpicklingError, EOFError, OSError) as e:
         logger.warning(
             f"Failed to load cache from disk: {e}. Starting with empty cache."
         )
         raise FailedToReadHybridCacheError(
             f"Failed to load cache from disk: {e}. Starting with empty cache."
         )
     except Exception as e:
         logger.warning(f"Unexpected error loading cache: {e}. Starting fresh.")
         raise FailedToReadHybridCacheError(
             f"Unexpected error loading cache: {e}. Starting fresh."
         )
```

Suggestion importance [1-10]: 10

Why: The suggestion correctly identifies a critical security vulnerability (arbitrary code execution via pickle deserialization) due to using a world-writable default directory (/tmp). This is a high-severity issue that must be fixed.
**Securely create cache directory before writing** (Category: Security, Impact: High)

Prevent write errors and enhance security by ensuring the cache directory exists before writing to it. Create the directory with secure permissions (0o700) to restrict access.

port_ocean/cache/hybrid.py [86-112]

```diff
 def _save_to_disk(self) -> None:
     """Save cache to pickle file."""
     if not self.cache_file:
         return
     try:
+        # Create the directory with secure permissions if it doesn't exist
+        cache_dir = os.path.dirname(self.cache_file)
+        os.makedirs(cache_dir, mode=0o700, exist_ok=True)
+
         # writing to a temp file first for atomicity
         temp_file = self.cache_file + ".tmp"
         with open(temp_file, "wb") as f:
             pickle.dump(self.cache, f, protocol=pickle.HIGHEST_PROTOCOL)
         os.replace(temp_file, self.cache_file)
         logger.debug(
             f"Saved {len(self.cache)} records to disk cache: {self.cache_file}"
         )
     except (pickle.PicklingError, IOError) as e:
         logger.warning(
             f"Failed to save cache to disk: {e}. Cache will not be persisted."
         )
         raise FailedToWriteHybridCacheError(
             f"Failed to save cache to disk: {e}. Cache will not be persisted."
         )
     except Exception as e:
         logger.warning(
             f"Unexpected error saving cache: {e}. Cache will not be persisted."
         )
         raise FailedToWriteHybridCacheError(
             f"Unexpected error saving cache: {e}. Cache will not be persisted."
         )
```

Suggestion importance [1-10]: 10

Why: This suggestion addresses both a functional bug and a security flaw. The code would crash if the cache directory doesn't exist, and creating it with secure permissions (0o700) is essential to mitigate the pickle vulnerability.
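One caveat on the `os.makedirs(..., mode=0o700)` call above: the mode argument is masked by the process umask and, since Python 3.7, applies only to the final path component, so an explicit `os.chmod` is the more reliable way to guarantee owner-only access. A small illustrative sketch:

```python
import os

cache_dir = "/tmp/ocean/.ocean_cache"  # example path, taken from the PR's defaults
os.makedirs(cache_dir, exist_ok=True)
os.chmod(cache_dir, 0o700)  # enforce owner-only access regardless of umask
```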
**Avoid blocking disk I/O in async method** (Category: General, Impact: Medium)

Improve performance by removing the blocking, synchronous disk write (`_save_to_disk()`) from the `async def get()` method. Defer saving to disk to avoid blocking the event loop on read operations.

port_ocean/cache/hybrid.py [114-124]

```diff
 async def get(self, key: str) -> Optional[Any]:
     """Retrieve value if exists and not expired, else None. Move to end for LRU."""
     if key not in self.cache:
         return None
     value, timestamp, ttl = self.cache[key]
     if ttl is not None and timestamp + ttl < time.time():
         del self.cache[key]
-        self._save_to_disk()
+        # Defer saving to disk to avoid blocking on a get operation
         return None
     self.cache.move_to_end(key)
     return value
```

Suggestion importance [1-10]: 7

Why: The suggestion correctly points out that performing a synchronous, blocking I/O operation (`_save_to_disk`) inside an async method is a performance anti-pattern that blocks the event loop. Removing it improves performance.
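If the write on expiry should still happen, an alternative to dropping it outright is to move the blocking call off the event loop. A minimal sketch using only the standard library; `_save_to_disk` here is a stand-in for the provider's synchronous pickle write:

```python
import asyncio

def _save_to_disk() -> None:
    """Stand-in for the provider's synchronous pickle write."""
    ...

async def expire_entry() -> None:
    # asyncio.to_thread (Python 3.9+) runs the blocking write in a worker
    # thread, so the event loop stays responsive during disk I/O.
    await asyncio.to_thread(_save_to_disk)

asyncio.run(expire_entry())
```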
**Improve test for zero-sized cache** (Category: General, Impact: Low)

Improve the test for a zero-sized cache by adding an assertion to verify that `get()` returns None for a key that was just set. This confirms the expected external behavior of immediate eviction.

port_ocean/tests/cache/test_hybrid_cache.py [428-435]

```diff
 @pytest.mark.asyncio
 async def test_hybrid_cache_zero_max_size(temp_cache_file: str) -> None:
     """Test cache behavior with max_size=0 (immediate eviction)."""
     cache = HybridCacheProvider(max_size=0, default_ttl=60, cache_file=temp_cache_file)
     await cache.set("key1", "value1")
     # With max_size=0, nothing should stay in cache
     assert len(cache.cache) == 0
+    assert await cache.get("key1") is None
```

Suggestion importance [1-10]: 5

Why: The suggestion correctly identifies that the test for max_size=0 is incomplete. While the existing assertion is correct, adding a check for `get` returning None makes the test more robust by verifying the external behavior, not just the internal state.