[client] Connector queuing auto backpressure - Scheduler (#6325)

Open Megafredo opened this issue 1 year ago • 1 comments

Proposed changes

  • Add a "read" connector query
  • Update the "ping" connector query
  • Set up a scheduler and process in the helper
  • New environment variable "queue_threshold" (default: 500 MB)
  • New environment variable "duration_period"

Related issues

  • https://github.com/OpenCTI-Platform/opencti/issues/6325

Checklist

  • [x] I consider the submitted work as finished
  • [x] I tested the code for its functionality
  • [ ] I wrote test cases for the relevant use cases
  • [ ] I added/updated the relevant documentation (either on github or on notion)
  • [ ] Where necessary I refactored code to improve the overall quality


Megafredo avatar Jul 10 '24 05:07 Megafredo

Tested locally and everything seems good to me. Here are the cases I tested:

  • For duration_period in ISO 8601 format:
    • "0" as a string: treated as a run_and_terminate process => works
    • "PT0S" as a string: treated as a run_and_terminate process => works
    • 0 as an int: treated as a run_and_terminate process => works
    • "P0S" (deliberate mistake): an error is raised because it is not a valid ISO 8601 duration (the "T" time designator is missing)
    • "PT3S" (a scheduled period of 3 seconds) => everything works as expected
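
The cases above can be sketched with a small standalone parser. This is only an illustration of the expected behaviour, not the helper's actual code: the function name `parse_duration_period` and the regex are assumptions, and only day/time components are handled.

```python
import re

# Hypothetical pattern: accepts P[nD][T[nH][nM][nS]]; not the helper's real parser.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration_period(value):
    """Return the period in seconds; 0 means run once and terminate."""
    text = str(value).strip()
    if text == "0":
        # Covers both the string "0" and the int 0.
        return 0
    match = _DURATION_RE.match(text)
    # Reject non-matching input, a bare "P"/"PT", and a trailing "T".
    if match is None or not any(match.groupdict().values()) or text.endswith("T"):
        raise ValueError(f"Invalid ISO 8601 duration: {value!r}")
    parts = {name: int(number or 0) for name, number in match.groupdict().items()}
    return (
        parts["days"] * 86400
        + parts["hours"] * 3600
        + parts["minutes"] * 60
        + parts["seconds"]
    )
```

With this sketch, "0", "PT0S", and 0 all map to a period of 0 seconds (run and terminate), "PT3S" maps to 3 seconds, and "P0S" raises a ValueError.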

Steps to reproduce, with my understanding of each:

  1. Stop the Worker:
    • Begin by stopping the worker process.
  2. Run the Connector:
    • Start the connector process.
  3. Scheduler Checks duration_period Configuration:
    • The scheduler reads the duration_period setting.
  4. Main Process Execution:
    • The main process of the connector starts.
    • This process must complete before the scheduler is configured and initialized.
  5. Scheduler Activation:
    • The scheduler activates once the specified period has elapsed.
  6. Queue Size Threshold Check:
    • The scheduler checks the queue size against the threshold defined by queue_threshold.
    • If no environment variable is set, the default threshold is 500 MB.
  7. Queue Size Condition:
    • If the connector's queue size exceeds the queue threshold, the message callback process (the connector's main process) is paused.
    • The process does not resume until the queue has been ingested and has shrunk below the threshold; it can then restart at the next scheduler check.
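
The steps above can be sketched as a simplified loop. This is my reading of the described behaviour, not the helper's implementation: `should_run`, `run_scheduler`, the `ticks` limit, and the 1024 * 1024 bytes-per-MB conversion are all assumptions made for illustration.

```python
import time

DEFAULT_QUEUE_THRESHOLD_MB = 500  # default stated in this PR


def should_run(queue_size_bytes, threshold_mb=DEFAULT_QUEUE_THRESHOLD_MB):
    """Return True when the queue size does not exceed the threshold."""
    # Assumption: 1 MB = 1024 * 1024 bytes.
    return queue_size_bytes / (1024 * 1024) <= threshold_mb


def run_scheduler(message_callback, get_queue_size_bytes, period_seconds,
                  ticks, threshold_mb=DEFAULT_QUEUE_THRESHOLD_MB,
                  sleep=time.sleep):
    """Run the callback once, then once per period while the queue allows it.

    Returns the number of times the callback was executed.
    """
    message_callback()  # step 4: the main process runs first
    executed = 1
    if period_seconds == 0:
        return executed  # run_and_terminate case
    for _ in range(ticks):  # hypothetical bound so the sketch terminates
        sleep(period_seconds)  # step 5: wait for the period to elapse
        # Steps 6 and 7: skip this run while the queue is above the threshold.
        if should_run(get_queue_size_bytes(), threshold_mb):
            message_callback()
            executed += 1
    return executed
```

For example, with queue sizes of 600 MB, 600 MB, and 100 MB over three checks, the callback runs once at startup, is skipped twice, and runs again on the third check.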

During the test, the connector was modified to run the scheduler as follows:

    def process_message(self):
        """Main process of the connector."""
        [...]

    def run(self):
        self.helper.schedule_iso(
            message_callback=self.process_message,
            duration_period=self.config.duration_period,
        )

helene-nguyen avatar Jul 15 '24 07:07 helene-nguyen