
[Core] added multiprocessing

[Open] ivankalinovski opened this pull request 6 months ago • 10 comments

User description

Description

What - Process each kind in a separate process.

Why - As we can see when we run ocean with limited resources, it cannot even start to process the big amount of data - it gets terminated for OOM. ocean_single_process

How - Use the multiprocessing module. Multiprocessing is enabled by setting a flag. When Ocean loads and starts a resync, we can see batches of data arriving until the worker process gets terminated for OOM. ocean_multi_process_part_1

Even though the worker process is terminated for OOM, the resync continues and processes the next kind. We can also observe the container's RAM usage in Docker and the drop when the process was terminated. ocean_multi_process_part_2

Notice that even though we had OOM events, the resync completed gracefully and did not delete entities, thanks to the deletion threshold. We can trigger a new resync from the UI and watch it begin. We can also see the RAM usage in the Docker container drop when a resync finishes and spike when one begins. ocean_multi_process_part_3
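The per-kind isolation described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's actual code: `process_kind` stands in for Ocean's real `run_task`, which bootstraps an `OceanTask` and resyncs a single resource kind.

```python
import multiprocessing


def process_kind(kind: str, result) -> None:
    # Stand-in worker: in the PR this role is played by run_task.
    result[kind] = f"processed {kind}"


def resync_all(kinds: list[str], use_multiprocessing: bool = True) -> dict:
    if not use_multiprocessing:
        # Flag off: process every kind in the parent process, as before.
        result: dict = {}
        for kind in kinds:
            process_kind(kind, result)
        return result
    with multiprocessing.Manager() as manager:
        shared = manager.dict()
        for kind in kinds:
            # One child process per kind: if the child is OOM-killed,
            # only that kind's resync is lost; the parent survives and
            # moves on to the next kind.
            process = multiprocessing.Process(
                target=process_kind, args=(kind, shared)
            )
            process.start()
            process.join()
        return dict(shared)
```

The memory benefit comes from process exit: when each child terminates, all memory it allocated for its kind is returned to the OS, which matches the RAM drops visible in the Docker graphs.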

Type of change

Please leave one option from the following and delete the rest:

  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [x] New feature (non-breaking change which adds functionality)
  • [ ] New Integration (non-breaking change which adds a new integration)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] Non-breaking change (fix of existing functionality that will not change current behavior)
  • [ ] Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • [ ] Integration able to create all default resources from scratch
  • [ ] Resync finishes successfully
  • [ ] Resync able to create entities
  • [ ] Resync able to update entities
  • [ ] Resync able to detect and delete entities
  • [ ] Scheduled resync able to abort existing resync and start a new one
  • [ ] Tested with at least 2 integrations from scratch
  • [ ] Tested with Kafka and Polling event listeners
  • [ ] Tested deletion of entities that don't pass the selector

Integration testing checklist

  • [ ] Integration able to create all default resources from scratch
  • [ ] Resync able to create entities
  • [ ] Resync able to update entities
  • [ ] Resync able to detect and delete entities
  • [ ] Resync finishes successfully
  • [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • [ ] Docs PR link here

Preflight checklist

  • [ ] Handled rate limiting
  • [ ] Handled pagination
  • [ ] Implemented the code in async
  • [ ] Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.


PR Type

Enhancement, Tests


Description

  • Introduced multiprocessing for resource processing in sync_raw_all

    • Added run_task and OceanTask for isolated resource execution
    • Modified resource loop to use multiprocessing.Process
  • Added new event listener type: TASK

    • Implemented TaskEventListener and settings
    • Integrated into event listener factory and context
  • Added memory leak test for JQ processing

    • Includes data generation, memory tracking, and result analysis
  • Updated fake integration to use new test server and endpoints

    • Adjusted client/server ports and endpoints for isolation

Changes walkthrough 📝

Relevant files
Enhancement
14 files
sync_raw.py
Add multiprocessing for resource processing in sync_raw_all
+40/-24 
run_task.py
Add run_task utility for multiprocessing resource execution
+72/-0   
ocean_task.py
Add OceanTask class for isolated integration execution     
+86/-0   
bootstrap.py
Add create_ocean_task for OceanTask instantiation               
+21/-0   
task.py
Implement TaskEventListener and settings                                 
+40/-0   
factory.py
Integrate TaskEventListener into event listener factory   
+8/-0     
__init__.py
Export TaskEventListener and settings in event_listener module
+7/-0     
ocean.py
Add TASK to event_listener_type property                                 
+1/-1     
event.py
Add TASK to EventType enumeration                                               
+1/-0     
__init__.py
Export OceanTask and run_task in package init                       
+10/-1   
fake_router.py
Refactor to run isolated FastAPI server for fake data       
+8/-4     
fake_client.py
Update API URL and defaults for new test server                   
+3/-3     
main.py
Comment out webhook and fake routes for isolation               
+8/-8     
ocean.py
Comment out webhook endpoints for Gitlab integration         
+32/-32 
Tests
2 files
test.py
Add memory leak test for JQ processing                                     
+336/-0 
memory_test_results.json
Add sample memory test results output                                       
+1/-0     
Miscellaneous
1 files
run.py
Add debug print for main_path in run function                       
+1/-1     
Additional files
2 files
.DS_Store [link]   
memory_test_results.json +1/-0     

— ivankalinovski, May 12 '25 10:05

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Process Management

    The multiprocessing implementation doesn't properly handle resource cleanup or error cases. Processes are created but may not be properly terminated in all scenarios, potentially leading to resource leaks.

    with multiprocessing.Manager() as manager:
        for index,resource in enumerate(app_config.resources):
            # create resource context per resource kind, so resync method could have access to the resource
            # config as we might have multiple resources in the same event
            logger.info(f"Starting resource {resource.kind} with index {index}")
    
            result = manager.dict()
            process = multiprocessing.Process(target=run_task, args=(resource,index,user_agent_type,result))
            process.start()
            process.join()
            creation_results.append(result.get("task"))
            result.clear()
            process.terminate()
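A defensive variant of this loop (illustrative only — the helper name and timeout are assumptions, not the PR's code) would join with a timeout and reap the child in a finally block, so the parent never leaks a process even when the child hangs or dies mid-write:

```python
import multiprocessing


def _worker(result) -> None:
    # Trivial stand-in for run_task.
    result["task"] = "ok"


def run_isolated(result_dict, timeout: float = 30.0):
    process = multiprocessing.Process(target=_worker, args=(result_dict,))
    process.start()
    try:
        process.join(timeout)
        if process.is_alive():
            # The child overran its deadline: terminate it so the parent
            # does not accumulate zombie processes across resources.
            process.terminate()
            process.join()
    finally:
        process.close()  # release the OS handles held by the Process object
    return result_dict.get("task")
```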
    
    Error Handling

    The task execution lacks proper error handling. If a process fails, there's no mechanism to report errors back to the parent process or retry the operation.

    
    async def task():
        await app.integration.start()
        async with event_context(
            EventType.TASK,
            trigger_type="machine",
        ):
            await app.integration.port_app_config_handler.get_port_app_config(
                use_cache=False
            )
            task = await app.integration.process_resource(
                resource, index, user_agent_type
            )
            return task
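One way to close that gap, sketched here as an assumption rather than the PR's actual code, is to wrap the coroutine and ship either the result or the formatted traceback back through the shared dict, so the parent can inspect `result["error"]` after `join()` and decide whether to retry:

```python
import asyncio
import traceback


def run_task_reporting_errors(resource_kind: str, result) -> None:
    # Hypothetical wrapper around the task() coroutine: instead of letting
    # an exception kill the child silently, record it for the parent.
    async def task() -> str:
        if resource_kind == "broken":
            raise RuntimeError("resync failed")
        return f"synced {resource_kind}"

    try:
        result["task"] = asyncio.run(task())
        result["error"] = None
    except Exception:
        result["task"] = None
        result["error"] = traceback.format_exc()
```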
    
    
    Commented Code

    Critical webhook handling code has been commented out, which would break the GitLab integration's ability to receive and process webhook events.

    # @ocean.router.post("/hook/{group_id}")
    # async def handle_webhook_request(group_id: str, request: Request) -> dict[str, Any]:
    #     event_id = f"{request.headers.get('X-Gitlab-Event')}:{group_id}"
    #     with logger.contextualize(event_id=event_id):
    #         try:
    #             logger.info(f"Received webhook event {event_id} from Gitlab")
    #             body = await request.json()
    #             await event_handler.notify(event_id, body)
    #             return {"ok": True}
    #         except Exception as e:
    #             logger.exception(
    #                 f"Failed to handle webhook event {event_id} from Gitlab, error: {e}"
    #             )
    #             return {"ok": False, "error": str(e)}
    
    
    # @ocean.router.post("/system/hook")
    # async def handle_system_webhook_request(request: Request) -> dict[str, Any]:
    #     try:
    #         body: dict[str, Any] = await request.json()
    #         # some system hooks have event_type instead of event_name in the body, such as merge_request events
    #         event_name: str = str(body.get("event_name") or body.get("event_type"))
    #         with logger.contextualize(event_name=event_name):
    #             logger.info(f"Received system webhook event {event_name} from Gitlab")
    #             await system_event_handler.notify(event_name, body)
    
    #         return {"ok": True}
    #     except Exception as e:
    #         logger.exception(
    #             "Failed to handle system webhook event from Gitlab, error: {e}"
    #         )
    #         return {"ok": False, "error": str(e)}
    

— qodo-merge-pro[bot], May 12 '25 10:05

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Category | Suggestion | Impact
    Possible issue
    Fix parallel processing implementation
    Suggestion Impact: The commit completely restructured the multiprocessing approach, but in a different way than suggested. Instead of implementing the suggested pattern of starting all processes first and then joining them, the commit made multiprocessing optional (controlled by a flag) and implemented a different pattern with process serialization/deserialization using pickle.

    code diff:

    @@ -628,25 +642,16 @@
                     did_fetched_current_state = False
     
                 creation_results: list[tuple[list[Entity], list[Exception]]] = []
    -
    -
    -            from port_ocean.run_task import run_task
    -            manager = multiprocessing.Manager()
    +            logger.info(f"Starting multiprocessing manager")
    +
    +            multiprocessing.set_start_method('fork', True)
                 try:
    -                with multiprocessing.Manager() as manager:
    -                    for index,resource in enumerate(app_config.resources):
    -                        # create resource context per resource kind, so resync method could have access to the resource
    -                        # config as we might have multiple resources in the same event
    -                        logger.info(f"Starting resource {resource.kind} with index {index}")
    -
    -                        result = manager.dict()
    -                        process = multiprocessing.Process(target=run_task, args=(resource,index,user_agent_type,result))
    -                        process.start()
    -                        process.join()
    -                        creation_results.append(result.get("task"))
    -                        result.clear()
    -                        process.terminate()
    -                        # creation_results.append(await self.process_resource(resource,index,user_agent_type))
    +                for index,resource in enumerate(app_config.resources):
    +                    # create resource context per resource kind, so resync method could have access to the resource
    +                    # config as we might have multiple resources in the same event
    +                    logger.info(f"Starting resource {resource.kind} with index {index}")
    +
    +                    creation_results.append(await self.process_resource(resource,index,user_agent_type))
    

    The multiprocessing implementation has a critical issue where processes are
    started and joined sequentially, negating the benefits of parallelism.
    Additionally, calling terminate() after join() is redundant and potentially
    dangerous as the process has already completed.

    port_ocean/core/integrations/mixins/sync_raw.py [636-648]

     with multiprocessing.Manager() as manager:
    -    for index,resource in enumerate(app_config.resources):
    +    processes = []
    +    results = []
    +    for index, resource in enumerate(app_config.resources):
             # create resource context per resource kind, so resync method could have access to the resource
             # config as we might have multiple resources in the same event
             logger.info(f"Starting resource {resource.kind} with index {index}")
     
             result = manager.dict()
    -        process = multiprocessing.Process(target=run_task, args=(resource,index,user_agent_type,result))
    +        results.append(result)
    +        process = multiprocessing.Process(target=run_task, args=(resource, index, user_agent_type, result))
    +        processes.append(process)
             process.start()
    +        
    +    for process in processes:
             process.join()
    +        
    +    for result in results:
             creation_results.append(result.get("task"))
    -        result.clear()
    -        process.terminate()
    

    [Suggestion processed]

    Suggestion importance[1-10]: 9


    Why: The current implementation starts and joins processes sequentially, which defeats the purpose of multiprocessing. The suggested change properly parallelizes the execution by starting all processes first, then joining them, which would significantly improve performance when processing multiple resources.

    High
    Restore correct generator processing
    Suggestion Impact: The commit implemented the suggestion by changing the generator processing from a while loop with pop() back to a for loop, exactly as suggested. This ensures generators are processed in the expected order rather than in reverse.

    code diff:

    -        while len(async_generators) > 0:
    -
    -            generator = async_generators.pop()
    +        for generator in async_generators:
    

    The implementation changes the iteration over generators from a for-loop to a
    while-loop with pop(), which processes generators in reverse order and could
    lead to unexpected behavior. This may disrupt the expected processing sequence.

    port_ocean/core/integrations/mixins/sync_raw.py [317-322]

     async def _register_in_batches(
         self,
         resource: ResourceConfig,
         user_agent_type: UserAgentType,
     ) -> tuple[list[Entity], list[Exception]]:
         ...
    -    while len(async_generators) > 0:
    -
    -        generator = async_generators.pop()
    +    for generator in async_generators:
             try:
                 async for items in generator:
                     number_of_raw_results += len(items)
    

    [To ensure code accuracy, apply this suggestion manually]

    Suggestion importance[1-10]: 8


    Why: The PR changed a for loop to a while loop with pop(), which processes generators in reverse order and could lead to unexpected behavior. This change could disrupt the expected processing sequence and potentially cause bugs.
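    The behavior difference is plain Python list semantics: `pop()` with no index removes from the tail, so a while/pop loop drains the list back to front.

```python
# Three placeholder generators, in the order sync_raw would receive them.
async_generators = ["gen_a", "gen_b", "gen_c"]

drained = []
while len(async_generators) > 0:
    drained.append(async_generators.pop())  # removes from the END

# A plain `for generator in async_generators:` loop would have yielded
# ["gen_a", "gen_b", "gen_c"]; the while/pop variant reverses it:
print(drained)  # → ['gen_c', 'gen_b', 'gen_a']
```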

    Medium
    Fix application settings initialization
    Suggestion Impact: The commit implemented the suggestion by storing the ApplicationSettings instance in a variable called 'application_settings', which allows the settings to be properly applied throughout the application.

    code diff:

    +    application_settings = ApplicationSettings(log_level=log_level, port=0)
    

    The code creates a new ApplicationSettings instance but doesn't store the
    reference, which means the settings won't be properly applied. This can lead to
    incorrect configuration in the multiprocessing environment.

    port_ocean/run_task.py [36]

     def run_task(
         resource: ResourceConfig,
         index: int,
         user_agent_type: UserAgentType,
         result: dict[Any, Any],
         path: str = ".",
         log_level: LogLevelType = "INFO",
     ) -> None:
    -    ApplicationSettings(log_level=log_level, port=8000)
    +    application_settings = ApplicationSettings(log_level=log_level, port=8000)
     
         init_signal_handler()
         # setup_logger(
         #     application_settings.log_level,
         #     enable_http_handler=application_settings.enable_http_logging,
         # )
         logger.info(f"Running task for resource {resource.kind} with index {index}")
    

    [To ensure code accuracy, apply this suggestion manually]

    Suggestion importance[1-10]: 6


    Why: The code creates an ApplicationSettings instance but doesn't store the reference, which means the settings might not be properly applied throughout the application. Storing the reference ensures the settings are properly maintained.
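    Whether this matters in practice depends on what ApplicationSettings does at construction time, but the underlying CPython behavior is easy to demonstrate with the standard library. `Settings` below is a generic stand-in, not Ocean's class: an instance created without a retained reference is collected immediately, taking its state with it.

```python
import gc
import weakref


class Settings:
    # Generic stand-in for a settings object; not Ocean's ApplicationSettings.
    def __init__(self, log_level: str) -> None:
        self.log_level = log_level


# No variable holds the instance, so CPython frees it as soon as the
# expression finishes; the weak reference is already dead.
dangling = weakref.ref(Settings(log_level="INFO"))
gc.collect()
unreferenced_alive = dangling() is not None   # False

# Binding the instance to a name keeps it (and its configuration) alive
# for as long as the name is in scope.
application_settings = Settings(log_level="INFO")
held = weakref.ref(application_settings)
referenced_alive = held() is not None         # True
```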

    Low

— qodo-merge-pro[bot], May 12 '25 10:05

    This pull request is automatically being deployed by Amplify Hosting (learn more).

    Access this pull request here: https://pr-1620.d1ftd8v2gowp8w.amplifyapp.com

    Code Coverage Artifact 📈: https://github.com/port-labs/ocean/actions/runs/15142150379/artifacts/3161522054

    Code Coverage Total Percentage: 80.36%

— github-actions[bot], May 20 '25 16:05