quickwit
Stateless HTTP Indexing sources
Is your feature request related to a problem? Please describe.
We want to write data through the HTTP endpoint. We have a very large amount of data and need to scale out the indexing nodes.
The current Ingesting API is pretty good, but it has a few issues. It is a stateful endpoint that requires queues to be maintained on local disk. When the HTTP request returns a successful response, the data has not actually been written to the index, only to the queue. If the node or disk fails, the data is lost, and the client never finds out.
Describe the solution you'd like
Provide a stateless HTTP endpoint that has no local storage. Once the handler receives an indexing request, it calls the indexing flow, waits for the result, and then sends the result to the HTTP client.
The HTTP indexing source doesn't need to maintain a checkpoint; it just indexes all the data in the request and then returns the result.
Client -> Quickwit HTTP endpoint -> Handler -> indexing actor -> upload split -> send response to HTTP client
Also provide a write buffer to improve write throughput:
Client -> Quickwit HTTP endpoint -> Handler -> write buffer (flush threshold based on time and size) -> indexing actor -> upload split -> send response to HTTP client
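A minimal sketch of the write buffer idea, assuming a flush is triggered when either a byte-size threshold or an age threshold is reached. `WriteBuffer`, `max_bytes`, `max_age_secs` and the `flush` callback are hypothetical names for illustration, not Quickwit APIs. The sketch only shows the threshold logic; holding the HTTP response until the split is uploaded is left out.

```python
import time
from typing import Callable, List, Optional


class WriteBuffer:
    """Hypothetical buffer that flushes on a size or age threshold."""

    def __init__(self, flush: Callable[[List[bytes]], None],
                 max_bytes: int, max_age_secs: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush              # stands in for "indexing actor -> upload split"
        self._max_bytes = max_bytes
        self._max_age_secs = max_age_secs
        self._clock = clock              # injectable so the age check can be tested
        self._docs: List[bytes] = []
        self._size = 0
        self._first_at: Optional[float] = None

    def add(self, doc: bytes) -> bool:
        """Buffer one document; return True if this call triggered a flush."""
        if self._first_at is None:
            self._first_at = self._clock()
        self._docs.append(doc)
        self._size += len(doc)
        return self.maybe_flush()

    def maybe_flush(self) -> bool:
        """Flush if the buffer is large enough or old enough."""
        if not self._docs:
            return False
        too_big = self._size >= self._max_bytes
        too_old = self._clock() - self._first_at >= self._max_age_secs
        if too_big or too_old:
            docs = self._docs
            self._docs, self._size, self._first_at = [], 0, None
            self._flush(docs)
            return True
        return False
```

In a real service `maybe_flush` would also need to be called from a timer, otherwise a quiet buffer would never hit the age threshold.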
Describe alternatives you've considered
Another option would be to replicate the queues in the current ingesting API, but this would be more complicated.
Thanks for the bug report, I created a similar issue here: https://github.com/quickwit-oss/quickwit/issues/1442
The approach to keep the connection alive may cause issues on some infrastructure, which is configured with a timeout. I think we could still provide it as a wrapper around the async API, with the caveat that it may time out in some environments.
which is configured with a timeout
Agreed on the configured timeout. If we configure the client side with a larger timeout and keep the connection alive, this endpoint can be used quite simply.
The approach to keep the connection alive may cause issues on some infrastructure
Would you mind sharing the concerns?
@PSeitz just curious, may I ask why we chose to store the data in RocksDB instead of creating an index and uploading it directly to S3? I have read several related issues and can't find the rationale for storing the logs on local disk.
The data is temporarily persisted on local disk until the index is successfully uploaded to S3. When indexing errors happen, we can resume from the local data. Most databases use this technique to ensure the durability of data.
@guilload Thanks for your reply. I have worked on storage engines and distributed storage systems, and I totally agree with the WAL technique for databases.
When indexing errors happen, we can resume from the local data.
How often does this error occur? My understanding is that, if it is not often, it is fine to resend the HTTP request when an error occurs. And the current implementation can't provide durability of data because of the lack of replicas. It would seem a simpler design if the indexer did not store this data and let the client wait for the results.
How often does this error occur?
Yes, rarely.
And the current implementation can't provide durability of data because of the lack of replicas.
Yes. Our plan is to replicate the WAL eventually.
It would seem a simpler design if the indexer did not store this data and let the client wait for the results.
Not that simple unfortunately. You'd want to commit rather often to avoid having too much data at risk at any given time. However, committing often hurts the indexing throughput. In addition, in multitenant settings, we can't have too many indexing threads running at the same time.
I believe appending the data into a WAL is a solution with reasonable trade-offs. However, Quickwit needs to do a few things better for this solution to work smoothly:
- validate the data before WAL insert (verify that the index exists, records are valid JSON and match the index schema)
- replicate the WAL
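The validation step in the first bullet could look roughly like the sketch below. It is only an illustration: `validate_batch` and the `schemas` mapping (index id to field name to expected Python type) are hypothetical stand-ins, and a real Quickwit doc mapping is much richer than a flat type check.

```python
import json
from typing import Dict, List, Tuple


def validate_batch(index_id: str, lines: List[str],
                   schemas: Dict[str, Dict[str, type]]
                   ) -> Tuple[List[dict], List[Tuple[int, str]]]:
    """Check a batch before it is appended to the WAL.

    Returns (valid_docs, errors), where each error is (line_number, message).
    Raises KeyError if the index does not exist.
    """
    if index_id not in schemas:
        raise KeyError(f"unknown index: {index_id}")
    schema = schemas[index_id]
    valid: List[dict] = []
    errors: List[Tuple[int, str]] = []
    for line_no, line in enumerate(lines):
        try:
            doc = json.loads(line)
        except json.JSONDecodeError as exc:
            errors.append((line_no, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(doc, dict):
            errors.append((line_no, "document is not a JSON object"))
            continue
        # Simplified schema check: every declared field must be present
        # with the expected type.
        bad = sorted(f for f, t in schema.items() if not isinstance(doc.get(f), t))
        if bad:
            errors.append((line_no, f"fields missing or mistyped: {bad}"))
            continue
        valid.append(doc)
    return valid, errors
```

Only the documents in `valid_docs` would then be appended to the WAL; the errors can be reported back to the client in the HTTP response.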
committing often hurts the indexing throughput.
Agreed, frequent requests with small payloads can hurt throughput. But throughput can also be increased by adjusting the size of the request data, and a stateless node is very easy to scale out.
I believe appending the data into a WAL is a solution with reasonable trade-offs.
I believe so. A distributed queue makes it easy to use.
If you're talking about storing a log and then replicating the local state you're rapidly heading down the path of duplicating what Kafka already does. That replication is going to bring a ton of complexity and this doesn't strike me as something that Quickwit should really be doing.
I get that the current API seems simple as you can just dump data into it, but in reality it's not usable in the real world because writes are completely unacknowledged and there's no way to externally control flow into the API since it's completely unchecked and non-blocking. Put that in a high volume system and the indexing node will drown in data in no time and, as already mentioned, the loss of a node could lose huge amounts of data with no way to know what was and wasn't processed.
As far as an Async API goes, most of the issues above still apply plus you're going to have to introduce some kind of task concept and associated cluster wide state since status polling on the async task can hit any node in the cluster. I think Quickwit is going to require this in the future for other reasons but I don't think these REST API endpoints should be the catalyst.
As @sunisdown originally suggested, these REST APIs probably shouldn't be trying to queue the data; leave that to external systems. Just accept the request, block until the split is committed and then return an acknowledgement for the writes.
Yes that may end up with too many small splits but the concept for split merging already exists so I would think the answer to that lies in enhancements to that process. Also the API should allow larger single requests so that the end user can decide what amount of data they want to send to influence the split sizes.
Then the question just becomes whether the API treats the entire request atomically or if it accepts individual document failures and reports errors independently. I'd think it would actually need an option for both.
Elasticsearch provides individual document failures and for truly high volume processes it's what you want as having an entire chunk of data rejected for one bad record is really problematic. That said though, Quickwit does have the advantage of being able to atomically write the entire split so it should be fairly easy to offer an option to do that too. That shouldn't be a requirement but in some scenarios it definitely could be useful.
The other issue is timeouts on blocking requests which will definitely happen but this is a controllable problem and something data teams are used to dealing with. You want your indexing to scale horizontally and control throughput based on the size of requests along with the concurrency of requests and not have any arbitrary bottlenecks in that process.
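To illustrate controlling throughput through request size, here is a small client-side sketch that groups documents into requests of roughly a target byte size. `batch_by_size` and `target_bytes` are hypothetical names; the right target depends on the deployment and is not specified in this thread.

```python
from typing import Iterable, Iterator, List


def batch_by_size(docs: Iterable[bytes], target_bytes: int) -> Iterator[List[bytes]]:
    """Group documents into batches of at most target_bytes each.

    A single document larger than target_bytes is sent in a batch of its own.
    """
    batch: List[bytes] = []
    size = 0
    for doc in docs:
        if batch and size + len(doc) > target_bytes:
            yield batch
            batch, size = [], 0
        batch.append(doc)
        size += len(doc)
    if batch:
        yield batch
```

Each yielded batch would become one blocking ingest request, and the number of requests in flight at once gives the second knob (concurrency).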
Yes that may end up with too many small splits but the concept for split merging already exists so I would think the answer to that lies in enhancements to that process.
Can't agree more with this. The merging flow should merge the small splits in the background; the REST API should only handle the ingest request and upload splits to S3 (or some other object storage system).
If you're talking about storing a log and then replicating the local state you're rapidly heading down the path of duplicating what Kafka already does.
The current Ingest API is not an ingest API but a queue: the ingest source consumes messages from the Ingest API queue. The queue and the Ingest API should be two different things. Quickwit can provide a queue and let users write data to it via HTTP, but users should be told that it's a Queue API; the Ingest REST API should be lighter. After calling the Ingest REST API, the user should know whether the request failed or succeeded and that the data can be found in Quickwit, atomically, as @kstaken mentioned.
The PushAPI has been introduced for users who want an alternative to cheaply index a small amount of application logs. For large use cases, Quickwit is meant to grab its data straight from Kafka/Kinesis/Pulsar etc.
I don't think a rest PushAPI that does not queue documents, produces a split right away and blocks until this is done is something someone with a lot of data would use. To maximize throughput this stateless endpoint would want to receive 1GB per pipeline every 30s. A REST push API does not seem suited to that. Producing smaller splits over smaller batches is possible but impacts write amplification and CPU usage.
We could have some kind of S3 source, where you upload ~1GB batches of logs to S3, and Quickwit pulls the data from there, indexes it, and optionally removes the batch files.
I don't think a rest PushAPI that does not queue documents, produces a split right away and blocks until this is done is something someone with a lot of data would use.
A very large number of services use HTTP APIs to write a lot of data. Elasticsearch, ClickHouse, and Snowflake all do, and HTTP can be used as a way to upload large amounts of data. It's just that the current Quickwit Push API is not suitable for uploading large amounts of data.
Producing smaller splits over smaller batches is possible but impacts write amplification and CPU usage.
I believe users can find the right data size for each request. Only the data source differs: one is Kafka and the other is HTTP, and the batch sizes can be similar. To handle small batches better, we can even sacrifice latency by having the service maintain a write buffer and send it to the indexer in bulk so it is written to the same split. This is kind of like a queue, except that a successful response is returned only once the split has been written to S3.
I'm sure you are familiar with all this, but there is no harm to be explicit/exhaustive.
Elastic PushAPI bulk api is not stateless per se. It has an internal queue. The difference with Quickwit is that it is properly replicated. You can optionally force a commit to get something that, from the outside, behaves like your stateless HTTP indexing source, but
- it still writes sequentially to the transaction logs of the different replicas.
- it is not the default
- it will strongly impact your indexing throughput if your batches are small. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html
We could add support for refresh=true or refresh=wait_for in Quickwit, and increase the payload limit to 100MB.
I am not entirely sure this would be the best solution for your use case however.
Let's put names on the different possibilities.
- Non-Replicated PushAPI (like Quickwit today): Bad Durability. If the disk is dead we can lose data.
- Stateless PushAPI: Each push results in producing a split. This requires either large payloads OR an increased write amplification.
- Non-replicated PushAPI + refresh=true. More or less the same trade-off as stateless PushAPI, but internally uses the plumbing of Non-replicated PushAPI, triggers a commit and waits for it.
- Replicated PushAPI + refresh=true: That's interesting but it requires work, and this is considered low priority at the moment. One hidden benefit vs tapping into Kafka directly would be for efficiently handling many indexes.
- Just use Kafka: By far the most efficient & production friendly solution.
Thanks for your reply @fulmicoton. Let's ignore my own use case and focus on the endpoint. My use case is not mentioned in this issue. I didn't open this issue for my own use case (although it might help with it), but because I think Quickwit needs an endpoint like this for writing data atomically. That's why I'm opening an issue to discuss it with you :)
Elastic PushAPI bulk api is not stateless per se. It has an internal queue. The difference with Quickwit is that it is properly replicated.
What attracted me to Quickwit was the stateless architecture, which allows for easy scale-up and scale-down. I know that Elastic is stateful, which also makes writing to it simple and straightforward, more like a storage system. Quickwit's write operations are more complex.
We know that a Stateless PushAPI is not a silver bullet. It has advantages, such as ease of use.
It also has disadvantages, such as performance issues when users are not familiar with Quickwit. But we shouldn't let the potential problems stop us from offering similar options.
Does refresh=true mean publishing the split to S3?
In fact, in my opinion Non-replicated PushAPI + refresh=true is the same as Stateless PushAPI. The implementation behind it is different, but it is the same for the user: both are atomic commits.
If Quickwit has refresh=true, then we don't even need replicas, since replicas exist to prevent data from being lost. When the user sends a request with refresh=true and the request fails, the user can just send it again.
Replicated PushAPI is, as I understand it, a Queue, where the user submits, and quickwit ensures that the data is not lost, but there is no guarantee when the user will be able to query the data. It depends on when the indexer publishes split to S3. correct me if I'm wrong. @fulmicoton
refresh=true here would mean:
- trigger a commit
- wait for all the pending docs to be published before returning.
Replicated PushAPI is, as I understand it, a Queue, where the user submits, and quickwit ensures that the data is not lost, but there is no guarantee when the user will be able to query the data. It depends on when the indexer publishes split to S3. correct me if I'm wrong. @fulmicoton
That's about right. I would elaborate on the "no-guarantee" part. The system can be configured to target a publish within a given interval (40s typically). This is not guaranteed however. If there is some unexpected error, or if the service is too busy to keep up, it could take longer.
Non-replicated PushAPI + refresh=true. More or less the same trade-off as stateless PushAPI, but internally uses the plumbing of Non-replicated PushAPI, triggers a commit and waits for it.
Noted with thanks.
I would elaborate on the "no-guarantee" part. The system can be configured to target a publish within a given interval (40s typically). This is not guaranteed however.
It seems that we need an atomic commit. Either Stateless PushAPI or Non-Replicated PushAPI + refresh=true will work. At the moment I'm leaning towards Non-Replicated PushAPI + refresh=true, as it has a higher throughput than Stateless PushAPI.
There is also a need for a Replicated PushAPI, which I would prefer to call a Distributed Queue.
I don't think a rest PushAPI that does not queue documents, produces a split right away and blocks until this is done is something someone with a lot of data would use.
LOL, I'm jonesing for this ability. :-) Granted I only need it right now because the Kafka ingest is temporarily limited but to me it's an obvious thing that really should exist.
That said, I absolutely get what you're trying to accomplish with the current APIs and ease of use is clearly important but hopefully that can be achieved without gimping the system or encouraging bad practice from users and writing unchecked data in a non-blocking manner is absolutely a bad practice in my opinion. I have nightmares from having to explain the issues with that kind of thing to junior developers.
I have no major issue with the queueing itself, just with the lack of acknowledgement on writes and the inability to receive back pressure from the system to control an ingest pipeline.
So here's a suggestion for another model: Non-replicated PushAPI + queue=true/false + atomic=true/false.
By default queue=true, atomic=false, making this a minimally blocking API that explicitly acknowledges all writes to the queue on a per document basis and enables per document failures without invalidating the entire request.
queue=true
It will block while validating and preparing the data before queueing it. The acknowledgement provided will be that it has been queued, not that it has been indexed. This will still allow independent and more optimized batching of data before the split is committed.
It's critical that the queue also have a size limit and this becomes a blocking call if the queue exceeds that limit. Once the queue has space then the request can proceed or it can timeout and be retried. The write to the queue will need to be atomic in regard to a single request so it will need space for the entire request, not just on a per record basis where it could be competing with other requests. This should provide adequate back pressure to control upstream pipelines.
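A rough sketch of that size-limited queue, assuming capacity is tracked in bytes and that a request either reserves space for all of its documents or is not enqueued at all. `BoundedIngestQueue`, `put_request` and `take_request` are hypothetical names for illustration only.

```python
import threading
from collections import deque
from typing import Deque, List


class BoundedIngestQueue:
    """Hypothetical queue with a byte limit and whole-request admission."""

    def __init__(self, capacity_bytes: int) -> None:
        self._capacity = capacity_bytes
        self._used = 0
        self._items: Deque[List[bytes]] = deque()
        self._cond = threading.Condition()

    def put_request(self, docs: List[bytes], timeout: float) -> bool:
        """Block until the whole request fits, or return False on timeout."""
        needed = sum(len(d) for d in docs)
        if needed > self._capacity:
            raise ValueError("request larger than queue capacity")
        with self._cond:
            fits = self._cond.wait_for(
                lambda: self._used + needed <= self._capacity, timeout=timeout)
            if not fits:
                return False  # caller maps this to a timeout response and retries
            self._items.append(docs)
            self._used += needed
            return True

    def take_request(self) -> List[bytes]:
        """Called by the indexer; frees space and wakes blocked writers.

        Raises IndexError if the queue is empty.
        """
        with self._cond:
            docs = self._items.popleft()
            self._used -= sum(len(d) for d in docs)
            self._cond.notify_all()
            return docs
```

A `False` return is the back pressure signal: the upstream pipeline sees the call time out and can slow down or retry.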
In the short term the risks of data loss just need to be clearly documented along with the option to do queue=false, and then long term that can be eased if queue replication is added.
queue=false
In this case the queue is not used, or a dedicated temporary queue is used, and the entire request is converted directly to a single split. This exists so you have an option to get a directly acknowledged write of a set of data. The call blocks until all processing of the split is complete and the split is committed.
This certainly has the risk of creating too many small splits if used carelessly but I'm assuming a future background process will exist in general to handle these scenarios.
atomic=false
Means each document is considered independently and can succeed or fail independently. The result from the request will need to include the acknowledgement about each document independently along with any failures.
atomic=true
Means that all the data in the request is valid and gets committed in a split or none of it does. This is false by default because the normal unit of atomicity here is a single record and it's a very special case to require the entire bulk request to succeed atomically.
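The difference between the two `atomic` settings can be sketched as follows. This is an illustration only: `ingest` is a hypothetical function, "valid" here just means "parses as JSON", and the response shape is an assumption rather than anything Quickwit defines.

```python
import json
from typing import Dict, List


def ingest(lines: List[str], atomic: bool) -> Dict[str, object]:
    """Return a per-document report and how many documents get committed."""
    results = []
    docs = []
    for line_no, line in enumerate(lines):
        try:
            docs.append(json.loads(line))
            results.append({"line": line_no, "valid": True})
        except json.JSONDecodeError as exc:
            results.append({"line": line_no, "valid": False, "error": exc.msg})
    any_failed = any(not r["valid"] for r in results)
    if atomic and any_failed:
        # atomic=true: one bad document rejects the whole request.
        return {"committed": 0, "results": results}
    # atomic=false: valid documents are committed, failures are reported.
    return {"committed": len(docs), "results": results}
```

In both modes the response carries a per-document report, so the client can tell exactly which records need to be fixed and resent.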
An aside on Elasticsearch, indexing there is vastly more complicated and the internal architecture is so different from Quickwit that it's probably not worth wasting too much time on the details but a couple points may be worth mentioning.
First the queues in ES are in memory only and are not replicated like suggested above. They exist at the node level and are used to both hold incoming bulk requests and the results of the scatter process for the processing of those requests. This has a fundamentally different purpose than what the current Non-Replicated PushAPI is trying to do. That's probably more akin to the per shard transaction log that ES uses.
Second, the refresh setting isn't controlling the durability of the data, it's not a commit, it's simply controlling the visibility of the newly written data to search requests. It's really just a way to override the default index refresh interval in cases where you want to guarantee data is immediately available for reading. If you don't provide the refresh parameter the writes will still be acknowledged once the value in wait_for_active_shards is met but won't be made available for search until the next automatic refresh on the index. Again this doesn't really matter to Quickwit as it doesn't have this complexity and a split will simply become available to any query that starts after it has been marked Published.
An aside on Elasticsearch, indexing there is vastly more complicated and the internal architecture is so different from Quickwit that it's probably not worth wasting too much time on the details but a couple points may be worth mentioning.
Agreed! But I will still have to reply to your message to make sure I don't look like a fool :)
That's probably more akin to the per-shard transaction log that ES uses.
I was talking about the per-shard transaction log. It is replicated on each replica.
Second, the refresh setting isn't controlling the durability of the data, it's not a commit, it's simply controlling the visibility of the newly written data to search requests.
It is not a commit in the normal database sense of the word, but it triggers a (soft)commit in Lucene/Tantivy. Quickwit and Elastic want to make PushAPI calls durable while respecting the need for search building to be batchy. Both do that by using a transaction log. In elastic, that log is naturally replicated because elastic uses sharding/doc-replication scheme. It is difficult to imagine otherwise.
In quickwit, the data is written to disk but the log is not replicated yet. "The data is written to disk" <- that statement itself would deserve its own discussion.
The plan to add replication there is closer to a kafka than elasticsearch.
In quickwit, the data is written to disk but the log is not replicated yet. "The data is written to disk" <- that statement itself would deserve its own discussion. The plan to add replication there is closer to a kafka than elasticsearch.
It would be great if Quickwit had a Push API closer to Kafka, which could help improve write throughput.
It is not a commit in the normal database sense of the word, but it triggers a (soft)commit in Lucene/Tantivy. Quickwit and Elastic want to make PushAPI calls durable while respecting the need for search building to be batchy.
That's the reason why I raised this API. I believe that Quickwit should provide an endpoint that supports ACID. At the moment I feel that the replicated push API only satisfies D.