
Scaling Operators

Open csviri opened this issue 3 years ago • 7 comments

DISCLAIMER: Note that the need for scaling operators is a disputable problem. It is written up here because it is a fairly simple solution that would be feasible to implement. We do not plan to implement this; it is rather meant for discussion and to explore the topic, also as one of the options we have on the table regarding performance improvements and scaling.

The Problem

Currently we can run only one instance of an operator. The fundamental problem is that every operator instance receives all the events, so multiple instances would process the same events in parallel. In the following we will build up a way to overcome this problem: first a simple approach with drawbacks, then a step further that might handle the events more efficiently (with added complexity).

Solution - First Step

The solution, in short, is to deploy the operator instances as a StatefulSet. Every pod / instance (for the sake of simplicity let's talk about 3 instances) will receive an ordinal index: 0, 1, 2. So we will have three pods, and the operators running inside them will be aware of their ordinal index and of the number of replicas (in our case 3).

The trick is that the operator will filter the events. All the events received (related either to the custom resource or to dependent resources - mostly from informers) are eventually propagated as an internal event carrying a ResourceID (the name + namespace of the target custom resource). This ResourceID identifies the custom resource we want to reconcile. What we can do is hash this ResourceID and take it modulo the number of replicas: hash(ResourceID) mod [number of replicas], in our case 3, and only process the event when this number equals the ordinal index of the pod.

In this way we still receive all the events from the Kubernetes API, but we simply drop the events that are not assigned to this instance according to the formula above, so we do not process them at all, not even cache them.
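For illustration, a minimal sketch of such a filter in plain Java (the ResourceID record below just mirrors the name + namespace pair described above, it is not the SDK type; all names are illustrative):

import java.util.Objects;

// Minimal sketch of the sharding filter described above.
record ResourceID(String name, String namespace) {}

class ShardFilter {
    private final int ordinalIndex; // ordinal index of this StatefulSet pod (0..replicas-1)
    private final int replicas;     // current number of replicas

    ShardFilter(int ordinalIndex, int replicas) {
        this.ordinalIndex = ordinalIndex;
        this.replicas = replicas;
    }

    // True if this instance is responsible for the resource: hash(ResourceID) mod replicas == ordinal index.
    boolean accept(ResourceID id) {
        int hash = Objects.hash(id.name(), id.namespace());
        // floorMod avoids negative results for negative hash codes
        return Math.floorMod(hash, replicas) == ordinalIndex;
    }
}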


This works nicely with a predefined number of instances. To have it auto-scaled we would need to reconfigure the instances (mainly the number-of-replicas input; this is doable quite nicely using config maps, or simply by watching the StatefulSet from inside the operator).
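As a reference, a sketch of how an instance could determine its ordinal index and the current replica count, using the fabric8 client (the POD_NAME env var, the namespace and the StatefulSet name are made up for the example):

import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

// Sketch: StatefulSet pods are named <statefulset-name>-<ordinal>, so the ordinal index can be
// parsed from the pod name (exposed e.g. via the downward API as POD_NAME), and the replica
// count can be read from the StatefulSet itself.
public class ShardConfig {
    public static void main(String[] args) {
        String podName = System.getenv("POD_NAME"); // e.g. "my-operator-2"
        int ordinalIndex = Integer.parseInt(podName.substring(podName.lastIndexOf('-') + 1));

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            StatefulSet sts = client.apps().statefulSets()
                    .inNamespace("operators")   // illustrative namespace
                    .withName("my-operator")    // illustrative StatefulSet name
                    .get();
            int replicas = sts.getSpec().getReplicas();
            System.out.printf("ordinal=%d, replicas=%d%n", ordinalIndex, replicas);
        }
    }
}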

Improvement: Pre-Hashing with Mutating Webhook

In the previous approach we would still receive all the events for all the resources. What we could do instead is use label selectors and ask only for the events related to the particular operator instance. For this we need to provide a mutating webhook that adds the label to the custom resources. Again we can use the same formula, just adding it as a label:

metadata:
  labels:
    operatorid: [hash(name+namespace?) mod replica-count ]

(Or just in a round-robin fashion?)

This can be done on creation of the resource. Note that for the secondary resources the operator can add the labels itself when it creates them; we need the admission webhook only for the primary custom resource. We also have to make sure that the label is not removed (though it can be updated), which could be enforced via a validating admission controller.
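A sketch of the label value such a webhook (or a coordinating operator) could compute, using the same formula as above; the label key and the choice of what to hash are just the example values, not a fixed design:

import java.util.Objects;

// Sketch of the shard label a mutating webhook / coordinating operator could set on a newly
// created custom resource. Whether to hash name+namespace, the uid, or assign round-robin is open.
class ShardLabeler {
    static final String LABEL_KEY = "operatorid";

    static String labelValue(String name, String namespace, int replicaCount) {
        return String.valueOf(Math.floorMod(Objects.hash(name, namespace), replicaCount));
    }
}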

Although for auto-scaling (mainly downscaling) this might mean reprocessing the resources, which is not desirable.


Remarks

  • If a pod crashes and restarts, it will process the events it missed in the meantime, since it will receive the same ordinal index.
  • Note that this is not necessarily about auto-scaling; with small tweaks that works too.

Implementation

To implement this we just need filters or label selectors added to the Event Sources, both of which are already possible.
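For example, with the fabric8 client an informer can be restricted to this instance's shard directly via a label selector (ConfigMap is just a stand-in resource type here; in the SDK the selector would be configured on the corresponding event source):

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

// Sketch: watch only the resources labelled for this shard, so events belonging to other shards
// never even reach this instance.
class ShardedInformer {
    SharedIndexInformer<ConfigMap> create(KubernetesClient client, int ordinalIndex) {
        return client.configMaps()
                .inAnyNamespace()
                .withLabel("operatorid", String.valueOf(ordinalIndex))
                .inform(); // inform() starts the informer and waits for the initial sync
    }
}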

For the improved case, a simple webhook also needs to be implemented.

(Maybe we can think about a generic filtering mechanism (think of a Servlet filter) that cuts across all event sources; will create a separate issue for that.)

Notes

  • Getting the ordinal index of the pod: https://stackoverflow.com/questions/50750672/is-there-a-way-to-get-ordinal-index-of-a-pod-with-in-kubernetes-statefulset-conf
  • An analogy for the improved variant can be seen in sticky sessions.
  • (see also https://en.wikipedia.org/wiki/Consistent_hashing for auto-scaling and redistributing ids; a minimal ring sketch follows below)
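On the last note, a minimal consistent-hash ring sketch in plain Java (the virtual-node count and hash function are illustrative); its point is that changing the number of shards only moves a fraction of the ResourceIDs, unlike the plain hash-mod-replicas formula:

import java.util.SortedMap;
import java.util.TreeMap;

// Minimal consistent-hash ring: each shard owns several points ("virtual nodes") on the ring;
// a resource id maps to the first shard point at or after its own hash. Adding or removing a
// shard only remaps the resources between neighbouring points.
class ConsistentHashRing {
    private final TreeMap<Integer, Integer> ring = new TreeMap<>(); // point on the ring -> shard index

    ConsistentHashRing(int shards, int virtualNodesPerShard) {
        for (int shard = 0; shard < shards; shard++) {
            for (int v = 0; v < virtualNodesPerShard; v++) {
                ring.put(("shard-" + shard + "#" + v).hashCode(), shard);
            }
        }
    }

    int shardFor(String resourceId) {
        SortedMap<Integer, Integer> tail = ring.tailMap(resourceId.hashCode());
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }
}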

csviri avatar Jan 27 '22 17:01 csviri

We talked about sharding approaches on https://github.com/fabric8io/kubernetes-client/issues/3587#issuecomment-1014652601 and the related pr.

Instead of a webhook we were talking about a "coordinating" operator being responsible for assigning indexes to the resources. The idea was the same: you need something to monitor creates / updates to ensure the necessary index label is applied.

Some of the additional thoughts:

  • you'd need a good hashing function wrt the number of statefulset replicas - @manusa suggested using the uid.
  • you need to transitively set the necessary index label on dependent resources, so you can filter all the way down. Otherwise each operator replica ends up watching all dependent resources
  • if you ever needed to modify the replica count (or need to be defensive to any modification), the index can be changed - which would be seen as a delete event by each operator replica informer looking for that label value. You may not want to process that rehashing as delete/recreate - in that case there would need to be an additional lookup or other mechanism to establish the difference between a changed index, and a full deletion.
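On the last point, a sketch of that additional lookup using the fabric8 client (ConfigMap is a placeholder resource type): when the label-filtered informer reports a delete, a direct read from the API server tells a real deletion apart from a mere re-labelling:

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;

// Sketch: on a delete event from the label-filtered informer, check the API server directly;
// if the resource still exists, only its shard label was changed.
class DeleteOrRehashed {
    boolean isRealDeletion(KubernetesClient client, String namespace, String name) {
        ConfigMap current = client.configMaps().inNamespace(namespace).withName(name).get();
        return current == null; // null -> truly deleted; otherwise it was just re-indexed
    }
}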

shawkins avatar Jan 27 '22 21:01 shawkins

thx @shawkins for response!

The hashing is over ResourceID because that is always at hand (note that these are not necessarily only Kubernetes resources; they can also be external resources), and this is how the primary resource is addressed internally. It might not be ideal from a hashing perspective.

Instead of a webhook we were talking about a "coordinating" operator being responsible for assigning indexes to the resources.

This is definitely the way to go when scaling up or down; on creation it is probably safer with the mutating webhook. Maybe the combination of both is ideal (and the most complex :) )

I think this is doable in the operator itself: by watching the StatefulSet it would receive the changes to the replica count and could change the objects, and the dependent resources could be re-indexed in the same step. Alternatively:

you need to transitively set the necessary index label on dependent resources, so you can filter all the way down. Otherwise each operator replica ends up watching all dependent resources

Yes, so this can be done at creation time by the operator. The only problem I see is with auto-scaling: how are the dependent resources re-indexed? This cannot be done by a generic coordinating operator nor by a webhook, since neither is aware of the dependent resources:

  1. One way to do it in general is to have them re-indexed by the new target operator itself: since its informer won't have them in the cache, it reads them from the API server and changes the index directly in the handling operator.
  2. Another is to have it done by the original operator (when handling the delete event you mentioned) in a two-phase manner: mark the custom resource to be skipped for processing, change the indexes of the dependent resources too, then unmark it.

Anyway, that is the harder part, but it is definitely doable too. The point is that the main operator should handle the dependent resources, since it is the only component aware of them. This could still be done quite nicely in a generic way.
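A very rough sketch of the second option (two-phase re-indexing done by the operator currently owning the resource); the annotation and label keys, and the fabric8-style update calls, are illustrative:

import java.util.HashMap;
import java.util.List;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;

// Sketch of two-phase re-indexing: mark the primary so reconciliation skips it, move the
// dependents the operator knows about to the new shard, then move the primary and unmark it.
class TwoPhaseReindexer {
    void reindex(KubernetesClient client, HasMetadata primary, List<HasMetadata> dependents, String newIndex) {
        // phase 1: mark the primary so it is skipped while labels are in flux
        if (primary.getMetadata().getAnnotations() == null) {
            primary.getMetadata().setAnnotations(new HashMap<>());
        }
        primary.getMetadata().getAnnotations().put("sharding/skip", "true");
        HasMetadata marked = client.resource(primary).update(); // keep the latest resourceVersion

        for (HasMetadata dependent : dependents) {
            // dependents are assumed to already carry the label (added at creation time)
            dependent.getMetadata().getLabels().put("operatorid", newIndex);
            client.resource(dependent).update();
        }

        // phase 2: move the primary to its new shard and unmark it
        marked.getMetadata().getLabels().put("operatorid", newIndex);
        marked.getMetadata().getAnnotations().remove("sharding/skip");
        client.resource(marked).update();
    }
}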

csviri avatar Jan 28 '22 16:01 csviri

I think we should better describe what problem we are trying to solve and where the bottleneck is.

As an example, it is not clear how having multiple replicas as described in option 1 would help; it looks to me like it could even exacerbate the problems (but I may not have understood the solution well, so pardon me if what I'm saying is wrong), as:

  1. the k8s api server has to notify N more clients with the same set of events
  2. each event needs to be de-serialized to a Java object before being processed and discarded
  3. so in fact we are just burning CPU cycles and putting a lot of pressure on the garbage collector as well as on the CPU caches

The only reasonable solution to me seems to be the one based on label selection; however this does not seem very trivial and it is also business sensitive (in some cases you can just stop processing, in others - as mentioned - you may need to wait for the reconcile loop to reach a safe point), so I wonder if this class of problems would be better solved by an external operator.

lburgazzoli avatar Jan 29 '22 10:01 lburgazzoli

@lburgazzoli thanks for the feedback!

I agree that using only part 1 of the approach would need some careful measurement, and it definitely does not sound that appealing without part 2.

Regarding the second part, I made a small addition; IMHO the only slightly bigger problem there is the re-indexing of the dependent resources. I changed the wording: it actually does not need a stop, rather a skip in the other instance while the dependent resources are changed. Will describe this more in depth later.

I think I did not articulate well enough that this issue is one of those topics we have been talking about, where we create an issue just to initiate a discussion and explore the topic. I added the "exploratory" label to this issue and added a disclaimer regarding this. Just to make it explicit: I don't think we should implement such issues until there is an explicit request from users and a strong rationale behind it backed by numbers. We recognize that Kubernetes currently does not scale its internal operators horizontally either, while being capable of handling very large clusters.

These scaling and performance aspects pop up from time to time, more at the level of discussions, so again, this solution seemed simple enough to take it a step further and actually create an issue from it.

The problem this could help with is the memory utilization of a single operator instance, and some other aspects like the startup speed of the operator (since we do a cache sync on startup). Currently, to have fast reconciliation loops, operators cache all the dependent resources (if possible) in order to limit the hits on the Kubernetes API server and make reconciliation execution faster. Obviously this can lead to large memory consumption, especially when there is a large number of custom resources and related dependent resources. So what is described here is basically sharding, which could help with this problem of memory (but also CPU) utilization in a single instance.

(Caching is an interesting topic; it has been an ongoing discussion how to limit caches or manage them in a smarter way. Recently controller-runtime also made improvements regarding this subject. Will probably create an issue regarding that, since we are starting to get to the point where it could be feasible to implement.)

csviri avatar Jan 30 '22 15:01 csviri

I think I did not articulate well enough that this issue is one of those topics we have been talking about, where we create an issue just to initiate a discussion and explore the topic. I added the "exploratory" label to this issue and added a disclaimer regarding this. Just to make it explicit: I don't think we should implement such issues until there is an explicit request from users and a strong rationale behind it backed by numbers. We recognize that Kubernetes currently does not scale its internal operators horizontally either, while being capable of handling very large clusters.

I understand, no worries :)

The problem this could help with is memory utilization, and some other aspects like the startup speed of the operator (since we do a cache sync on startup). Currently, to have fast reconciliation loops, operators cache all the dependent resources (if possible) in order to limit the hits on the Kubernetes API server and make reconciliation execution faster. Obviously this can lead to large memory consumption, especially when there is a large number of custom resources and related dependent resources. So what is described here is basically sharding, which could help with this problem of memory (but also CPU) utilization in a single instance.

I guess here we need to define what we mean by memory utilization, because while it is true that with sharding you can reduce the memory consumed by a single operator, overall you may end up requiring more memory as each operator instance has its own memory overhead :).

To be clear, I have nothing against this exploration and eventually an implementation (quite the opposite), but I think we need to recognize that there is no single solution to this problem, as it often boils down to the business logic, so maybe a good start would be to provide some recipes for common scenarios.

lburgazzoli avatar Jan 31 '22 07:01 lburgazzoli

I guess here we need to define what we mean by memory utilization, because while it is true that with sharding you can reduce the memory consumed by a single operator, overall you may end up requiring more memory as each operator instance has its own memory overhead :).

Made it clearer that it is the memory utilization of a single operator instance, not the overall memory usage.

csviri avatar Jan 31 '22 09:01 csviri

cc @salaboy (the discussion on our side regarding scaling mentioned on JBCN :) )

csviri avatar Jul 19 '22 10:07 csviri