[Feature] Add batch event consumption
Related to: https://github.com/OpenCTI-Platform/opencti/issues/13372
Moved here @Megafredo
Hello @Renizmy, thank you for the switch! There is just one issue with the linter. It seems to be a matter of indentation in the linter configuration.
Fixed, sorry
Codecov Report
:white_check_mark: All modified and coverable lines are covered by tests.
:white_check_mark: Project coverage is 30.84%. Comparing base (bc3f6d9) to head (84b0e1c).
:warning: Report is 785 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #13261 +/- ##
===========================================
+ Coverage 16.26% 30.84% +14.57%
===========================================
Files 2846 2913 +67
Lines 412135 192309 -219826
Branches 11512 39176 +27664
===========================================
- Hits 67035 59317 -7718
+ Misses 345100 132992 -212108
| Flag | Coverage Δ | |
|---|---|---|
| opencti | 30.84% <ø> (+14.57%) | :arrow_up: |
| opencti-front | 2.45% <ø> (-1.44%) | :arrow_down: |
| opencti-graphql | 68.20% <ø> (+0.96%) | :arrow_up: |
@Renizmy FYI, we'd like to improve and refactor the code a bit before merging! :)
Hi @Renizmy,
Thank you for your contribution. As @helene-nguyen mentioned, we'd like the code to be refactored before merging. The main concern is that the new class (ListenStreamBatch) and method (listen_stream_batch) duplicate existing code.
Instead of creating a new class and method, we suggest implementing a message_callback wrapper that can adapt the existing listen_stream function from a single callback per message to a batched callback. You should be able to use the code you've already introduced to create this adapter.
Each batch-capable connector (depending on the target API) could then use this adapter to receive batches of messages instead of individual messages.
Usage (assuming the wrapper is named create_batch_callback and the connector's process_message becomes process_message_batch) would look something like this:
self.helper.listen_stream(message_callback=self.process_message)
--->
batch_callback = self.helper.create_batch_callback(self.process_message_batch, self.batch_size, self.batch_timeout, self.max_batches_per_minute)
self.helper.listen_stream(message_callback=batch_callback)
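To make the idea concrete, here is a rough, standalone sketch of what such an adapter might look like. It is not the pycti implementation: the function is written outside the helper class, the `clock` and `sleep` parameters and the `flush` attribute are hypothetical additions (they only make the sketch testable and allow draining on shutdown), and the timeout is checked only when a new message arrives, so a real version would probably need a timer or background thread to flush an idle partial batch.

```python
import time
from typing import Any, Callable, List, Optional


def create_batch_callback(
    batch_callback: Callable[[List[Any]], None],
    batch_size: int,
    batch_timeout: float,
    max_batches_per_minute: Optional[int] = None,
    clock: Callable[[], float] = time.monotonic,  # hypothetical, injectable for tests
    sleep: Callable[[float], None] = time.sleep,  # hypothetical, injectable for tests
) -> Callable[[Any], None]:
    """Adapt a per-batch callback into a per-message callback (illustrative only)."""
    buffer: List[Any] = []
    first_message_at: Optional[float] = None
    last_flush_at: Optional[float] = None
    # Minimum spacing between two batches, derived from the per-minute limit.
    min_interval = 60.0 / max_batches_per_minute if max_batches_per_minute else 0.0

    def flush() -> None:
        nonlocal first_message_at, last_flush_at
        if not buffer:
            return
        # Crude rate limit: block until min_interval has elapsed since the last batch.
        if last_flush_at is not None:
            wait = min_interval - (clock() - last_flush_at)
            if wait > 0:
                sleep(wait)
        batch = list(buffer)
        buffer.clear()
        first_message_at = None
        last_flush_at = clock()
        batch_callback(batch)

    def on_message(msg: Any) -> None:
        nonlocal first_message_at
        if first_message_at is None:
            first_message_at = clock()
        buffer.append(msg)
        size_reached = len(buffer) >= batch_size
        # Assumption: the timeout is only evaluated when a message arrives.
        timed_out = clock() - first_message_at >= batch_timeout
        if size_reached or timed_out:
            flush()

    # Expose flush so a connector can drain the remaining messages on shutdown.
    on_message.flush = flush  # type: ignore[attr-defined]
    return on_message
```

With something along these lines, a connector would keep calling the existing listen_stream and only swap the callback it passes in, so no second listener class is needed.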
Would you be open to making this change?