Make per-connector queue size and concurrency settings user-configurable or adaptive
Problem Description
"Queue is full" messages in logs are indication of backpressure experienced by connector, i.e., we are fetching documents from the 3rd party service/server faster than we are able to ingest into Elasticsearch.
Currently, various connectors (e.g., box, outlook, confluence, jira, servicenow, teams) have hardcoded QUEUE_MEM_SIZE and CONCURRENT_TASKS or MAX_CONCURRENCY settings. Tuning these settings can make a connector fetch documents more slowly to alleviate backpressure.
Proposed Solution
We should consider making these settings user-configurable or adaptive so that users do not have to tune them manually.
As a shorter-term effort, it would also help to make the "Queue is full" error actionable (e.g., provide a more detailed explanation of the cause and the remediation actions users can take).
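For illustration only, a user-configurable variant might look something like the sketch below. The key names ("queue_mem_size", "max_concurrency"), the helper, and the default values are all hypothetical, not the connectors service's actual configuration schema:

```python
# Purely illustrative: default to the existing hardcoded values unless the
# user supplies overrides. Key names and defaults here are hypothetical.
DEFAULT_QUEUE_MEM_SIZE = 5 * 1024 * 1024  # bytes
DEFAULT_MAX_CONCURRENCY = 5               # concurrent fetch tasks


def resolve_tuning(connector_config: dict) -> tuple[int, int]:
    """Pick per-connector tuning values, preferring user-supplied overrides."""
    return (
        connector_config.get("queue_mem_size", DEFAULT_QUEUE_MEM_SIZE),
        connector_config.get("max_concurrency", DEFAULT_MAX_CONCURRENCY),
    )
```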
cc: @artem-shelkovnikov , @jedrazb , @seanstory
During my previous SDH week, I spent a few hours coming up with this PR that makes the maximum size of a connector's in-memory queue configurable. After thinking about it over the following weekend, I realized it's not a good solution, and I've since abandoned the PR.
I have an alternative solution I'd like to propose.
Adaptive per-connector rate-limiting based on average time-in-queue
There are two implementation parts to this:
- Extend our in-memory queue to track average time-in-queue for its data elements
- Enhance data sources to leverage the average time-in-queue value to inform rate-limiting behavior
Extend the in-memory queue
We will modify the queue object (which is an extension of the asyncio.Queue object) to timestamp all incoming data on put(). This can be done with a generic wrapper class that encapsulates data and maintains a timestamp value that gets set in the put() method.
During get() calls, that timestamp value is read and added to a running average (which we can calculate intermittently based on some condition), and the data is simply returned.
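A minimal sketch of that idea, assuming a plain subclass of asyncio.Queue rather than the framework's actual MemQueue internals; names like `TimedQueue`, `avg_time_in_queue`, and `samples` are made up for illustration, and the exponential moving average is just one possible way to maintain the running value:

```python
import asyncio
import time


class TimedQueue(asyncio.Queue):
    """asyncio.Queue that tracks a running average of how long items wait in it."""

    def __init__(self, *args, smoothing=0.2, **kwargs):
        super().__init__(*args, **kwargs)
        self._smoothing = smoothing
        self.avg_time_in_queue = 0.0  # seconds
        self.samples = 0              # how many items have been dequeued so far

    async def put(self, item):
        # Wrap every payload with the timestamp at which it was enqueued.
        await super().put((time.monotonic(), item))

    async def get(self):
        enqueued_at, item = await super().get()
        self._update_average(time.monotonic() - enqueued_at)
        return item

    def _update_average(self, waited):
        # Exponential moving average: cheap to maintain and reasonably reactive.
        self.samples += 1
        if self.samples == 1:
            self.avg_time_in_queue = waited
        else:
            self.avg_time_in_queue = (
                self._smoothing * waited
                + (1 - self._smoothing) * self.avg_time_in_queue
            )
```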
Rate limiting
Given that the data sources themselves (jira.py, confluence.py etc) are responsible for handling their API calls, they should thus be responsible for implementing their rate-limiting. This could be something as simple as blocking API calls for N amount of time, where
N = queue.avg_time_in_queue * 1.1  # API calls would come in slightly slower than average time-in-queue
or some other calculated value. The above is simply an example, not a final determination of how rate limiting should be informed. I suspect some experiments will have to be done during development to ensure the queue is kept "warm" at all times during a sync, and that the rate limiting is reactive enough in all situations.
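As a rough illustration of how a data source could consume that signal, here is one possible shape. This is purely a sketch: it assumes `self.queue` is the hypothetical `TimedQueue` from the previous snippet, and the 1.1 factor and warm-up threshold are just example values.

```python
import asyncio


class ThrottledAPIMixin:
    """Hypothetical mixin a data source could use to pace its API calls."""

    RATE_FACTOR = 1.1     # fetch slightly slower than the average time-in-queue
    WARMUP_SAMPLES = 50   # skip throttling until the running average stabilizes

    async def throttled(self, api_call, *args, **kwargs):
        avg = getattr(self.queue, "avg_time_in_queue", 0.0)
        warmed_up = getattr(self.queue, "samples", 0) >= self.WARMUP_SAMPLES
        if warmed_up and avg > 0:
            # Hold the next API call back so we fetch no faster than we drain.
            await asyncio.sleep(avg * self.RATE_FACTOR)
        return await api_call(*args, **kwargs)


# Usage inside a data source might look something like:
#     page = await self.throttled(self._client.fetch_page, url)
```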
Benefits
The biggest benefits of this design are:
- It decouples the notion of rate limiting from in-use memory size. Accurately transposing a notion of size (bytes or number of elements in the queue) into how long we have to hold up API calls (milliseconds) is a harder task than simply correlating how long an object sits in the queue with how long API calls may need to wait.
- It leaves rate-limiting details up to the discretion of the data source. In other words, we don't have to enforce any rate-limiting rules if we don't want to, but we are providing a very useful data point that we or other contributors can use to implement rate limiting.
Design considerations and concerns
- Cold starts. Rate limiting should not be in effect until the queue "warms up" and the running average stabilizes.
- Reactivity. A data source may need to check the queue's average time-in-queue frequently in order to react to it quickly. How and when this happens may look different for each data source, so we may not have a one-size-fits-all solution here.
Do we even need these connector-specific queues? This feels like something that Crest added because they were copying patterns from elsewhere. But we shouldn't need these, right?
> But we shouldn't need these, right?
I'm positive we could remove queue usage at the data-source level and implement rate limiting another way, such as maintaining a TTL for indexing jobs and making that value accessible to a data source to inform how much we rate limit.
The big, big drawback of my proposed solution above is that it would only be useful to data sources that use queues in their implementations (jira, confluence, MSFT teams, etc.).
I want to also talk about the queues and rate limiting here.
There is already framework-level throttling done with MemQueue that happens during the ingestion of documents to Elasticsearch:
[ Connector ] -> [ Memory Queue ] -> [ Elasticsearch ]
It serves several purposes:
- Backpressure when ingesting data to Elasticsearch to not blow Elasticsearch up
- Resource limitation - specifically, soft-limiting the amount of RAM a connector can use
In essence it works the following way:
The Extractor inside sink.py calls get_docs to get documents and put them into the queue. When the queue is full, the Extractor stops yielding documents from the connector; in practice this means the connector stops execution and does not load/send any data. In the meantime, the Sink tries to empty the memory queue and send the data to Elasticsearch. For the Extractor to resume putting documents into the queue, the queue needs to not be "full" - so ingesting even one document into Elasticsearch will mark the queue as "not full" and reset the timeout.
All of the above happens without connector knowledge.
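A stripped-down model of that flow, just to show the shape of the backpressure. This is not the actual Extractor/Sink code; `data_source`, `es_client`, and the maxsize are placeholders, and a plain bounded asyncio.Queue stands in for MemQueue:

```python
import asyncio

queue = asyncio.Queue(maxsize=1024)  # stand-in for the framework's MemQueue


async def extractor(data_source):
    # Pulls documents from the connector's get_docs() generator and enqueues them.
    # When the queue is full this `await queue.put(...)` blocks, and since
    # get_docs() is only advanced from here, the connector stops fetching from
    # the 3rd-party service as well.
    async for doc in data_source.get_docs():
        await queue.put(doc)
    await queue.put(None)  # end-of-sync marker


async def sink(es_client):
    # Drains the queue and ships documents to Elasticsearch. Every get() frees
    # a slot in the queue, which unblocks the extractor -- that is the
    # backpressure loop described above.
    while (doc := await queue.get()) is not None:
        await es_client.ingest(doc)  # stand-in for the real bulk indexing call
```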
What happens inside the connectors is not done to throttle the connectors. It's a poor man's concurrent execution (a rough sketch of the pattern follows the list below). This design was somewhat experimental, and in reality it brings more problems now because:
- Sources tend to throttle
- Elasticsearch throttles
- Safeguards in the framework do not truly limit RAM consumption any more
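For context, the in-connector pattern being described looks roughly like this. It is a generic sketch, not the exact code of any one data source; `client.fetch`, the concurrency cap, and the internal queue are illustrative placeholders:

```python
import asyncio


async def get_docs(client, doc_ids, max_concurrency=5):
    # Connector-internal queue and concurrency cap, separate from the
    # framework's MemQueue -- illustrative values only.
    internal_queue = asyncio.Queue()
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch_one(doc_id):
        # Each 3rd-party API call runs as its own task, capped by the semaphore.
        async with semaphore:
            await internal_queue.put(await client.fetch(doc_id))

    # Fan out the fetches, then drain the internal queue as results arrive
    # and yield them to the framework-level Extractor.
    tasks = [asyncio.create_task(fetch_one(doc_id)) for doc_id in doc_ids]
    for _ in doc_ids:
        yield await internal_queue.get()
    await asyncio.gather(*tasks)
```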
Based on the description of how MemQueue works above, you can see that the only thing that needs to happen to avoid a timeout is Elasticsearch being available to ingest at least some data from the queue.
Re: current connectors and their concurrency.
I strongly believe that this concurrency is inefficient and arbitrary - backpressure and throttling make it only semi-useful, while also breaking some core features of the framework, such as RAM limiting.
I think the ideal way forward would be:
- Make a PoC PR that removes concurrency from the connectors
- Benchmark the change on a reasonably large sync (e.g. one taking 30 minutes)
- Compare results, discuss with the team
My bet would be that fully dropping concurrency from most connectors will not impact their performance negatively. Counter-intuitively, it could instead potentially improve their performance.
Thank you for the detailed breakdown, Artem - this makes a lot of intuitive sense: performance should either not be impacted or even improve if individual connectors drop their concurrency model. I can take over doing a PoC during free cycles to see if this is a route we can take!
Happy to chat and discuss the approach to this problem in a call too, if you feel it can be beneficial :)