aiokafka
admin client - failure to create topics or delete topics (error code 41)
- Use controller node for create and delete topics instead of a random node
- Fixes error code 41 on create and delete topics
@ods I feel this PR should definitely be part of 0.8.0
@tvoinarovskyi Please review
Codecov Report
Merging #865 (c004f06) into master (fa239a2) will increase coverage by 0.00%. The diff coverage is 100.00%.
```
@@           Coverage Diff           @@
##           master     #865   +/-  ##
=======================================
  Coverage   97.55%   97.55%
=======================================
  Files          28       28
  Lines        5389     5391    +2
=======================================
+ Hits         5257     5259    +2
  Misses        132      132
```
Flag | Coverage Δ |
---|---|
cext | 88.29% <100.00%> (+<0.01%) :arrow_up: |
integration | 97.51% <100.00%> (+<0.01%) :arrow_up: |
purepy | 97.08% <100.00%> (+0.03%) :arrow_up: |
unit | 37.89% <0.00%> (-0.02%) :arrow_down: |
Flags with carried forward coverage won't be shown.
Impacted Files | Coverage Δ |
---|---|
aiokafka/admin.py | 88.42% <100.00%> (+0.10%) :arrow_up: |
@kbhatiya999 Could you please describe what problem you are solving? Is it possible to reproduce it?
I solved the failure to create or delete topics using the admin client.
To reproduce, you just need a Kafka cluster with more than one broker: use the admin client to create or delete topics, and it will fail most of the time (a reproduction sketch follows below).
Reason: create/delete topic requests need to go to the cluster controller, but since we use a random broker, the request fails.
I changed the code so that create/delete always hits the controller broker.
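A minimal reproduction sketch along these lines, purely illustrative: it assumes a multi-broker cluster reachable at localhost:9092, the `NewTopic` import location may vary between aiokafka versions, and the failure may either be raised or only reported as error code 41 inside the response.

```python
import asyncio

from aiokafka.admin import AIOKafkaAdminClient, NewTopic
from aiokafka.errors import KafkaError


async def main():
    admin = AIOKafkaAdminClient(bootstrap_servers="localhost:9092")
    await admin.start()
    try:
        # With more than one broker, the request may be sent to a broker that
        # is not the controller and come back with NOT_CONTROLLER (code 41).
        resp = await admin.create_topics(
            [NewTopic(name="repro-topic", num_partitions=3, replication_factor=2)]
        )
        print(resp)  # inspect per-topic error codes in the response
        print(await admin.delete_topics(["repro-topic"]))
    except KafkaError as exc:
        # Depending on the version, the failure may instead surface as a raised error.
        print("Admin request failed:", exc)
    finally:
        await admin.close()


asyncio.run(main())
```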
Hello! We also hit this bug with error code 41 while trying to create a topic on a distributed Kafka cluster.
The PR seems to be a solution for that.
Are there any plans to merge this fix in the near future?
@DMantis you might have to fork the library while you wait. I am really surprised others are not facing this issue (probably they are not using the admin client, just the producer and consumer).
@DMantis If possible, could you please add sample code and its output demonstrating the problem? I haven't worked with Kafka for some time now, so I will have to set it up from scratch.
@ods What will you need for this PR to be approved? As for the change/code review, it is easy, as there are only 3 lines of change. Please let me know how I can help to speed up the process.
@kbhatiya999, thank you for the reminder about this problem. I've done some research, and it looks like the main point of the PR is correct, i.e. we have to send the request to the controller node. But we certainly have to apply the same approach in other methods too (delete_topics, create_partitions, etc.). Also, we probably want to handle the NOT_CONTROLLER (41) error in this case (refresh metadata and retry? see the sketch after this comment).
Also, it would be nice to have some simple way to check that the problem exists with the current code and is solved after the fix. I understand that it's very difficult to provide CI tests for this case, but I hope something like a docker-compose.yml plus a script to check it manually is possible. This is the most difficult part. I could probably finish the rest once we have tests.
Does it make sense to you? Please describe your vision of how to proceed.
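A rough sketch of the "refresh metadata and retry" idea, as a hypothetical helper rather than actual aiokafka code: the names `_get_cluster_metadata()`, `_client.send()` and `controller_id` follow the `_send_request` snippet quoted later in this thread, and the `topic_errors` field matches the CreateTopics response; DeleteTopics reports errors under a differently named field, so a real implementation would need per-request handling.

```python
import asyncio

NOT_CONTROLLER = 41  # Kafka error code returned by a broker that is not the controller


async def _send_to_controller(self, request, retries=3, backoff=0.2):
    """Send an admin request to the current controller, retrying on NOT_CONTROLLER."""
    response = None
    for attempt in range(retries):
        # Re-fetch cluster metadata so a controller change is picked up.
        metadata = await self._get_cluster_metadata()
        response = await self._client.send(metadata.controller_id, request)
        errors = getattr(response, "topic_errors", []) or []
        if not any(err[1] == NOT_CONTROLLER for err in errors):
            return response
        await asyncio.sleep(backoff * (attempt + 1))  # controller moved; back off and retry
    return response
```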
@ods It might take me some time (I have left that project as it was completed). I will come back with a docker-compose.yml and script.
Has there been any progress on this? I can also confirm that this change fixes the issue.
librdkafka opts to always use the controller ID node as the target for admin requests, so we can fix all of the call sites by modifying one function:
```python
async def _send_request(
        self,
        request: Request,
        node_id: Optional[int] = None) -> Response:
    if node_id is None:
        metadata = await self._get_cluster_metadata()
        node_id = metadata.controller_id
    return await self._client.send(node_id, request)
```
Handling error code 41 in the call may be challenging since the user can pass multiple topics and each one technically has its own associated error code. As an end-user, I would be fine handling the retry logic.
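If the retry lives on the caller side instead, a hedged sketch could look like the following. It assumes `create_topics()` returns a protocol response whose `topic_errors` entries are `(topic, error_code, ...)` tuples, as in kafka-python's CreateTopicsResponse, and that `NewTopic` objects expose a `name` attribute; adjust for your version.

```python
import asyncio

NOT_CONTROLLER = 41


async def create_topics_with_retry(admin, new_topics, attempts=3, backoff=0.5):
    """Re-issue CreateTopics only for topics that failed with NOT_CONTROLLER (41)."""
    pending = {t.name: t for t in new_topics}
    for _ in range(attempts):
        response = await admin.create_topics(list(pending.values()))
        still_failing = {
            entry[0]
            for entry in response.topic_errors
            if entry[1] == NOT_CONTROLLER
        }
        pending = {name: topic for name, topic in pending.items() if name in still_failing}
        if not pending:
            return
        await asyncio.sleep(backoff)  # give the client a chance to refresh metadata
    raise RuntimeError(f"Topics still failing with NOT_CONTROLLER: {sorted(pending)}")
```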
@y4n9squared I will update the PR with details that will help the maintainers determine whether it is ready for merge. Give me a day or two.
Any update on this?
@ods I have taken a look at the test setup, and an approach would be:
- drop the supervisord approach in docker
- create a docker-compose file that starts zookeeper, a kerberos server, and 3 brokers
- replace all the docker python calls with calls to `docker compose` (probably via `os.system`? see the sketch below)
It is quite some work. I can give it a shot, but I would like to know if you agree with the approach.
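For the third point, one possible shape, as a hypothetical helper rather than existing aiokafka test code: it uses `subprocess` instead of `os.system` so failures raise, and assumes a recent Compose v2 where `up --wait` is available; the compose file name is a placeholder.

```python
import subprocess


def compose(*args: str) -> None:
    # Run `docker compose` against the repository's compose file and fail loudly.
    subprocess.run(["docker", "compose", "-f", "docker-compose.yml", *args], check=True)


def start_cluster() -> None:
    # `--wait` blocks until service healthchecks pass, so tests don't need to poll.
    compose("up", "-d", "--wait")


def stop_cluster() -> None:
    compose("down", "-v")
```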
> - drop the supervisord approach in docker
> - create a docker-compose file that starts zookeeper, a kerberos server, and 3 brokers
I completely agree with moving to docker compose. Actually, we have problems with the current approach that could be solved by this move.
> - replace all the docker python calls with calls to `docker compose` (probably via `os.system`?)
It doesn't look like the right way to me. Nowadays it's common to run the tests in one of the services declared in docker-compose.yml. This approach has many advantages compared to the current one: better reproducibility of the environment, fully automated setup for newcomers, better observability and simpler debugging. Docker compose takes responsibility for waiting for readiness, so we don't need code in the tests for this.
Actually, moving to docker compose is on the roadmap for aiokafka. But as you said, it's quite a big task, as all the CI has to be adjusted accordingly, either by using a docker-in-docker approach or by duplicating the services setup in the CI config.
I think a temporary docker-compose.yml to reproduce the problem and check that the PR fixes it (without automating it and incorporating it into CI) would be enough for now.
@ods About running the tests inside a service that is part of the docker-compose.yml: it is the way we work at my company, so we have quite a bit of experience with it. It would be easier, but it has some cons too. Some details/feedback on it:
- we also use a Makefile to streamline the main usages (running tests, running checks/linters, running kafka commands, running kcat, ...)
- we mount the project folder into the docker service so we get quicker feedback on our changes (we don't need to rebuild an image)
- mounts don't play well with the docker-in-docker approach, so we have our own runners (GitLab in our case) running jobs on hosts that have docker + make installed (that's the only requirement we have; python is installed inside docker only)
- some care needs to be taken with the uid/gid used inside the "python" container, so that files generated by the commands get the proper permissions on the host
Most of the "issues" are around mounting the project folder. A copy would play nicer, but it slows down the dev cycle a bit (you need to re-run the docker build as soon as you change a file).
@ods I tried https://github.com/aio-libs/aiokafka/pull/973 but, for sure, I cannot reproduce the issue. I am starting to wonder whether a later Kafka broker version already solved it (i.e. you can send the command to any broker and it will still work).
@kbhatiya999 @y4n9squared Any tips to reproduce the issue? What broker version is used in your clusters?
I am seeing this in the Scala code of the broker (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala):
```scala
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
```
Here is the change on the broker side: https://issues.apache.org/jira/browse/KAFKA-10181
I am still failing to reproduce the issue
(the KIP about forwarding https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller )
I see the Jira issue (KAFKA-9705 - Zookeeper mutation protocols should be redirected to Controller only) is not resolved yet.
Yet @ods I have been playing with the compose in https://github.com/aio-libs/aiokafka/pull/973, from version 1.1.1 to 2.8.1, without being able to reproduce the problem. Maybe something is wrong with my test, but so far I am not able to reproduce it. Maybe it is specific to KRaft-based clusters? Here it is using zookeeper, so maybe the brokers have no need to reach the controller, as they are all connected to ZK, which ensures the quorum.
In my situation, we are using the client library against RedPanda 23.2. RedPanda is a completely separate implementation but complies with the Kafka API. This issue being particular to KRaft-based clusters seems to track.
@y4n9squared I tested with a KRaft cluster and it does still work, see this branch: https://github.com/aio-libs/aiokafka/pull/973/files. Maybe you should try a similar docker compose file but with RedPanda? I am not sure then how far the goal of the aiokafka client extends in being compatible with different servers.
As a general principle, I agree that the client SDK shouldn't be accommodating implementation idiosyncrasies. My impression was that the API dictated that Create/DeleteTopic requests must go to the controller, but I cannot find the KIP.
However, implementations in other libraries seem to do this already:
- kafka-python: https://github.com/dpkp/kafka-python/blob/master/kafka/admin/client.py#L483
- confluent_python via librdkafka: https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_admin.c#L643
> I am not sure then how far the goal of the aiokafka client extends in being compatible with different servers.
I don't mind adding a recent version of RedPanda to CI, if somebody is ready to volunteer implementing and supporting it.