strimzi-kafka-operator
Topic Operator: Changing replication factor
Currently the topic controller supports ConfigMaps having a `replicas` key, which is used to declare the replication factor of a topic. This is only partially supported currently: changes to the `replicas` in the CM and changes to the partition assignment in Kafka don't get mirrored.
Kafka doesn't really support "replication factor" as a first class concept. It's basically used only when creating topics, and thereafter the APIs deal only with lists of assigned brokers. Indeed I think it's possible for different partitions of the same topic to have different replication factors. By exposing a readable `replicas` we end up having to synthesize something which doesn't really exist in Kafka. This is actually not trivial, because during reassignment the assigned replicas will be (a subset of) the union of the old and new replicas, which could lead to a reported replication factor being higher during the reassignment than it would be either before or after the reassignment. It would be possible to do, however, if we only updated the `replicas` each time a partition reassignment for the topic finished (strictly, when reassignments of partition 0 finished, since this is what we base our fiction of replication factor on), but that involves knowing when such reassignments finish. Today that involves parsing the JSON in the reassign_partitions znode, and in the future KIP-240 might change that.
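For illustration only, a minimal sketch (using the standard Kafka `AdminClient`, which is an assumption about how the TO would query topic state) of synthesizing an RF from partition 0's replica list:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class SyntheticRf {
    // Derives a "replication factor" from partition 0's replica list, since
    // Kafka itself only tracks per-partition replica assignments.
    static int replicationFactor(AdminClient admin, String topic) throws Exception {
        Map<String, TopicDescription> descriptions =
                admin.describeTopics(Collections.singleton(topic)).all().get();
        // Caveat from above: during a reassignment this list is (a subset of)
        // the union of old and new replicas, so it can over-report the RF.
        return descriptions.get(topic).partitions().get(0).replicas().size();
    }
}
```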
An alternative to this might be to support setting `replicas`, but then "consuming" it and removing it from the CM as part of the topic controller's processing.
Another alternative might be to not support `replicas` as a replication factor at all, and instead report the assigned replicas. This would still have the problem of reporting more replicas during a reassignment, but in this respect we're no different from Kafka.
I think that configuring the replication is important. We need to have it in some form. If an integer `replicas` doesn't work, can we use the replica assignment instead? I.e. have an object / map in the config map which would describe the desired replication?
> An alternative to this might be to support setting `replicas`, but then "consuming" it and removing it from the CM as part of the topic controller's processing.

I don't think this would work well. We should minimize overwriting the config maps. This can lead to issues such as overwriting the config map again and again every time the user does `kubectl apply ...`. It might also cause problems for people using GitOps.
Is this a duplicate of #142?
Replication factor change is a special case of partition reassignment
As mentioned, Kafka doesn't support Replication Factor (RF) as a first class concept. It's just a convenience at the time a topic is created. Thereafter each partition can, in theory, have its own number of replicas, and changing those is done through partition reassignment.
Challenges due to Kafka's reassignment API
The only supported way to reassign partition replicas in Apache Kafka is via the CLI tool `kafka-reassign-partitions.sh`. This effectively writes some JSON to the `/admin/reassign_partitions` Zookeeper znode, which the Kafka Controller then actions. The Controller will update the znode as each partition's reassignment completes and delete it once all the partitions in the JSON have been reassigned. Writing directly to Zookeeper is not a publicly supported API and might change without warning in a future version of Kafka.
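To make the mechanism concrete, here is a rough sketch of what the tool effectively does, using the raw ZooKeeper client (the JSON shape is the one `kafka-reassign-partitions.sh --execute` writes; writing it directly like this is exactly the unsupported usage just described):

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ReassignViaZnode {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        // The same JSON shape that kafka-reassign-partitions.sh --execute writes.
        String json = "{\"version\":1,\"partitions\":["
                + "{\"topic\":\"my-topic\",\"partition\":0,\"replicas\":[1,2,3]}]}";
        // The Controller watches this znode, actions the reassignment, updates
        // the znode as partitions complete, and deletes it when all are done.
        zk.create("/admin/reassign_partitions", json.getBytes("UTF-8"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.close();
    }
}
```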
A consequence of this design is that reassignment must be done in batches. This poses two problems:
- There's no natural mechanism for the Topic Operator to be able to batch reassignments from multiple `KafkaTopic`s, each modified at a slightly different time.
- There must be some mechanism for queuing reassignments which can't be processed right now because of a pre-existing in-flight reassignment (which could take a very long time to complete).
Long term vision
In the bigger picture, users with more than a handful of partitions will use some kind of partition balancer tool to perform a heuristic optimization. In this case being able to reassign partitions via Kubernetes is not really a requirement. Furthermore such tools are written to operate directly on Kafka (and in practice Zookeeper).
Putting these facts together suggests we should not aim to solve the whole reassignment problem in the Topic Operator, but should instead support use of a dedicated balancer (either an existing one or one we write ourselves).
There are two possible ways that could work:
- Another component, distinct from the Topic Operator and part of the dedicated balancer, takes responsibility for watching `KafkaTopic`s for RF changes. The TO could still provide one-way sync from Kafka to K8S of the RF. This means the TO doesn't have to know anything about this other component, other than knowing to ignore K8s-side changes to RF.
- The TO takes responsibility for watching for RF changes, and informs another component, which does the actual work.
The problem with this bigger picture is we're not ready yet to start on that work.
Short term solution
The reality is that we should support RF changes even today. To do that we would need to make the TO configurable to use a built-in approach just supporting RF change, not reassignment. In the future the TO would be configured for the external balancer, but in the short term it would be configured for the internal RF-changer. We would provide only the functionality for changing RF in the absence of a true balancer, with the expectation that we will provide a full balancer in the future.
Fixed throttles
We would configure the TO with a fixed set of throttles to be used when it is performing reassignment.
- Add a new env var in the TO for passing the fixed throttles.
- This would be represented in the `TopicOperatorSpec` in the `Kafka.entityOperator.topicOperator` resource (see below).
- When we actually throttle for a batch we would intersect the actual partitions in the reassignment with the configured `throttledReplicas`.
Delayed initiation of reassignment
We would delay kicking off a reassignment for some period of time (`rfChangeLingerSeconds`) to allow a primitive way for users to batch changes. For example, when a modification changes RF we'd wait up to 1 minute for changes in other topics' RF. Once the time was up (or the configured number of topics changed, `maxPartitionsInBatch`) we'd start a batch of reassignments.
When many RF changes had got queued up (because a reconciliation was already happening) we would use the same `maxPartitionsInBatch` configuration when creating the next batch. In this case we would need a way to pick the `maxPartitionsInBatch` from the set of eligible partitions. Queueing them based on the order of the observation of their need for RF change would be possible, but this queue cannot persist across TO restarts. It would therefore be impossible to guarantee this order. Since the user knows the batch size they can always impose their own ordering simply by when they make the RF changes to the `KafkaTopic`s. So perhaps a simpler way would be to batch partitions randomly in this case.
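A hypothetical sketch of this linger-plus-cap batching (the names `rfChangeLingerSeconds` and `maxPartitionsInBatch` come from the proposal; the class shape is invented for illustration):

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class RfChangeBatcher {
    private final long lingerSeconds;        // rfChangeLingerSeconds
    private final int maxPartitionsInBatch;  // maxPartitionsInBatch
    private final Consumer<Set<String>> startReassignment;
    private final Set<String> pending = new LinkedHashSet<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> deadline;

    RfChangeBatcher(long lingerSeconds, int maxPartitionsInBatch,
                    Consumer<Set<String>> startReassignment) {
        this.lingerSeconds = lingerSeconds;
        this.maxPartitionsInBatch = maxPartitionsInBatch;
        this.startReassignment = startReassignment;
    }

    // Called each time a KafkaTopic modification is observed to change RF.
    synchronized void onRfChange(String topicPartition) {
        pending.add(topicPartition);
        if (pending.size() >= maxPartitionsInBatch) {
            flush();                      // cap reached: start the batch now
        } else if (deadline == null) {    // first change: start the linger timer
            deadline = timer.schedule(this::flush, lingerSeconds, TimeUnit.SECONDS);
        }
    }

    private synchronized void flush() {
        if (deadline != null) deadline.cancel(false);
        deadline = null;
        if (!pending.isEmpty()) {
            startReassignment.accept(new LinkedHashSet<>(pending));
            pending.clear();
        }
    }
}
```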
Checking for reassignment completion
We could use a ZK watch on `/admin/reassign_partitions` to know that a partition had completed reassignment. This requires maintaining in memory the set of partitions being reassigned so that we can deduce which has completed when `/admin/reassign_partitions` changes. Because we cannot completely trust the watch mechanism we would also regularly poll for changes (`completionPollInterval`).
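A sketch of how that could look (the in-memory set plus a re-armed watch; `checkCompleted` would also be invoked on the `completionPollInterval` schedule as the backstop, and the JSON parsing and status update are elided):

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

class ReassignmentCompletionTracker {
    private final ZooKeeper zk;
    private final Set<String> inFlight = new HashSet<>(); // partitions we started reassigning

    ReassignmentCompletionTracker(ZooKeeper zk) { this.zk = zk; }

    // Diff the znode contents against the in-memory set, re-arming the watch.
    synchronized void checkCompleted() throws KeeperException, InterruptedException {
        Set<String> stillReassigning = new HashSet<>();
        try {
            byte[] data = zk.getData("/admin/reassign_partitions",
                    event -> { try { checkCompleted(); } catch (Exception ignored) { } },
                    null);
            stillReassigning.addAll(parsePartitions(data));
        } catch (KeeperException.NoNodeException e) {
            // znode gone: everything in flight has finished (a fresh watch
            // would need zk.exists() here to catch the next reassignment)
        }
        Set<String> completed = new HashSet<>(inFlight);
        completed.removeAll(stillReassigning);
        inFlight.removeAll(completed);
        completed.forEach(this::markAssigned); // reassigning -> assigned
    }

    private Set<String> parsePartitions(byte[] json) { return new HashSet<>(); } // elided
    private void markAssigned(String partition) { } // update KafkaTopic.status, elided
}
```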
Example configuration for `internal` reassignment
```yaml
kind: Kafka
spec:
  # ...
  entityOperator:
    topicOperator:
      reassignment:
        type: internal # in the future we'd support external
        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000
        throttledReplicas:
          my-topic:
            leader:
              0: 101
              1: 102
            follower:
              0: 102
              1: 101
        rfChangeLingerSeconds: 60
        maxPartitionsInBatch: 20
        completionPollInterval: 60s
```
In the case of `internal` reassignment we would use env vars to pass this configuration to the TO.
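For example (the env var name and its JSON encoding are purely hypothetical, just to show the shape):

```java
import io.vertx.core.json.JsonObject;

class ReassignmentConfig {
    final long rfChangeLingerSeconds;
    final int maxPartitionsInBatch;

    ReassignmentConfig() {
        // STRIMZI_REASSIGNMENT_CONFIG is an invented name for this sketch.
        JsonObject cfg = new JsonObject(System.getenv("STRIMZI_REASSIGNMENT_CONFIG"));
        this.rfChangeLingerSeconds = cfg.getLong("rfChangeLingerSeconds", 60L);
        this.maxPartitionsInBatch = cfg.getInteger("maxPartitionsInBatch", 20);
    }
}
```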
Maintaining state
We would use a `KafkaTopic.status.assignmentStatuses` object to make explicit the `assigned -> queued -> reassigning -> assigned` status changes for each partition.
```yaml
kind: KafkaTopic
metadata:
  name: my-topic
spec:
  partitions: 2
  replicas: 3
  config:
    # ...
status:
  assignmentStatuses:
    # Map or list associating partition id with a state
    - 0: assigned
    - 1: queued
```
A partition is in the `queued` state when the TO has observed that the `KafkaTopic.spec.replicas` differs from the Kafka topic's current RF and the partition is not currently being reassigned.
A partition is in the `reassigning` state when it is currently being reassigned.
A partition is in the `assigned` state when the TO has observed that the `KafkaTopic.spec.replicas` is the same as the Kafka topic's current RF and the partition is not currently being reassigned.
The transition `assigned -> queued` would happen when the TO is notified of a change to the `KafkaTopic` (or on periodic reconciliation) which changed the RF.
The transition `queued -> reassigning` would happen when the TO adds the partition to a batch and starts the reassignment.
The transition `reassigning -> assigned` would happen when the TO is notified via a ZK watch on `/admin/reassign_partitions` (or periodically) that the partition is no longer being reassigned. To do this it will need to keep in-memory the set of partitions being reassigned.
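The allowed transitions are small enough to capture in a few lines (state names from the proposal; the enum itself is just a sketch):

```java
enum AssignmentState {
    ASSIGNED, QUEUED, REASSIGNING;

    // Legal transitions described above; anything else is a bug.
    boolean canTransitionTo(AssignmentState next) {
        switch (this) {
            case ASSIGNED:    return next == QUEUED;      // RF change observed
            case QUEUED:      return next == REASSIGNING; // batch started
            case REASSIGNING: return next == ASSIGNED;    // ZK watch / poll saw completion
            default:          return false;
        }
    }
}
```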
Selecting brokers for new replicas
When RF is increased the additional replicas need to be assigned to brokers. When rack awareness is enabled in the Kafka cluster that assignment should avoid assigning the replicas of the partition to brokers in the same rack. To do this the TO will need to know the rack, which it can obtain via the AdminClient.
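A sketch of that rack lookup (standard `AdminClient` calls; how to pick among the returned candidates is a separate question, discussed in the constraints later in this thread):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;

class RackAwareCandidates {
    // Brokers not already hosting the partition, preferring racks the
    // partition's current replicas don't use. Node.rack() may be null when
    // rack awareness is disabled, in which case everything is "one rack".
    static List<Integer> candidates(AdminClient admin, Set<Integer> currentReplicas)
            throws Exception {
        Collection<Node> nodes = admin.describeCluster().nodes().get();
        Set<String> usedRacks = new HashSet<>();
        for (Node n : nodes) {
            if (currentReplicas.contains(n.id())) {
                usedRacks.add(n.rack());
            }
        }
        List<Integer> preferred = new ArrayList<>();
        for (Node n : nodes) {
            if (!currentReplicas.contains(n.id()) && !usedRacks.contains(n.rack())) {
                preferred.add(n.id());
            }
        }
        return preferred;
    }
}
```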
OT, but how do you model this in the `api`?

```yaml
throttledRates:
  1: # broker 1
    leader: 70000000
    follower: 70000000
  2: # broker 2
    leader: 70000000
    follower: 70000000
```

We could have used it for the JBOD storage and we can use it for the exposing improvements. Can you do it just as a `Map<Integer, Object>`?
In this example:

```yaml
kind: Kafka
spec:
  # ...
  entityOperator:
    topicOperator:
      reassignment:
        type: internal # in the future we'd support external
        throttledRates:
          1: # broker 1
            leader: 70000000
            follower: 70000000
          2: # broker 2
            leader: 70000000
            follower: 70000000
        throttledReplicas:
          my-topic:
            leader:
              0: 101
              1: 102
            follower:
              0: 102
              1: 101
        rfChangeLingerSeconds: 60
        maxPartitionsInBatch: 20
        completionPollInterval: 60s
```
I understand the three options at the bottom. I understand the general fixed throttles configured for the reassignments. I'm not sure I understand the point of the `throttledReplicas` object TBH. Also, if it is topic specific, isn't the better place for this in the KafkaTopic object?
Would it be complicated to configure the throttles per topic in KafkaTopic instead of having only general configuration on TO level?
Using status for reporting / tracking progress seems obvious choice, but it means support for OpenShift 3.11+, right?
> When RF is increased the additional replicas need to be assigned to brokers. When rack awareness is enabled in the Kafka cluster that assignment should avoid assigning the replicas of the partition to brokers in the same rack. To do this the TO will need to know the rack, which it can obtain via the AdminClient.
Can we as an alternative let the user specify the map with the assignments in KafkaTopics?
> OT, but how do you model this in the `api`?
>
> ```yaml
> throttledRates:
>   1: # broker 1
>     leader: 70000000
>     follower: 70000000
>   2: # broker 2
>     leader: 70000000
>     follower: 70000000
> ```
>
> We could have used it for the JBOD storage and we can use it for the exposing improvements. Can you do it just as a `Map<Integer, Object>`?
TBH that might not be possible, I'll have to check. We do something similar with `Map<String, Object>` for the `config`. We might be able to use a `Map<String, Object>` but constrain the key to have the decimal integer syntax in the schema.
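On the validation side that constraint would amount to something like this sketch (the schema-level check would mirror it):

```java
import java.util.Map;

class ThrottledRatesValidator {
    // Accepts a Map<String, Object> but insists each key parses as a broker id.
    static void validateKeys(Map<String, Object> throttledRates) {
        for (String key : throttledRates.keySet()) {
            if (!key.matches("[0-9]+")) {
                throw new IllegalArgumentException(
                        "throttledRates key is not a broker id: " + key);
            }
        }
    }
}
```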
> In this example:
>
> ```yaml
> kind: Kafka
> spec:
>   # ...
>   entityOperator:
>     topicOperator:
>       reassignment:
>         type: internal # in the future we'd support external
>         throttledRates:
>           1: # broker 1
>             leader: 70000000
>             follower: 70000000
>           2: # broker 2
>             leader: 70000000
>             follower: 70000000
>         throttledReplicas:
>           my-topic:
>             leader:
>               0: 101
>               1: 102
>             follower:
>               0: 102
>               1: 101
>         rfChangeLingerSeconds: 60
>         maxPartitionsInBatch: 20
>         completionPollInterval: 60s
> ```
>
> I understand the three options at the bottom. I understand the general fixed throttles configured for the reassignments. I'm not sure I understand the point of the `throttledReplicas` object TBH. Also, if it is topic specific, isn't the better place for this in the KafkaTopic object?
`throttledReplicas` is listing which brokers, when in the given roles, throttle replication of the given topic partition. For example, when broker 101 is the leader for my-topic partition 0 the throttle given in `throttledRates` will apply. (It didn't help that I used 101 for the broker id in the `throttledReplicas` but 1 in the `throttledRates`, sorry about that.)
> Would it be complicated to configure the throttles per topic in KafkaTopic instead of having only general configuration on TO level?
I don't think it would work putting it in the KafkaTopic because it's quite specific to the brokers where the partition(s) are now. It would need to be changed every time the topic was reassigned (and/or it would be stale when the topic was not being reassigned). I'm also keen on not adding extra properties to `KafkaTopic` related to the reassignment until we've thought more about the longer term plan for balancing. Putting throttling info in the `KafkaTopic` now might make it difficult to support balancer features later. For example, having the balancer automatically adjusting throttles is more complicated if they're present in the `KafkaTopic` than if it's just something the balancer could do directly.
Finally, the throttling configuration is more an aspect of a reconciliation than it is of a topic, imho.
> Using status for reporting / tracking progress seems obvious choice, but it means support for OpenShift 3.11+, right?
I don't think pre-3.11 will prevent you from having a `status` object in your CR, it just won't be seen as a `status` subresource. I will experiment with this to confirm.
> When RF is increased the additional replicas need to be assigned to brokers. When rack awareness is enabled in the Kafka cluster that assignment should avoid assigning the replicas of the partition to brokers in the same rack. To do this the TO will need to know the rack, which it can obtain via the AdminClient.
>
> Can we as an alternative let the user specify the map with the assignments in KafkaTopics?
The problem is again whether this makes our lives harder in the future when we do cluster balancing. If there's a writeable `replicaAssignment` in something like

```yaml
kind: KafkaTopic
metadata:
  name: my-topic
spec:
  partitions: 2
  replicaAssignment:
    0: [101, 102, 103]
    1: [104, 101, 102]
  config:
    # ...
```

and the topic is being balanced automatically, then who is in control? There are also potential issues with things like GitOps applying an old assignment. This is still a problem with `replicas`, but it's a smaller problem because RF doesn't really change much, whereas the assignments could be quite dynamic.
That said, maybe it's just a matter of making it explicit in the `KafkaTopic` resource what kind of assignment is expected:
```yaml
kind: KafkaTopic
metadata:
  name: my-topic
spec:
  partitions: 2
  replicaAssignment:
    type: manual
    assignments:
      0: [101, 102, 103]
      1: [104, 101, 102]
  config:
    # ...
```
And a `type: automatic` would simply not expose any assignments to be tinkered with. Ultimately I expect we would use the `status` to expose the current assignment, but (in 3.11+) read-only.
> Putting throttling info in the `KafkaTopic` now might make it difficult to support balancer features later. For example, having the balancer automatically adjusting throttles is more complicated if they're present in the `KafkaTopic` than if it's just something the balancer could do directly.
That is probably a fair point. But storing it in the Kafka CR seems weird. I also wonder if it would cause issues with rights. You might want someone to manage their topic but maybe not to manage the Kafka cluster; for this the user needs full rights to the Kafka resources. I wonder if holding this in a config map or something could be a better solution until we have the balancer.
> I don't think pre-3.11 will prevent you from having a `status` object in your CR, it just won't be seen as a `status` subresource. I will experiment with this to confirm.
Yeah, I know ... but I wonder whether it wouldn't cause more issues that way (e.g. with users rewriting the status again and again from some Git repo etc.)
> And a `type: automatic` would simply not expose any assignments to be tinkered with. Ultimately I expect we would use the `status` to expose the current assignment, but (in 3.11+) read-only.
This sounds good to me ... with the type, the users can choose and there is no problem with the rewriting of the resource. So basically:
- If the user specifies either explicitly that the type is automatic, or just sets the old partitions / replicas fields, it will be automatic.
- If the user sets it manually, it will never be touched by the balancer.
- The `replicas` and `partitions` fields and `replicaAssignment` could be mutually exclusive.

That would probably work with the balancer. It might still be a bit awkward before we have the balancer. How much time do you think we will need for that :-o
> OT, but how do you model this in the `api`?
>
> ```yaml
> throttledRates:
>   1: # broker 1
>     leader: 70000000
>     follower: 70000000
>   2: # broker 2
>     leader: 70000000
>     follower: 70000000
> ```
>
> We could have used it for the JBOD storage and we can use it for the exposing improvements. Can you do it just as a `Map<Integer, Object>`?
>
> TBH that might not be possible, I'll have to check. We do something similar with `Map<String, Object>` for the `config`. We might be able to use a `Map<String, Object>` but constrain the key to have the decimal integer syntax in the schema.
Annoyingly it turns out that Kube doesn't support `patternProperties`, so we can't constrain String keys at all. That doesn't stop us using a `Map<String, ...>` like this, but it means the map wouldn't be validated by the schema. So it would probably be better to use something like this:
```yaml
throttledRates:
  - broker: 1
    leader: 70000000
    follower: 70000000
  - broker: 2
    leader: 70000000
    follower: 70000000
```
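That list shape maps naturally onto a small bean, something like this sketch (in the style of Strimzi's `api` classes, but not actual Strimzi code):

```java
import com.fasterxml.jackson.annotation.JsonProperty;

// One entry of the list-based throttledRates shown above.
public class ThrottledRate {
    @JsonProperty("broker")
    private int broker;
    @JsonProperty("leader")
    private long leader;     // bytes/sec throttle when this broker leads a partition
    @JsonProperty("follower")
    private long follower;   // bytes/sec throttle when this broker follows

    public int getBroker() { return broker; }
    public void setBroker(int broker) { this.broker = broker; }
    public long getLeader() { return leader; }
    public void setLeader(long leader) { this.leader = leader; }
    public long getFollower() { return follower; }
    public void setFollower(long follower) { this.follower = follower; }
}
```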
Similarly the `KafkaTopic.status` would look like
```yaml
status:
  assignmentStatuses:
    - partition: 0
      assignmentStatus: assigned
    - partition: 1
      assignmentStatus: queued
```
@tombentley Ok, fair enough. Thanks for investigating.
> I don't think pre-3.11 will prevent you from having a `status` object in your CR, it just won't be seen as a `status` subresource. I will experiment with this to confirm.
>
> Yeah, I know ... but I wonder whether it wouldn't cause more issues that way (e.g. with users rewriting the status again and again from some Git repo etc.)
In a Kubernetes without support for CRD subresources we can't avoid the possibility of people updating the `status`, but because we're maintaining a watch on the resource anyway, the TO could be made to update the status whenever it was found to be wrong (i.e. as part of the reconciliation). This means that the status isn't completely reliable because it can be momentarily wrong; but it would be correct almost all of the time while the TO was running. I don't think the TO changes needed for this are very much work.
In a Kubernetes with support for CRD subresources we would need to register that the `status` should be treated as a subresource. Then we would have to update it via the subresource path. It seems that this is not directly supported in fabric8. We would still be able to use this API but would have to resort to adapting the fabric8 client to an OkHttp client.
Further complication comes if we want to make use of subresource support when it's available. In this case we would need to conditionally apply changes to the `status` in the reconciliation code and also conditionally update the `status` using the subresource path.
On the one hand the design I sketched above does not rely on having a completely trustworthy `status` (the TO should work fine without it). But having changes which are not immediately synced to the underlying topic and no way to inform the user of the status of those changes (except logging) is horrible. So I think we should consider `status` as a requirement. The question then is whether we try to make use of subresource support when it's available, or just live with a slightly rubbish status until the time that Strimzi no longer supports pre-1.11 (by which time fabric8 might have caught up a bit).
Oh, about the fabric8 support, see https://github.com/fabric8io/kubernetes-client/issues/417
> It seems that this is not directly supported in fabric8. We would still be able to use this API but would have to resort to adapting the fabric8 client to an OkHttp client.
That is indeed a bit unfortunate. We would have to see if this can be easily worked around.
> In a Kubernetes without support for CRD subresources we can't avoid the possibility of people updating the status, but because we're maintaining a watch on the resource anyway, the TO could be made to update the status whenever it was found to be wrong (i.e. as part of the reconciliation).
That works fine if it is a single update done manually by the user. It might cause problems if the update is done every minute by some GitOps framework.
> Further complication comes if we want to make use of subresource support when it's available.
So the question there is: how can we discover when the Kubernetes cluster supports custom resource subresources? On my kube (and with a CRD which declares use of the status subresource):
```
$ curl 127.0.0.1:45321/apis/kafka.strimzi.io/v1alpha1/
{
  "kind": "APIResourceList",
  "apiVersion": "v1",
  "groupVersion": "kafka.strimzi.io/v1alpha1",
  "resources": [
    {
      "name": "kafkatopics",
      "singularName": "kafkatopic",
      "namespaced": true,
      "kind": "KafkaTopic",
      "verbs": [
        "delete",
        "deletecollection",
        "get",
        "list",
        "patch",
        "create",
        "update",
        "watch"
      ],
      "shortNames": [
        "kt"
      ]
    },
    {
      "name": "kafkatopics/status",
      "singularName": "",
      "namespaced": true,
      "kind": "KafkaTopic",
      "verbs": [
        "get",
        "patch",
        "update"
      ]
    },
    ...
```
So it seems clear that the `kafkatopics/status` object will only be present when the status subresource is supported.
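So a discovery check along these lines could gate the subresource code path (a sketch assuming a fabric8 client version that exposes `getApiResources`; older versions would need a raw HTTP call like the one above):

```java
import io.fabric8.kubernetes.api.model.APIResourceList;
import io.fabric8.kubernetes.client.KubernetesClient;

class StatusSubresourceCheck {
    // True when the API server advertises kafkatopics/status, i.e. the
    // status subresource is supported and must be updated via its own path.
    static boolean hasStatusSubresource(KubernetesClient client) {
        APIResourceList resources = client.getApiResources("kafka.strimzi.io/v1alpha1");
        return resources.getResources().stream()
                .anyMatch(r -> "kafkatopics/status".equals(r.getName()));
    }
}
```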
Constraints for decreasing the replication factor
In most-important-first order (a selection sketch follows the list):
- The preferred leader should not be changed.
- The retained replicas should be from different racks (where possible)
- The current leader should be retained if possible (i.e. avoid leader election & CG rebalances where possible)
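A sketch of replica selection under those constraints (rack data as a plain map here; in practice it would come from the `AdminClient` as described earlier):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ShrinkReplicas {
    // Shrink a partition's replica list to targetRf: keep the preferred leader
    // (first entry), prefer rack diversity, then prefer the current leader.
    static List<Integer> shrink(List<Integer> replicas, int targetRf,
                                int currentLeader, Map<Integer, String> rackByBroker) {
        List<Integer> kept = new ArrayList<>();
        Set<String> usedRacks = new HashSet<>();
        kept.add(replicas.get(0));                       // 1. preferred leader stays put
        usedRacks.add(rackByBroker.get(replicas.get(0)));
        List<Integer> order = new ArrayList<>();         // try the current leader first,
        order.add(currentLeader);                        // so constraint 3 wins ties
        for (int b : replicas) if (b != currentLeader) order.add(b);
        for (int b : order) {                            // 2. rack-diverse brokers first
            if (kept.size() == targetRf) break;
            if (!kept.contains(b) && !usedRacks.contains(rackByBroker.get(b))) {
                kept.add(b);
                usedRacks.add(rackByBroker.get(b));
            }
        }
        for (int b : order) {                            // then fill from what's left
            if (kept.size() == targetRf) break;
            if (!kept.contains(b)) kept.add(b);
        }
        return kept;
    }
}
```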
Constraints for increasing the replication factor
In most-important-first order:
- The preferred leader should not be changed.
- The new brokers should be in different racks from already-assigned brokers (where possible)
There actually needs to be a 3rd constraint, but it's not clear what it should be. We basically need to avoid always putting new replicas on the same broker(s). We could do that by picking a broker from the pool of candidates at random, or by picking the broker with the fewest assigned partitions. I don't want to try to base this on metric data (because that would be getting into cluster balancing). Either of those choices could be a poor choice in some circumstances. Any preference?
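For what it's worth, the "fewest assigned partitions" option could be as simple as this sketch (how `partitionCounts` is gathered is elided; the deterministic tie-break is my addition):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class GrowReplicas {
    // Among the rack-eligible candidates, pick the broker hosting the fewest
    // partition replicas; break ties by broker id so the choice is deterministic.
    static Optional<Integer> pickNewReplica(List<Integer> candidates,
                                            Map<Integer, Integer> partitionCounts) {
        return candidates.stream()
                .min(Comparator.<Integer>comparingInt(b -> partitionCounts.getOrDefault(b, 0))
                        .thenComparing(Comparator.naturalOrder()));
    }
}
```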
Reviewed on 10th September 2020: This is still needed. Should be done in cooperation with Cruise Control.
Hey!! I want to work on this project idea.
Hi @tombentley , can I take this task for GSoC 2021? I already have successful experience with open source projects: https://github.com/ClickHouse/ClickHouse/pull/16578 https://github.com/ClickHouse/ClickHouse/pull/17144 https://github.com/catboost/catboost/pull/1598
@fibersel you're welcome to have a look into it. There are at least a couple of other people who are also interested. I'll tell you what I've told them:
You're welcome to start poking around in the Topic Operator code if you want. And bug fixes are always welcome. But it's also worth thinking about how this is going to work overall. Increasing the replication factor of a topic requires Kafka to move data around between brokers, which can take a long time, so think about the consequences of that for the Topic Operator. Think about what happens if any of the TO, CC or Kafka brokers get restarted during this time. Some of those will be problematic and others not. Getting a sound overall understanding is really important for deciding exactly how the TO should handle this. So learning a bit about how partition reassignment works in Kafka, and how CC controls that, is all very useful, and IMO at least as important as familiarity with the code at this stage.
Feel free to ask questions here or on the Strimzi channel on the CNCF slack.
Hey, I am a beginner. Can anyone please help me with what I can do?
@sudip004 You mean to implement this issue? I guess @tombentley @tomncooper @kyguy have some thoughts about this. But TBH it is not exactly an issue for a beginner.
Why is this part of the Topic Operator and not Cruise Control?
Triaged on 7.6.2022: Still desired. Work in progress with proposal https://github.com/strimzi/proposals/pull/50
Done by @fvaleri. Will be released in 0.41.