client-python
[client] Connector queuing auto backpressure - Scheduler (#6325)
Proposed changes
- Adding query "read" connector
- Update query "ping" connector
- Setting up a scheduler and process in helper
- New environment variable "queue_threshold", default = 500 MB
- New environment variable "duration_period"
Related issues
- https://github.com/OpenCTI-Platform/opencti/issues/6325
Checklist
- [x] I consider the submitted work as finished
- [x] I tested the code for its functionality
- [ ] I wrote test cases for the relevant use cases
- [ ] I added/updated the relevant documentation (either on GitHub or on Notion)
- [ ] Where necessary I refactored code to improve the overall quality
Further comments
Tested locally and everything seems good to me. Here are the different cases I tested:
- For `duration_period` time in ISO 8601 format:
  - Case "0" as a string, considered as a `run_and_terminate` process => works
  - Case "PT0S" as a string, considered as a `run_and_terminate` process => works
  - Case 0 as an int, considered as a `run_and_terminate` process => works
  - Case "P0S" (deliberate mistake) => an error is raised because the format is invalid
  - Case "PT3S" (for a scheduled period of 3 seconds) => everything works as expected
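The cases above can be illustrated with a small standalone sketch. This is not the helper's actual implementation: `parse_duration_period` is a hypothetical function, and the regular expression only covers a simplified subset of ISO 8601 durations (days, hours, minutes, whole seconds). It treats `0`, `"0"` and `"PT0S"` as "run once and terminate" and rejects malformed values such as `"P0S"`.

```python
import re
from datetime import timedelta

# Simplified ISO 8601 duration pattern (assumption: days and time parts only).
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration_period(value):
    """Return the period in seconds; 0 means run once and terminate.

    Hypothetical helper written for illustration only.
    """
    # Both the integer 0 and the string "0" mean "run and terminate".
    if value == 0 or value == "0":
        return 0
    if not isinstance(value, str):
        raise ValueError(f"Unsupported duration_period: {value!r}")
    match = _DURATION_RE.match(value)
    # Reject strings that do not match, and bare "P" / "PT" with no component.
    if match is None or not any(match.groupdict().values()):
        raise ValueError(f"Invalid ISO 8601 duration: {value!r}")
    parts = {k: int(v) for k, v in match.groupdict().items() if v is not None}
    return int(timedelta(**parts).total_seconds())
```

With this sketch, `parse_duration_period("PT3S")` yields a 3-second period, while `parse_duration_period("P0S")` raises a `ValueError` because the seconds component is missing its `T` designator.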
Steps to reproduce, and my understanding:
- Stop the Worker:
  - Begin by stopping the worker process.
- Run the Connector:
  - Start the connector process.
- Scheduler Checks `duration_period` Configuration:
  - The scheduler will review the `duration_period` setting.
- Main Process Execution:
  - The main process of the connector will start.
  - We must wait for this process to complete before entering the scheduler configuration and initialization.
- Scheduler Activation:
  - The scheduler activates once the specified period is reached.
- Queue Size Threshold Check:
  - The scheduler checks the queue size threshold, defined as `queue_threshold`.
  - If no environment variable is set, the default threshold is 500MB.
- Queue Size Condition:
  - If the connector's queue size exceeds the queue threshold, the message callback process (connector's main process) will pause.
  - The process will not resume until the queue is ingested and reduced sufficiently, allowing it to restart during the next scheduler check.
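The threshold check in the steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from this PR: `read_queue_threshold_mb` and `scheduler_tick` are hypothetical names, the environment variable name `QUEUE_THRESHOLD` is a placeholder, and 1 MB is assumed to be 1024 * 1024 bytes.

```python
import os

DEFAULT_QUEUE_THRESHOLD_MB = 500  # default stated in this PR


def read_queue_threshold_mb(environ=None, var_name="QUEUE_THRESHOLD"):
    """Read the threshold in MB, falling back to the default.

    var_name is a placeholder; the real variable name may differ.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get(var_name)
    return float(raw) if raw else float(DEFAULT_QUEUE_THRESHOLD_MB)


def scheduler_tick(queue_size_bytes, threshold_mb, message_callback):
    """Run message_callback unless the queue is above the threshold.

    Returns True if the callback ran, False if this tick was skipped.
    """
    queue_size_mb = queue_size_bytes / (1024 * 1024)  # assumption: binary MB
    if queue_size_mb > threshold_mb:
        # Backpressure: skip this run and re-check on the next tick.
        return False
    message_callback()
    return True
```

Each scheduler tick re-evaluates the queue size, so a paused connector resumes on its own once the worker has drained the queue below the threshold.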
During the test, the connector was modified to run the scheduler as follows:

```python
def process_message(self):
    """Main process of the connector."""
    [...]

def run(self):
    # Hand the main process to the scheduler, which runs it every duration_period.
    self.helper.schedule_iso(
        message_callback=self.process_message,
        duration_period=self.config.duration_period,
    )
```