kafka-rest

KREST-2746 Reflect slow responses from Kafka back to the http client

Open ehumber opened this issue 2 years ago • 5 comments

This PR adds code to reflect Kafka rate limiting back to the http client.

Although the Produce API response from Kafka does tell the Kafka Java producer client that it is being throttled, this information is not exposed to the end user of that client (i.e. Kafka REST), so we have to infer the throttling another way.

Kafka throttles by delaying its response back to the client. So if we see a growing backlog of requests waiting to be sent to Kafka, we can assume that Kafka is throttling REST.

When this happens (or we have a backlog of calls to Kafka for some other reason), we are obliged to send responses back to the http client in the same order as the requests arrived with us. So all we can do, once we hit the "throttle from this point" queue depth, is add 429 responses to the end of the response queue. The requests for these are not sent to Kafka, which should reduce the traffic reaching Kafka, possibly enough to bring it back under the throttle limit.

If the queue depth doesn't shrink sufficiently, and instead grows to the max limit, then the connection is closed once the grace period from my previous ("disconnect clients nicely") PR has expired.
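As a rough illustration of the queue-depth logic described above, here is a minimal sketch. It is not the code in this PR: the class `BacklogThrottle`, its methods and the two thresholds are hypothetical names, and it assumes that queued 429 responses count towards the queue depth and that Kafka responses complete in request order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch only; none of these names come from kafka-rest.
class BacklogThrottle {
    enum Decision { FORWARD_TO_KAFKA, REJECT_WITH_429, DISCONNECT }

    private final int throttleDepth; // the "throttle from this point" depth
    private final int maxDepth;      // depth at which we give up on the connection
    // Pending HTTP statuses, kept in request-arrival order.
    private final Deque<CompletableFuture<Integer>> pending = new ArrayDeque<>();

    BacklogThrottle(int throttleDepth, int maxDepth) {
        this.throttleDepth = throttleDepth;
        this.maxDepth = maxDepth;
    }

    // Called once per incoming HTTP produce request.
    synchronized Decision onRequest() {
        int depth = pending.size();
        if (depth >= maxDepth) {
            // The caller would start the grace-period disconnect here.
            return Decision.DISCONNECT;
        }
        if (depth >= throttleDepth) {
            // Not sent to Kafka: an already-complete 429 joins the END of the
            // queue, so earlier responses still go out first.
            pending.add(CompletableFuture.completedFuture(429));
            return Decision.REJECT_WITH_429;
        }
        pending.add(new CompletableFuture<>());
        return Decision.FORWARD_TO_KAFKA;
    }

    // Called when Kafka answers the oldest outstanding forwarded request.
    synchronized void onKafkaResponse(int httpStatus) {
        for (CompletableFuture<Integer> response : pending) {
            if (!response.isDone()) {
                response.complete(httpStatus);
                return;
            }
        }
    }

    // Removes and returns the statuses that can be written to the client now,
    // stopping at the first response that is still waiting on Kafka.
    synchronized List<Integer> drainInOrder() {
        List<Integer> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().isDone()) {
            ready.add(pending.poll().join());
        }
        return ready;
    }
}
```

With `new BacklogThrottle(2, 4)`, the first two requests are forwarded, the next two get queued 429s, and a fifth would trigger the disconnect path. The 429s are only written out after the two earlier Kafka responses have arrived, which is the ordering constraint that stops us rejecting requests immediately.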

ehumber avatar Jul 18 '22 13:07 ehumber

@dimitarndimitrov The failing unit test I mentioned is testWriteToChunkedOutputAfterTimeout (the new 429 test fails in the same way too, but I've not been testing that one).

ehumber avatar Jul 18 '22 13:07 ehumber

@dimitarndimitrov Ignore the first commit, I'd not tidied up properly. This one shows the hang behaviour from EasyMock, where the close on the mappingIterator never returns.

The logs look like this:

** has next delayed response
THREAD EXECUTOR TRIGGERSclass io.confluent.kafkarest.response.StreamingResponse$ComposingStreamingResponse
THREAD EXECUTOR closeAll from thread executor call through to a sub close class io.confluent.kafkarest.response.StreamingResponse$ComposingStreamingResponse
CLOSE could be from either THREAD or FINALLY ->  calls through to inputStreaming closeclass io.confluent.kafkarest.response.StreamingResponse$ComposingStreamingResponse
CLOSE could be either from THREAD or FINALLY -> calls through to inputStreaming closeclass io.confluent.kafkarest.response.StreamingResponse$InputStreamingResponse
CLOSE in json stream which calls mapping iterator close
delegate not nullclass com.fasterxml.jackson.databind.MappingIterator$$EnhancerByCGLIB$$4a178d4f

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

ehumber avatar Jul 18 '22 13:07 ehumber

With the debugger running it looks like this, and the test passes.

There is a difference after all (after I said there wasn't) :) It is the "Writing to sink" debug line.

That line is only written out if I have a breakpoint in the async writing-to-sink method (e.g. line 471); otherwise I see the same behaviour with and without the debugger.

THREAD EXECUTOR TRIGGERSclass io.confluent.kafkarest.response.StreamingResponse$ComposingStreamingResponse
THREAD EXECUTOR closeAll from thread executor call through to a sub close class io.confluent.kafkarest.response.StreamingResponse$ComposingStreamingResponse
CLOSE could be from either THREAD or FINALLY ->  calls through to inputStreaming closeclass io.confluent.kafkarest.response.StreamingResponse$ComposingStreamingResponse
CLOSE could be either from THREAD or FINALLY -> calls through to inputStreaming closeclass io.confluent.kafkarest.response.StreamingResponse$InputStreamingResponse
[2022-07-18 14:51:09,785] DEBUG Writing to sink (io.confluent.kafkarest.response.StreamingResponse:477)
CLOSE in json stream which calls mapping iterator close
delegate not nullclass com.fasterxml.jackson.databind.MappingIterator$$EnhancerByCGLIB$$4a178d4f
delegate now closed
THREAD EXECUTOR closeAll from thread executor calling responsequeue.close
CLOSE response queue from FINALLY or THREAD class io.confluent.kafkarest.response.StreamingResponse$AsyncResponseQueue

So that's a nice big clue I'm going to go investigate :)

ehumber avatar Jul 18 '22 13:07 ehumber

@dimitarndimitrov @AndrewJSchofield This PR could do with merging before mid-November if possible, so that we can deal with Kafka back pressure before increasing (or removing) any byte-based rate limits for produce.

It would be great if I could get some feedback by the end of October.

ehumber avatar Oct 14 '22 10:10 ehumber

Sorry I didn't manage to get this tidied up in time :(.

I made a PR from a branch in confluentinc/kafka-rest, so if my original branch from my fork goes missing when I leave, hopefully you will still have the code:

https://github.com/confluentinc/kafka-rest/pull/1098

For this PR I recommend you don't merge it until you need to. That will most probably be when you look at rate limiting around REST produce, and potentially remove it in REST and rely on backpressure from Kafka to keep the requests at a sensible limit.

ehumber avatar Jan 06 '23 14:01 ehumber