
Add custom-api Ocean integration

Open · yarden-port opened this issue 8 months ago · 4 comments

User description

Description

What -

Why -

How -

Type of change

Please leave one option from the following and delete the rest:

  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [ ] New feature (non-breaking change which adds functionality)
  • [ ] New Integration (non-breaking change which adds a new integration)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
  • [ ] Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • [ ] Integration able to create all default resources from scratch
  • [ ] Resync finishes successfully
  • [ ] Resync able to create entities
  • [ ] Resync able to update entities
  • [ ] Resync able to detect and delete entities
  • [ ] Scheduled resync able to abort existing resync and start a new one
  • [ ] Tested with at least 2 integrations from scratch
  • [ ] Tested with Kafka and Polling event listeners
  • [ ] Tested deletion of entities that don't pass the selector

Integration testing checklist

  • [ ] Integration able to create all default resources from scratch
  • [ ] Resync able to create entities
  • [ ] Resync able to update entities
  • [ ] Resync able to detect and delete entities
  • [ ] Resync finishes successfully
  • [ ] If a new resource kind is added or updated in the integration, add example raw data, mapping, and expected result to the examples folder in the integration directory.
  • [ ] If a resource kind is updated, run the integration with the example data and check that the expected result is achieved
  • [ ] If a new resource kind is added or updated, validate that live events for that resource work as expected
  • [ ] Docs PR link here

Preflight checklist

  • [ ] Handled rate limiting
  • [ ] Handled pagination
  • [ ] Implemented the code in async
  • [ ] Supports multiple accounts

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

New Integration, Documentation, Tests


Description

  • Added a new Ocean integration for custom API ingestion.

    • Fetches data from a configured API endpoint.
    • Maps and transforms data into Port entities.
    • Supports API authentication with optional Bearer token.
  • Defined configuration, blueprints, and mapping rules for the integration.

  • Included documentation for setup, usage, and contribution guidelines.

  • Added basic test and development environment setup.


Changes walkthrough 📝

Relevant files

  • Enhancement (2 files)
    • debug.py: Add debug entry point for integration (+4/-0)
    • main.py: Implement API data ingestion logic (+22/-0)
  • Configuration changes (8 files)
    • setup.py: Define setup configuration for custom API integration (+13/-0)
    • .env.example: Provide example environment variables for integration (+6/-0)
    • blueprints.json: Define blueprint schema for API items (+26/-0)
    • port-app-config.yml: Add mapping configuration for API data (+17/-0)
    • spec.yaml: Specify integration type and configurations (+20/-0)
    • poetry.toml: Configure poetry virtual environment settings (+3/-0)
    • pyproject.toml: Define project dependencies and tooling configurations (+113/-0)
    • sonar-project.properties: Add SonarQube project configuration (+2/-0)
  • Tests (1 file)
    • test_sample.py: Add placeholder test for integration (+2/-0)
  • Documentation (3 files)
    • CHANGELOG.md: Add changelog for custom API integration (+8/-0)
    • CONTRIBUTING.md: Add contribution guidelines for integration (+7/-0)
    • README.md: Add README with setup and usage instructions (+26/-0)
  • Miscellaneous (1 file)
    • Makefile: Add Makefile for integration infrastructure (+1/-0)
  • Additional files (1 file)
    • __init__.py

yarden-port · Apr 17 '25 09:04

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
    🧪 No relevant tests
    🔒 Security concerns

    Sensitive information exposure:
    The .env.example file contains what appear to be real credentials (client ID and client secret) rather than placeholder values. These should be replaced with dummy values like 'YOUR_CLIENT_ID' to prevent accidental credential leakage.

    ⚡ Recommended focus areas for review

    Error Handling

    The API response handling lacks proper error handling for malformed JSON or unexpected data structures. The code assumes the API returns a list but doesn't validate the response structure before processing.

        response = await client.get(api_url, headers=headers)
        response.raise_for_status()
        data = response.json()
    
    logger.info(f"Fetched {len(data)} records from API")
    
    yield data
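A minimal sketch of the structure check this comment asks for, assuming the endpoint returns either a bare JSON array or an object that wraps the array under a key. extract_items and items_key are hypothetical names, not part of the PR. Parsing the raw bytes with the standard json module turns malformed bodies and unexpected shapes into a single ValueError with a readable message, instead of a later TypeError deep in the mapping step.

```python
import json
from typing import Any, Optional


def extract_items(raw_body: bytes, items_key: Optional[str] = None) -> list[dict[str, Any]]:
    """Decode a JSON response body and return its records, validating the shape.

    Accepts a top-level JSON array, or an object wrapping the array under
    items_key (a hypothetical setting, e.g. "data" or "results").
    """
    try:
        payload = json.loads(raw_body)
    except json.JSONDecodeError as exc:
        raise ValueError(f"API returned malformed JSON: {exc}") from exc

    # Unwrap {"<items_key>": [...]} envelopes when a key is configured.
    if isinstance(payload, dict) and items_key is not None:
        payload = payload.get(items_key)

    if not isinstance(payload, list):
        raise ValueError(f"Expected a JSON array of records, got {type(payload).__name__}")

    # Mapping expressions address fields by name, so require object records.
    bad = [i for i, item in enumerate(payload) if not isinstance(item, dict)]
    if bad:
        raise ValueError(f"Records at positions {bad[:5]} are not JSON objects")
    return payload
```

In the resync handler this could stand in for data = response.json(), for example data = extract_items(response.content), since httpx exposes the raw body as response.content.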
    
    Missing Pagination

    The implementation doesn't handle API pagination, which could lead to incomplete data ingestion if the API returns paginated results.

    async with httpx.AsyncClient() as client:
        response = await client.get(api_url, headers=headers)
        response.raise_for_status()
        data = response.json()
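One way to close this gap, sketched for APIs that advertise the next page in an RFC 8288 Link header (one of the pagination styles listed later in this thread). fetch_page is a hypothetical callable standing in for the httpx request, so the loop can be exercised without a network; cursor, offset, and page-number APIs would need a different "next page" rule.

```python
import asyncio
import re
from typing import Any, AsyncIterator, Awaitable, Callable, Optional

# Hypothetical page fetcher: given a URL, returns (records, raw Link header or None).
FetchPage = Callable[[str], Awaitable[tuple[list[dict[str, Any]], Optional[str]]]]


def parse_link_header(value: str) -> dict[str, str]:
    """Map rel values to URLs for a header like '<https://x/items?page=2>; rel="next"'."""
    links: dict[str, str] = {}
    for url, params in re.findall(r"<([^>]*)>([^<]*)", value):
        match = re.search(r'rel\s*=\s*"?([^";,]+)"?', params)
        if match:
            # rel may hold several space-separated values, e.g. rel="next last"
            for rel in match.group(1).split():
                links[rel] = url
    return links


async def paginate_by_link_header(
    first_url: str, fetch_page: FetchPage, max_pages: int = 1000
) -> AsyncIterator[list[dict[str, Any]]]:
    """Yield one batch per page, following rel="next" until it disappears."""
    url: Optional[str] = first_url
    seen: set[str] = set()
    while url and url not in seen and len(seen) < max_pages:
        seen.add(url)  # guards against APIs that link a page back to itself
        records, link_header = await fetch_page(url)
        yield records
        url = parse_link_header(link_header).get("next") if link_header else None


async def _demo() -> None:
    pages = {
        "https://api.example.com/items?page=1": (
            [{"id": 1}], '<https://api.example.com/items?page=2>; rel="next"'
        ),
        "https://api.example.com/items?page=2": ([{"id": 2}], None),
    }

    async def fake_fetch(url: str):
        return pages[url]

    async for batch in paginate_by_link_header("https://api.example.com/items?page=1", fake_fetch):
        print(batch)


if __name__ == "__main__":
    asyncio.run(_demo())
```

Because the generator yields one list per page, a resync handler could forward each batch with async for batch in paginate_by_link_header(...): yield batch, which matches how main.py already yields a list.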
    
    Exposed Credentials

    The example environment file contains what appears to be real credentials that should be replaced with placeholder values.

    OCEAN__PORT__CLIENT_ID=qXrDCFPN62wlieF3YU50trhlroTaTero
    OCEAN__PORT__CLIENT_SECRET=4eYJOAzVsIQv7RpKp8PbWS7TbbEPoLrBxjviKuOhr68w7CeKCdjfouhXw354fvWe
    

    qodo-code-review[bot] · Apr 17 '25 09:04

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Category · Suggestion · Impact
    Possible issue
    Add missing dependencies

    The install_requires list is empty, but the integration depends on external
    packages like httpx and port_ocean. These dependencies should be specified to
    ensure the integration works correctly when installed.

    integrations/custom-api/setup.py [7]

    -install_requires=[],
    +install_requires=["httpx", "port_ocean>=0.22.4"],
    
    Suggestion importance[1-10]: 9

    Why: The empty install_requires list is a critical issue as it would cause the integration to fail when installed as a package. Adding the required dependencies ensures the integration will work properly in production environments.

    High
    Fix async generator context

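    The yield in this async generator sits after the async with block, so the
    HTTP client is already closed when data is yielded. This still works as written,
    because response.json() has fully decoded the body before the client closes, but
    moving the logging and yield inside the block keeps them within the client's
    lifetime, which becomes necessary once the function issues follow-up requests
    (for example, to fetch further pages).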

    integrations/custom-api/main.py [6-22]

     @ocean.on_resync("apiItem")
     async def sync_api_items(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
         api_url = ocean.integration_config["api_url"]
         auth_token = ocean.integration_config.get("auth_token")
     
         headers = {"Accept": "application/json"}
         if auth_token:
             headers["Authorization"] = f"Bearer {auth_token}"
     
         async with httpx.AsyncClient() as client:
             response = await client.get(api_url, headers=headers)
             response.raise_for_status()
             data = response.json()
     
    -    logger.info(f"Fetched {len(data)} records from API")
    +        logger.info(f"Fetched {len(data)} records from API")
     
    -    yield data
    +        yield data
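A self-contained check of what this suggestion changes, using a fake client. FakeClient and opened are illustrative stand-ins for httpx.AsyncClient and its async with block, not part of the PR. The yielded data is the same either way; the difference is whether the client is still open when the consumer resumes.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeClient:
    """Stand-in for httpx.AsyncClient that only records whether it is open."""

    def __init__(self) -> None:
        self.is_open = False


@asynccontextmanager
async def opened(client: FakeClient) -> AsyncIterator[FakeClient]:
    client.is_open = True
    try:
        yield client
    finally:
        client.is_open = False  # what leaving "async with httpx.AsyncClient()" does


async def yield_after_block(client: FakeClient) -> AsyncIterator[bool]:
    # Original shape: fetch inside the block, yield after it.
    async with opened(client):
        pass  # data = response.json() would run here
    yield client.is_open


async def yield_inside_block(client: FakeClient) -> AsyncIterator[bool]:
    # Suggested shape: yield while still inside the block.
    async with opened(client):
        yield client.is_open


async def main() -> tuple[bool, bool, bool]:
    after_client, inside_client = FakeClient(), FakeClient()
    after = [state async for state in yield_after_block(after_client)]
    inside = [state async for state in yield_inside_block(inside_client)]
    # Once the consumer exhausts the generator, the inside-block client is closed too.
    return after[0], inside[0], inside_client.is_open


if __name__ == "__main__":
    print(asyncio.run(main()))  # (False, True, False)
```

One consequence of yielding inside the block: if a consumer stops iterating early, the client stays open until the generator is closed, either explicitly with aclose() or when it is garbage-collected.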
    
    Suggestion importance[1-10]: 8

    Why: Moving the yield statement inside the async context is important for proper async generator behavior. This change ensures data is yielded while the HTTP client is still active, which is a significant improvement for resource management and follows best practices for async generators.

    Medium

    qodo-code-review[bot] · Apr 17 '25 09:04

    This pull request is automatically being deployed by Amplify Hosting.

    Access this pull request here: https://pr-1576.d1ftd8v2gowp8w.amplifyapp.com

    Hey @yarden-port, thanks for the contribution! The current implementation of the generic API integration is a good start, but to make it more robust and reusable across different APIs, I recommend adding support for the following common patterns found in our other integrations:

    1. Authentication:

      • Support multiple auth methods (Bearer token, Basic auth, API keys)
      • Add OAuth2 support with token refresh
      • Allow configurable auth headers and methods
    2. Pagination:

      • Implement support for different pagination types:
        • Cursor-based (next_cursor, after)
        • Offset-based (skip, offset)
        • Page-based (page, page_size)
        • Link header pagination
      • Handle different pagination response formats
    3. Rate Limiting:

      • Add support for rate limit headers
      • Implement configurable rate limiting
      • Add concurrent request limiting
      • Include backoff strategies
    4. Error Handling:

      • Add proper HTTP status code handling
      • Implement retry mechanisms
      • Add custom exception types
      • Improve error logging
    5. Request Configuration:

      • Add configurable timeouts
      • Support custom headers
      • Improve query parameter handling
      • Add support for different content types
    6. Response Processing:

      • Add response validation
      • Support different response formats
      • Add error response parsing
      • Consider adding response caching
    7. Logging and Monitoring:

      • Add request/response logging
      • Include performance metrics
      • Add error tracking
      • Add debug information
    8. Configuration Management:

      • Add environment variable support
      • Add configuration validation
      • Add default values
      • Distinguish between required/optional params
    9. Webhook Support:

      • Add webhook authentication
      • Add webhook payload validation
      • Add webhook event processing
      • Add webhook secret management
    10. Data Transformation:

      • Add support for different data formats
      • Add data normalization
      • Add field mapping
      • Add data validation

    These features would make the integration more flexible and suitable for a wider range of APIs. Each feature could be made configurable to support different API requirements. Here are some example implementations for the key features:

    1. Authentication:
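    import base64

    class APIClient:
        def __init__(self, auth_config: dict):
            self.auth_type = auth_config.get("type", "bearer")
            # For "basic", the value is expected as "username:password"
            self.auth_value = auth_config.get("value")

        @property
        def auth_headers(self) -> dict:
            if self.auth_type == "bearer":
                return {"Authorization": f"Bearer {self.auth_value}"}
            elif self.auth_type == "basic":
                # RFC 7617: Basic credentials are base64("username:password")
                encoded = base64.b64encode(str(self.auth_value).encode()).decode()
                return {"Authorization": f"Basic {encoded}"}
            elif self.auth_type == "api_key":
                return {"X-API-Key": self.auth_value}
            # Fail loudly instead of silently sending no auth header
            raise ValueError(f"Unsupported auth type: {self.auth_type!r}")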
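The list above also mentions OAuth2 with token refresh, which the APIClient example does not cover. A minimal sketch, assuming a hypothetical fetch_token coroutine that performs the provider-specific token request (for instance a client-credentials POST) and returns the access token together with its lifetime in seconds:

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

# Hypothetical: performs the provider's token request and returns
# (access_token, expires_in_seconds).
TokenFetcher = Callable[[], Awaitable[tuple[str, float]]]


class OAuth2TokenCache:
    """Caches an OAuth2 access token and refreshes it shortly before expiry."""

    def __init__(
        self,
        fetch_token: TokenFetcher,
        refresh_margin: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch_token = fetch_token
        self._refresh_margin = refresh_margin  # renew this many seconds early
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get_token(self) -> str:
        # The lock makes concurrent callers wait for a single refresh.
        async with self._lock:
            expired = self._clock() >= self._expires_at - self._refresh_margin
            if self._token is None or expired:
                token, expires_in = await self._fetch_token()
                self._token = token
                self._expires_at = self._clock() + expires_in
            return self._token

    async def auth_headers(self) -> dict[str, str]:
        return {"Authorization": f"Bearer {await self.get_token()}"}
```

refresh_margin renews the token slightly before it expires so in-flight requests do not race the expiry; the clock parameter exists only so the expiry logic can be tested without waiting.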
    
    1. Pagination:
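    from typing import Optional

    class PaginatedResponse:
        def __init__(self, response: dict, pagination_type: str):
            self.response = response
            self.pagination_type = pagination_type

        def get_next_params(self) -> Optional[dict]:
            # Returns None when there is no next page; field names are illustrative
            r = self.response
            data = r.get("data")
            if self.pagination_type == "cursor":
                cursor = r.get("next_cursor")
                return {"cursor": cursor} if cursor else None
            elif self.pagination_type == "offset":
                limit, total = r.get("limit"), r.get("total")
                offset = r.get("offset", 0) + (limit or 0)
                done = not data or not limit or (total is not None and offset >= total)
                return None if done else {"offset": offset, "limit": limit}
            elif self.pagination_type == "page":
                page, total_pages = r.get("page", 1), r.get("total_pages")
                done = not data or (total_pages is not None and page >= total_pages)
                return None if done else {"page": page + 1}
            raise ValueError(f"Unsupported pagination type: {self.pagination_type!r}")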
    
    1. Rate Limiting:
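    import asyncio
    import httpx
    from aiolimiter import AsyncLimiter

    MAX_CONCURRENT_REQUESTS = 10  # illustrative value

    class RateLimitedClient:
        def __init__(self, rate_limit: int, time_period: int, max_retries: int = 3):
            # At most `rate_limit` requests per `time_period` seconds
            self.rate_limiter = AsyncLimiter(rate_limit, time_period)
            self._semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
            self._client = httpx.AsyncClient()
            self.max_retries = max_retries

        async def make_request(self, method: str, url: str, **kwargs) -> httpx.Response:
            for attempt in range(self.max_retries + 1):
                # Retry in a loop, holding limiter/semaphore only for the request itself
                async with self.rate_limiter, self._semaphore:
                    response = await self._client.request(method, url, **kwargs)
                if response.status_code != 429 or attempt == self.max_retries:
                    return response
                retry_after = response.headers.get("Retry-After", "60")
                # Retry-After may be an HTTP date rather than seconds; default to 60s then
                await asyncio.sleep(int(retry_after) if retry_after.isdecimal() else 60)
            return response

        async def aclose(self) -> None:
            await self._client.aclose()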
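RateLimitedClient waits on Retry-After for 429 responses; the "backoff strategies" and "retry mechanisms" items in the list also cover transient 5xx errors. A sketch of retries with exponential backoff and "full jitter" (a random delay between zero and an exponentially growing cap), where RetryableError, raise_for_retryable, and the status set are illustrative choices rather than anything in the PR:

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")

# Statuses commonly treated as transient; adjust per API.
RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})


class RetryableError(Exception):
    def __init__(self, message: str, retry_after: Optional[float] = None) -> None:
        super().__init__(message)
        self.retry_after = retry_after  # seconds suggested by the server, if any


def raise_for_retryable(status_code: int, retry_after_header: Optional[str] = None) -> None:
    """Raise RetryableError for transient statuses; other statuses are left to the caller."""
    if status_code in RETRYABLE_STATUS:
        # Only the delay-seconds form of Retry-After is honored; dates fall back to backoff.
        header = retry_after_header or ""
        retry_after = float(header) if header.isdecimal() else None
        raise RetryableError(f"HTTP {status_code}", retry_after)


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0.0, min(cap, base * 2 ** attempt))


async def with_retries(
    call: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base: float = 1.0,
    cap: float = 60.0,
    sleep: Callable[[float], Awaitable[object]] = asyncio.sleep,
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await call()
        except RetryableError as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            if exc.retry_after is not None:
                delay = exc.retry_after
            else:
                delay = backoff_delay(attempt, base, cap)
            await sleep(delay)
    raise RuntimeError("unreachable")
```

The random spread keeps clients that failed at the same moment from retrying in lockstep, and a numeric Retry-After from the server takes precedence over the computed delay.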
    
    1. Error Handling:
    import httpx

    class APIError(Exception):
        def __init__(self, message: str, status_code: int, response: dict):
            self.message = message
            self.status_code = status_code
            self.response = response
            super().__init__(f"{message} (Status: {status_code})")

    class APIClient:
        ERROR_MESSAGES = {
            401: "Authentication failed",
            403: "Permission denied",
            404: "Resource not found",
            429: "Rate limit exceeded",
        }

        async def _handle_error(self, response: httpx.Response):
            if response.is_success:
                return
            try:
                body = response.json()
            except ValueError:  # error bodies are often HTML or plain text, not JSON
                body = {"raw": response.text}
            # Fall back to a generic message so unlisted statuses (e.g. 500) still raise
            message = self.ERROR_MESSAGES.get(response.status_code, "Request failed")
            raise APIError(message, response.status_code, body)
    
    1. Configuration Management:
    class APIConfig:
        def __init__(self, config: dict):
            self.api_url = config.get("api_url")
            self.auth_config = config.get("auth", {})
            self.pagination_config = config.get("pagination", {})
            self.rate_limit_config = config.get("rate_limit", {})
            
            self._validate_config()
            
        def _validate_config(self):
            if not self.api_url:
                raise ValueError("api_url is required")
            if not self.auth_config.get("value"):
                raise ValueError("auth.value is required")
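A variant of the same idea using a frozen dataclass, so the defaults and the required/optional split from the "Configuration Management" item are visible in one place. The field names timeout_seconds and page_size are hypothetical, and auth_token is optional here to match the PR's optional Bearer token; all validation errors are reported together rather than one at a time.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional
from urllib.parse import urlparse


def _positive_number(config: Mapping[str, Any], key: str, default: Any, cast: type, errors: list) -> Any:
    raw = config.get(key, default)
    try:
        value = cast(raw)
    except (TypeError, ValueError):
        errors.append(f"{key} must be a number, got {raw!r}")
        return default
    if value <= 0:
        errors.append(f"{key} must be positive, got {value}")
    return value


@dataclass(frozen=True)
class APISettings:
    """Hypothetical typed settings: required fields have no default, optional ones do."""

    api_url: str
    auth_token: Optional[str] = None  # optional, like the PR's Bearer token
    timeout_seconds: float = 30.0
    page_size: int = 100

    @classmethod
    def from_config(cls, config: Mapping[str, Any]) -> "APISettings":
        errors: list = []  # collect every problem instead of stopping at the first
        url = config.get("api_url")
        if not url:
            errors.append("api_url is required")
        elif urlparse(str(url)).scheme not in ("http", "https"):
            errors.append(f"api_url must be an http(s) URL, got {url!r}")
        timeout = _positive_number(config, "timeout_seconds", 30.0, float, errors)
        page_size = _positive_number(config, "page_size", 100, int, errors)
        if errors:
            raise ValueError("Invalid integration config: " + "; ".join(errors))
        return cls(str(url), config.get("auth_token") or None, timeout, page_size)
```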
    
    1. Response Processing:
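    class APIResponse:
        # Named transforms a mapping may reference (illustrative set)
        TRANSFORMS = {"lower": str.lower, "upper": str.upper, "str": str, "int": int}

        def __init__(self, response: dict, mapping_config: dict):
            self.raw_response = response
            self.mapping_config = mapping_config

        def _get_value(self, path: str):
            # Resolve a dot-separated path such as "owner.name"; None if a key is missing
            value = self.raw_response
            for key in path.split("."):
                value = value.get(key) if isinstance(value, dict) else None
            return value

        def _apply_transform(self, value, name: str):
            return None if value is None else self.TRANSFORMS[name](value)

        def transform(self) -> dict:
            transformed = {}
            for field, mapping in self.mapping_config.items():
                value = self._get_value(mapping["path"])
                if mapping.get("transform"):
                    value = self._apply_transform(value, mapping["transform"])
                transformed[field] = value
            return transformed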
    
    1. Webhook Support:
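    import hashlib
    import hmac

    from fastapi import Request

    class WebhookHandler:
        def __init__(self, secret: str):
            self.secret = secret

        async def verify_signature(self, request: Request) -> bool:
            # Header name and hex-digest format are assumptions; providers differ
            signature = request.headers.get("X-Signature")
            if not signature:
                return False

            expected = hmac.new(
                self.secret.encode(),
                await request.body(),  # raw bytes, exactly as sent
                hashlib.sha256,
            ).hexdigest()

            # Constant-time comparison; bytes avoid TypeError on non-ASCII headers
            return hmac.compare_digest(signature.encode(), expected.encode())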
    

    These examples show the basic structure for each feature. You can combine them into a more comprehensive client:

    from typing import AsyncGenerator

    class GenericAPIClient:
        def __init__(self, config: dict):
            self.config = APIConfig(config)
            self.rate_limiter = RateLimitedClient(
                self.config.rate_limit_config.get("limit", 100),
                self.config.rate_limit_config.get("period", 60)
            )

        async def get_paginated_data(self, endpoint: str) -> AsyncGenerator[dict, None]:
            url = f"{self.config.api_url.rstrip('/')}/{endpoint.lstrip('/')}"
            params = {}
            while True:
                response = await self.rate_limiter.make_request("GET", url, params=params)
                response.raise_for_status()
                payload = response.json()  # parse once and reuse

                # "yield from" is a SyntaxError in async generators; yield items one by one
                for item in payload.get("data", []):
                    yield item

                next_params = PaginatedResponse(
                    payload, self.config.pagination_config.get("type", "cursor")
                ).get_next_params()
                # Stop when there is no next page (including {"cursor": None})
                if not next_params or all(v is None for v in next_params.values()):
                    break
                params = next_params
    

    Feel free to reach out anytime if you'd like to consult on something or bounce around ideas — always happy to hear your thoughts and share from my experience if it’s helpful 😸

    shalev007 · Apr 20 '25 12:04