Add custom-api Ocean integration
User description
Description
What -
Why -
How -
Type of change
Please leave one option from the following and delete the rest:
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
- [ ] Documentation (added/updated documentation)
All tests should be run against the Port production environment (using a testing org).
Core testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector
Integration testing checklist
- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory.
- [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected
- [ ] Docs PR link here
Preflight checklist
- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account
Screenshots
Include screenshots from your environment showing how the resources of the integration will look.
API Documentation
Provide links to the API documentation used for this integration.
PR Type
New Integration, Documentation, Tests
Description
- Added a new Ocean integration for custom API ingestion.
  - Fetches data from a configured API endpoint.
  - Maps and transforms data into Port entities.
  - Supports API authentication with optional Bearer token.
- Defined configuration, blueprints, and mapping rules for the integration.
- Included documentation for setup, usage, and contribution guidelines.
- Added basic test and development environment setup.
Changes walkthrough 📝
| Relevant files | |
|---|---|
| Enhancement | |
| Configuration changes | 8 files |
| Tests | 1 file |
| Documentation | |
| Miscellaneous | 1 file |
| Additional files | 1 file |
PR Reviewer Guide 🔍
Here are some key observations to aid the review process:
| ⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪ |
| 🧪 No relevant tests |
| 🔒 Security concerns: Sensitive information exposure |
| ⚡ Recommended focus areas for review: Error Handling |
PR Code Suggestions ✨
Explore these optional code suggestions:
| Category | Suggestion | Impact |
|---|---|---|
| Possible issue | Add missing dependencies. integrations/custom-api/setup.py [7]. Suggestion importance [1-10]: 9. Why: The empty install_requires list is a critical issue as it would cause the integration to fail when installed as a package. Adding the required dependencies ensures the integration will work properly in production environments. | High |
| | Fix async generator context. integrations/custom-api/main.py [6-22]. Suggestion importance [1-10]: 8. Why: Moving the yield statement inside the async context is important for proper async generator behavior. This change ensures data is yielded while the HTTP client is still active, which is a significant improvement for resource management and follows best practices for async generators. | Medium |
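The "Fix async generator context" suggestion can be illustrated with a minimal sketch. It does not reproduce the code in `main.py`; `FakeClient` and `open_client` are hypothetical stand-ins for an HTTP client, used only to show the `yield` sitting inside the `async with` block.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeClient:
    """Hypothetical stand-in for an HTTP client; tracks whether it is open."""

    def __init__(self) -> None:
        self.is_open = False

    async def get_items(self) -> list[dict]:
        if not self.is_open:
            raise RuntimeError("client is closed")
        return [{"id": 1}, {"id": 2}]


@asynccontextmanager
async def open_client() -> AsyncIterator[FakeClient]:
    client = FakeClient()
    client.is_open = True
    try:
        yield client
    finally:
        client.is_open = False


async def fetch_batches() -> AsyncIterator[list[dict]]:
    async with open_client() as client:
        items = await client.get_items()
        # Yield inside the `async with` block: the client stays open
        # until the consumer has finished with this batch.
        yield items


async def collect() -> list[list[dict]]:
    return [batch async for batch in fetch_batches()]
```

With a real client the same shape applies: keep every `await` on the client and every `yield` of its results inside the context manager.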
This pull request is automatically being deployed by Amplify Hosting (learn more).
Access this pull request here: https://pr-1576.d1ftd8v2gowp8w.amplifyapp.com
Hey @yarden-port, thanks for the contribution! The current implementation of the generic API integration is a good start, but to make it more robust and reusable across different APIs, I recommend adding support for the following common patterns found in our other integrations:
- Authentication:
  - Support multiple auth methods (Bearer token, Basic auth, API keys)
  - Add OAuth2 support with token refresh
  - Allow configurable auth headers and methods
- Pagination:
  - Implement support for different pagination types:
    - Cursor-based (next_cursor, after)
    - Offset-based (skip, offset)
    - Page-based (page, page_size)
    - Link header pagination
  - Handle different pagination response formats
- Rate Limiting:
  - Add support for rate limit headers
  - Implement configurable rate limiting
  - Add concurrent request limiting
  - Include backoff strategies
- Error Handling:
  - Add proper HTTP status code handling
  - Implement retry mechanisms
  - Add custom exception types
  - Improve error logging
- Request Configuration:
  - Add configurable timeouts
  - Support custom headers
  - Improve query parameter handling
  - Add support for different content types
- Response Processing:
  - Add response validation
  - Support different response formats
  - Add error response parsing
  - Consider adding response caching
- Logging and Monitoring:
  - Add request/response logging
  - Include performance metrics
  - Add error tracking
  - Add debug information
- Configuration Management:
  - Add environment variable support
  - Add configuration validation
  - Add default values
  - Distinguish between required/optional params
- Webhook Support:
  - Add webhook authentication
  - Add webhook payload validation
  - Add webhook event processing
  - Add webhook secret management
- Data Transformation:
  - Add support for different data formats
  - Add data normalization
  - Add field mapping
  - Add data validation
These features would make the integration more flexible and suitable for a wider range of APIs. Each feature could be made configurable to support different API requirements. Here are some example implementations for the key features:
- Authentication:
```python
import base64


class APIClient:
    def __init__(self, auth_config: dict):
        self.auth_type = auth_config.get("type", "bearer")
        self.auth_value = auth_config.get("value")

    @property
    def auth_headers(self) -> dict:
        if self.auth_type == "bearer":
            return {"Authorization": f"Bearer {self.auth_value}"}
        elif self.auth_type == "basic":
            # auth_value is expected to be "username:password"
            encoded = base64.b64encode(self.auth_value.encode()).decode()
            return {"Authorization": f"Basic {encoded}"}
        elif self.auth_type == "api_key":
            return {"X-API-Key": self.auth_value}
        raise ValueError(f"Unsupported auth type: {self.auth_type}")
```
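For Basic auth, the value that gets base64-encoded is `username:password`. A minimal sketch of building that header from separate fields follows; `basic_auth_header` is a hypothetical helper name, not something in the PR.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict[str, str]:
    """Build an HTTP Basic Authorization header from a username and password."""
    credentials = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(credentials).decode("ascii")}
```

Taking the two fields separately in the config avoids relying on users to join them with a colon themselves.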
- Pagination:
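```python
from typing import Optional


class PaginatedResponse:
    def __init__(self, response: dict, pagination_type: str):
        self.response = response
        self.pagination_type = pagination_type

    def get_next_params(self) -> Optional[dict]:
        """Return query params for the next page, or None when no cursor is left."""
        if self.pagination_type == "cursor":
            cursor = self.response.get("next_cursor")
            return {"cursor": cursor} if cursor else None
        elif self.pagination_type == "offset":
            # Assumes the response echoes "offset" and "limit";
            # the caller must stop when a page comes back empty
            return {"offset": self.response["offset"] + self.response["limit"]}
        elif self.pagination_type == "page":
            return {"page": self.response["page"] + 1}
        return None
```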
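Link header pagination is not covered by the class above. As a rough sketch, the next page URL can be pulled out of a `Link` response header like this. The regular expression is a simplification: it assumes `rel` is the first parameter after the URL, and `next_link` is a hypothetical helper name.

```python
import re
from typing import Optional

# Matches entries of the form: <https://example.com/items?page=2>; rel="next"
_LINK_PATTERN = re.compile(r'<([^>]+)>\s*;\s*rel="?([^",;]+)"?')


def next_link(link_header: Optional[str]) -> Optional[str]:
    """Return the URL tagged rel="next" in a Link header, or None."""
    if not link_header:
        return None
    for url, rel in _LINK_PATTERN.findall(link_header):
        if rel.strip() == "next":
            return url
    return None
```

When `next_link` returns `None`, there are no more pages to request.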
- Rate Limiting:
```python
import asyncio

from aiolimiter import AsyncLimiter

MAX_CONCURRENT_REQUESTS = 10  # illustrative value; tune per API


class RateLimitedClient:
    def __init__(self, rate_limit: int, time_period: int):
        self.rate_limiter = AsyncLimiter(rate_limit, time_period)
        self._semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

    async def make_request(self, *args, **kwargs):
        # _make_request (not shown) performs the actual HTTP call
        while True:
            async with self.rate_limiter:
                async with self._semaphore:
                    response = await self._make_request(*args, **kwargs)
            if response.status_code != 429:
                return response
            # Sleep outside the limiter and semaphore so other requests are not blocked
            retry_after = int(response.headers.get("Retry-After", 60))
            await asyncio.sleep(retry_after)
```
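For the backoff strategies mentioned earlier, here is a minimal sketch of capped exponential backoff that prefers the server's `Retry-After` hint when one is given. The function name, defaults, and the optional jitter flag are all assumptions for illustration.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    retry_after: Optional[str] = None,
    jitter: bool = False,
) -> float:
    """Seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None and retry_after.isdigit():
        # Prefer the server's hint when it is a plain number of seconds.
        return float(retry_after)
    delay = min(cap, base * (2 ** attempt))
    if jitter:
        # "Full jitter": pick a random point in [0, delay] to spread out retries.
        return random.uniform(0, delay)
    return delay
```

The cap keeps a long run of failures from producing unbounded waits, and jitter helps when many workers hit the same limit at once.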
- Error Handling:
```python
import httpx


class APIError(Exception):
    def __init__(self, message: str, status_code: int, response: dict):
        self.message = message
        self.status_code = status_code
        self.response = response
        super().__init__(f"{message} (Status: {status_code})")


class APIClient:
    async def _handle_error(self, response: httpx.Response):
        if response.status_code == 404:
            raise APIError("Resource not found", 404, response.json())
        elif response.status_code == 401:
            raise APIError("Authentication failed", 401, response.json())
        elif response.status_code == 429:
            raise APIError("Rate limit exceeded", 429, response.json())
```
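A retry mechanism could sit in front of this error handling. The sketch below retries only on status codes that are usually transient. It is simplified on purpose: `send` is a hypothetical callable that returns just a status code, where real code would return the full response.

```python
import asyncio
from typing import Awaitable, Callable

# Status codes commonly treated as transient; adjust per API.
RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


async def request_with_retries(
    send: Callable[[], Awaitable[int]],
    max_attempts: int = 3,
    delay: float = 0.0,
) -> int:
    """Call `send` until it returns a non-retryable status or attempts run out."""
    status = await send()
    for _ in range(max_attempts - 1):
        if status not in RETRYABLE_STATUS_CODES:
            break
        await asyncio.sleep(delay)
        status = await send()
    return status
```

Client errors such as 401 or 404 are returned immediately, since repeating the same request is unlikely to change the outcome.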
- Configuration Management:
```python
class APIConfig:
    def __init__(self, config: dict):
        self.api_url = config.get("api_url")
        self.auth_config = config.get("auth", {})
        self.pagination_config = config.get("pagination", {})
        self.rate_limit_config = config.get("rate_limit", {})
        self._validate_config()

    def _validate_config(self):
        if not self.api_url:
            raise ValueError("api_url is required")
        if not self.auth_config.get("value"):
            raise ValueError("auth.value is required")
```
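Environment variable support with defaults could look roughly like the sketch below. The variable names (`CUSTOM_API_URL`, `CUSTOM_API_TOKEN`, `CUSTOM_API_TIMEOUT`) and the `EnvConfig` class are hypothetical, and this is not Ocean's own configuration mechanism.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class EnvConfig:
    api_url: str                       # required
    auth_token: Optional[str] = None   # optional
    timeout_seconds: float = 30.0      # optional, with a default


def load_config(env: Mapping[str, str] = os.environ) -> EnvConfig:
    """Read settings from environment variables (names are hypothetical)."""
    api_url = env.get("CUSTOM_API_URL")
    if not api_url:
        raise ValueError("CUSTOM_API_URL is required")
    return EnvConfig(
        api_url=api_url.rstrip("/"),
        auth_token=env.get("CUSTOM_API_TOKEN") or None,
        timeout_seconds=float(env.get("CUSTOM_API_TIMEOUT", "30")),
    )
```

Passing the mapping in as a parameter keeps the loader easy to test without touching the real environment.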
- Response Processing:
```python
class APIResponse:
    def __init__(self, response: dict, mapping_config: dict):
        self.raw_response = response
        self.mapping_config = mapping_config

    def transform(self) -> dict:
        # _get_value and _apply_transform are helper methods not shown here
        transformed = {}
        for field, mapping in self.mapping_config.items():
            value = self._get_value(mapping["path"])
            if mapping.get("transform"):
                value = self._apply_transform(value, mapping["transform"])
            transformed[field] = value
        return transformed
```
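The `_get_value` helper is left undefined above. One possible shape, assuming `path` is a dotted string such as `owner.teams.0.name` (that syntax is an assumption, not something the example specifies), is sketched here as a standalone function:

```python
from typing import Any


def get_value(data: Any, path: str, default: Any = None) -> Any:
    """Walk a dotted path such as "owner.teams.0.name" through dicts and lists."""
    current = data
    for key in path.split("."):
        if isinstance(current, dict) and key in current:
            current = current[key]
        elif isinstance(current, list) and key.isdigit() and int(key) < len(current):
            current = current[int(key)]
        else:
            # Missing key, out-of-range index, or a non-container value
            return default
    return current
```

Returning a default instead of raising keeps one missing field from failing the whole transformation.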
- Webhook Support:
```python
import hashlib
import hmac

from fastapi import Request  # assumes a FastAPI/Starlette request object


class WebhookHandler:
    def __init__(self, secret: str):
        self.secret = secret

    async def verify_signature(self, request: Request) -> bool:
        signature = request.headers.get("X-Signature")
        if not signature:
            return False
        # HMAC-SHA256 over the raw request body, hex-encoded
        expected = hmac.new(
            self.secret.encode(),
            await request.body(),
            hashlib.sha256,
        ).hexdigest()
        return hmac.compare_digest(signature, expected)
```
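To see the signature check work end to end without a web framework, here is a small standalone sketch of the same HMAC-SHA256 scheme. The function names are hypothetical, and the hex encoding mirrors the example above. A real provider may use a different header name or encoding.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def is_valid_signature(secret: str, body: bytes, signature: str) -> bool:
    # compare_digest runs in constant time, which avoids timing side channels.
    return hmac.compare_digest(sign_payload(secret, body), signature)
```

The signature must be computed over the raw bytes as received. Re-serializing parsed JSON can change whitespace or key order and break the comparison.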
These examples show the basic structure for each feature. You can combine them into a more comprehensive client:
```python
from typing import AsyncGenerator


class GenericAPIClient:
    def __init__(self, config: dict):
        self.config = APIConfig(config)
        self.rate_limiter = RateLimitedClient(
            self.config.rate_limit_config.get("limit", 100),
            self.config.rate_limit_config.get("period", 60),
        )

    async def get_paginated_data(self, endpoint: str) -> AsyncGenerator[dict, None]:
        params = {}
        while True:
            response = await self.rate_limiter.make_request(
                "GET",
                f"{self.config.api_url}/{endpoint}",
                params=params,
            )
            body = response.json()
            items = body.get("data", [])
            if not items:
                # An empty page also ends offset- and page-based pagination
                break
            # `yield from` is not allowed in async generators, so yield items one by one
            for item in items:
                yield item
            paginated = PaginatedResponse(
                body,
                self.config.pagination_config.get("type", "cursor"),
            )
            next_params = paginated.get_next_params()
            if not next_params:
                break
            params = next_params
```
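An async generator like this is consumed with `async for`. The sketch below shows the same cursor loop against a hypothetical in-memory API (`FAKE_PAGES` and `fetch_page` are made up for illustration), so it runs without any network access.

```python
import asyncio
from typing import AsyncGenerator, Optional

# Hypothetical in-memory "API": maps a cursor to a page of results.
FAKE_PAGES = {
    None: {"data": [{"id": 1}, {"id": 2}], "next_cursor": "c2"},
    "c2": {"data": [{"id": 3}], "next_cursor": None},
}


async def fetch_page(cursor: Optional[str]) -> dict:
    await asyncio.sleep(0)  # stands in for network I/O
    return FAKE_PAGES[cursor]


async def iterate_items() -> AsyncGenerator[dict, None]:
    cursor: Optional[str] = None
    while True:
        page = await fetch_page(cursor)
        for item in page["data"]:
            yield item
        cursor = page["next_cursor"]
        if cursor is None:
            break


async def main() -> list[int]:
    # Items arrive one at a time as pages are fetched.
    return [item["id"] async for item in iterate_items()]
```

Running `asyncio.run(main())` walks both pages and collects the item ids in order.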
Feel free to reach out anytime if you'd like to consult on something or bounce around ideas — always happy to hear your thoughts and share from my experience if it’s helpful 😸