Respecting knative-serving concurrency in knative-eventing
The Knative stack (eventing and serving) is pretty awesome. Overall, it makes infra easy to use for our devs and ML engineers. On Kubernetes, I don't want to miss it anymore. It has advanced scaling capabilities based on concurrency, and it handles scaling very well in the following two scenarios:
- microservices that answer very quickly (milliseconds)
- long-running jobs that take more than a couple of minutes, using the JobSink approach
The scenarios in between, where a request takes multiple seconds or up to a minute, seem to be a blind spot, especially when combining Knative eventing and serving.
Possible configurations
First of all, a couple of (central) configurations and behaviors (if something is wrong or missing, please let me know!):
- kubectl get cm -n knative-eventing config-kafka-broker-data-plane -o yaml
      max.poll.records=50
  This is roughly how many messages are pulled in parallel per partition. It is a central configuration, set at Kubernetes cluster level (more specifically, at knative-eventing-kafka installation level).
- kubectl get cm -n knative-eventing kafka-broker-config -o yaml
      default.topic.partitions: "10"
  The partitions act like a multiplier: max.poll.records x default.topic.partitions = 50 x 10 = 500 messages are pulled roughly in parallel. This setting applies at Broker level, but changing it requires at least one additional kafka-broker-config-* ConfigMap.
- kubectl get trigger <example-trigger> -o yaml
      metadata:
        annotations:
          kafka.eventing.knative.dev/delivery.order: ordered
  Another way to restrict traffic is ordered delivery. "An ordered consumer is a per-partition blocking consumer that waits for a successful response from the CloudEvent subscriber before it delivers the next message of the partition." This means that with 1 partition it handles only 1 message at a time, and with 10 partitions it handles 10 messages at a time.
- Another important variable is DeliverySpec.Timeout, which can be set at Broker and Trigger level (which makes it independent for every Sink/microservice). I could not find the default value, but I assume it is 30s.
      delivery:
        timeout: PT30S
- concurrency hard limit
      spec:
        template:
          spec:
            containerConcurrency: 50
  Concurrency determines the number of simultaneous requests that can be processed by each replica of an application at any given time.
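To make the multiplier effect of these settings concrete, here is a minimal Python sketch of the arithmetic. It follows my assumption from above that max.poll.records applies roughly per partition; the function name max_in_flight is made up for illustration and is not part of Knative.

```python
def max_in_flight(max_poll_records: int, partitions: int, ordered: bool) -> int:
    """Rough upper bound on events dispatched at the same time for one Trigger.

    Assumption (from this issue, not verified against the dispatcher code):
    unordered delivery can have up to max.poll.records events in flight
    per partition.
    """
    if ordered:
        # Ordered delivery blocks per partition: one event in flight per partition.
        return partitions
    return max_poll_records * partitions


print(max_in_flight(50, 10, ordered=False))  # 500
print(max_in_flight(50, 10, ordered=True))   # 10
```

Neither of the two numbers depends on how many requests the Sink can actually handle, which is the core of the problem described below.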
Background
I am working in a Python world with AI. Python is not good at doing things fast and concurrently. AI (we do a lot in the vision area, with segments and so forth) needs a lot of processing time and is therefore slow. We also often hit a tipping point: when Python FastAPI/Flask is overloaded, its throughput drops considerably. Hence, it achieves its highest throughput at a lower concurrency.
Scenario and possible tweaks
Let's assume I have a microservice which can handle 2 messages in parallel and needs 10s for each message. Hence, the throughput is around 12 messages per minute per replica. Keeping the default values of max.poll.records=50, default.topic.partitions: "10" and timeout: PT30S (unordered), the microservice would be overloaded immediately if we get 500 messages, and nothing would be processed correctly.
- Now, we could set containerConcurrency: 2 (hard limit), which ensures that only 2 messages reach FastAPI/Flask at a time to get the maximum performance, but the limit is enforced at "queue-proxy" level. The "queue-proxy" needs to buffer all other messages and also produces timeouts after 30s (I could not even identify how to increase the timeout there).
- If we decreased max.poll.records to 2, it would throttle all other microservices in our Kubernetes cluster, including those which could handle a very high throughput.
- We could set a combination of default.topic.partitions: "10" (another number is possible) and kafka.eventing.knative.dev/delivery.order: ordered. But the partition configuration is at Broker level, and this is quite static.
All this leads to timeouts, which trigger retries, which flood the system even more.
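A rough back-of-the-envelope model of the scenario, as a Python sketch. It is idealized: all 500 requests arrive at t=0, the replica works through them in waves of 2, nothing is retried, and a request that finishes exactly at the timeout still counts as delivered. The function name delivered_before_timeout is hypothetical.

```python
import math


def delivered_before_timeout(backlog: int, concurrency: int,
                             seconds_per_message: int, timeout_seconds: int) -> int:
    """Count requests that finish within the timeout when all arrive at t=0."""
    # Wave k (1-based) finishes at k * seconds_per_message seconds.
    waves_in_time = timeout_seconds // seconds_per_message
    return min(backlog, waves_in_time * concurrency)


throughput_per_minute = 2 * 60 / 10                 # 2 in parallel, 10s each
delivered = delivered_before_timeout(500, 2, 10, 30)
timed_out = 500 - delivered
drain_seconds = math.ceil(500 / 2) * 10             # time one replica needs for all 500

print(throughput_per_minute)  # 12.0
print(delivered, timed_out)   # 6 494
print(drain_seconds)          # 2500
```

Under these assumptions, only 6 of the 500 requests complete before the 30s timeout, and the remaining 494 become retry candidates while a single replica would need about 42 minutes to drain the backlog.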
I know knative-eventing is, in the end, an abstraction of Kafka. But would it be possible to implement an intelligent mechanism so that knative-eventing is aware of and respects knative-serving concurrency, and only pulls as many messages out of Kafka as are needed to utilize the microservices as well as possible (but not too much), while also scaling up more replicas if the demand requires it?
Expected behavior
So the expected behavior would be:
- Let's assume we have 500 messages in the queue and only 1 replica, which can handle only 2 concurrent requests. Then it should not send more than 2 concurrent requests to that replica (hard limit).
- Then it should scale up a couple of replicas accordingly to deal with the traffic.
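To illustrate the first point only (the hard limit, not the scaling), here is a toy Python sketch of a dispatcher that caps in-flight requests per replica with a semaphore. It is not how the Knative dispatcher works today; dispatch_all, fake_handler and the round-robin assignment are all hypothetical names and simplifications.

```python
import asyncio


async def dispatch_all(messages, replicas, per_replica_limit, handle):
    """Deliver all messages, never exceeding per_replica_limit in-flight requests per replica.

    Returns the peak number of concurrent requests observed for each replica.
    """
    # One semaphore per replica models that replica's containerConcurrency.
    slots = [asyncio.Semaphore(per_replica_limit) for _ in range(replicas)]
    in_flight = [0] * replicas
    peak = [0] * replicas

    async def deliver(index, message):
        replica = index % replicas  # naive round-robin assignment
        async with slots[replica]:  # wait here instead of overloading the replica
            in_flight[replica] += 1
            peak[replica] = max(peak[replica], in_flight[replica])
            try:
                await handle(message)
            finally:
                in_flight[replica] -= 1

    await asyncio.gather(*(deliver(i, m) for i, m in enumerate(messages)))
    return peak


async def fake_handler(message):
    await asyncio.sleep(0.001)  # stands in for the 10s of real processing


peak = asyncio.run(dispatch_all(range(500), replicas=1,
                                per_replica_limit=2, handle=fake_handler))
print(peak)  # [2]
```

The messages that cannot get a slot simply wait on the dispatcher side (in the real system they would stay in Kafka), so no request sits in a buffer in front of the replica running into a timeout.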
In my opinion, it is the last missing puzzle piece for Knative in terms of scalability.
I am fully open to hop on a call for any discussion =)
cc @pierDipi @matzew
@tikr7 hi, have you tried setting a limit on the Knative Service side and using the Activator to do the backpressure (queue the requests)? Or is there some reason that Eventing should not send a request at all?
Hi @skonto, thank you for looking into it :)
What would it look like to send the traffic through the Activator first (svc and url)? I can try that.
I would assume the dispatcher times out if the Activator does not respond after 30s... but I am not sure.
We had a similar issue where our processing time varied between 1 and 5 minutes. Sometimes, our service didn't respond within 5 minutes, which is the default timeout for requests forwarded to a Knative Service (Ksvc). To address this, we increased timeoutSeconds and responseStartTimeoutSeconds.
Additionally, you can scale your triggers using this guide: Enabling and configuring autoscaling of triggers with KEDA. This allows you to control how many events are forwarded to your Ksvc by replicating dispatcher pods.
If you have a small number of jobs with high processing times, I would recommend the JobSink approach, as you mentioned.
Hi @s-kali
The default timeout is 5 minutes? Queue-proxy seems to have 30s, which leads to retries on our end. Anyway, increasing the timeout is not the best option. Timeouts also protect the system if something really goes wrong.
The KEDA part with scaling triggers sounds interesting. What I don't understand there: what does scaling of triggers mean? Triggers are something virtual. They configure the dispatcher component of knative-eventing. Does it scale the dispatcher? I would like to scale the Sink (KSvc) configured within the Trigger, and make sure it does not get overloaded until it has scaled :)
You're right, triggers are virtual objects, but they influence the behavior of dispatcher pods. For example, a single dispatcher pod can handle around 20 trigger replicas. If you need to increase the event flow, scaling up your triggers will also scale up the dispatcher pods accordingly to meet your requirements.