ngx_kafka_module
Kafka Partition Key Routing
I've got basic Kafka message routing working, but I need to route messages to a partition based on a routing key. Is there a way to achieve this currently?
Let me know if this should be a separate issue. I tried to work around my issue by computing a key-based partition assignment in nginx Lua and then setting a variable used by kafka_partition. It looks like this won't work, because I get this error from ngx_kafka_module:
    nginx: [emerg] "kafka_partition" directive invalid number
Let me know what technical blockers there are to passing a variable to the kafka_partition directive.
Also, based on my initial question, let me know if there is a better way to approach this.
In my opinion, ngx_lua is a good way to solve this problem. Create several locations, each bound to one specific partition, and use ngx_lua to route each request to a location based on your routing key.
I'd also be glad to help diagnose your problem if you can paste some configuration here.
@brg-liuwei thanks. The config is relatively simple at this point:
    location /events {
        access_by_lua_file /etc/nginx/conf.d/kafka.lua;
        kafka_partition 0;
        kafka_topic events;
    }
I understand what you mean by having a separate location block per partition to solve this problem; however, I'm unable to add a partition number to the incoming URI. So the remaining question is: how do I modify the URI of an incoming request with ngx_lua? I'm thinking a request comes into /events and hits access_by_lua_file, and inside that Lua script it determines the correct partition (based on some hash bucket function) and then points the request at a new location, something like /events/<partition>. Let me know if that makes sense and if you have any suggestions on how to do this with ngx_lua.
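The idea described above, keeping /events as the public entry point and jumping to a per-partition location, could be sketched with an internal redirect via ngx.exec. This is only a sketch under assumptions: the route_key query argument, the partition count of 2, and the use of CRC32 as the hash bucket function are all illustrative, and whether ngx_kafka_module behaves correctly after an internal redirect has not been verified in this thread.

```nginx
# one internal location per partition (2 partitions assumed for brevity)
location = /events/0 {
    internal;
    kafka_topic events;
    kafka_partition 0;
}

location = /events/1 {
    internal;
    kafka_topic events;
    kafka_partition 1;
}

location = /events {
    access_by_lua_block {
        -- hypothetical routing key taken from the query string
        local key = ngx.var.arg_route_key or "no_route_key"
        -- hash bucket: CRC32 of the key modulo the assumed partition count
        local partition = ngx.crc32_short(key) % 2
        -- internal redirect; the client never sees /events/<partition>
        return ngx.exec("/events/" .. partition)
    }
}
```

Because the redirect happens inside nginx, the client keeps posting to /events and no partition number has to appear in the incoming URI.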
@dkinon Assuming you have configured Kafka with 3 partitions, the following code may help you:
    # create 3 internal locations, each bound to one existing partition
    location = /kafka_partition/0 {
        internal;
        kafka_topic events;
        kafka_partition 0;
    }

    location = /kafka_partition/1 {
        internal;
        kafka_topic events;
        kafka_partition 1;
    }

    location = /kafka_partition/2 {
        internal;
        kafka_topic events;
        kafka_partition 2;
    }

    location = /send_to_kafka {
        content_by_lua_block {
            -- get the route key
            local key = ngx.var.arg_route_key or "no_route_key"
            -- calculate the hash
            local hash = ngx.crc32_short(ngx.md5(key))
            -- pick the partition (assuming there are 3 partitions in your Kafka topic)
            local partition = hash % 3
            -- send to the location bound to that partition
            ngx.location.capture("/kafka_partition/" .. partition, {
                method = ngx.HTTP_POST, -- ngx_kafka_module only supports POST
                body = "hash code:" .. hash .. ", partition:" .. partition .. ", route key:" .. key,
            })
        }
    }
Send a message with route_key:
    curl 'host:port/send_to_kafka?route_key=helloworld'
This code has been tested in my dev environment and it worked. If you have any other questions, feel free to add more comments.
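The example above publishes a synthetic body and ignores the subrequest result. A variant that forwards the client's own POST body and surfaces failures might look like the sketch below. It assumes the three /kafka_partition/N locations defined above; the 502 status on failure is just one reasonable choice, and how ngx_kafka_module reports errors through the subrequest status is not confirmed in this thread.

```nginx
location = /send_to_kafka {
    content_by_lua_block {
        -- read the client's body so it can be forwarded to the subrequest
        ngx.req.read_body()

        local key = ngx.var.arg_route_key or "no_route_key"
        local partition = ngx.crc32_short(ngx.md5(key)) % 3

        -- with method POST and no explicit body option, ngx.location.capture
        -- forwards the parent request's body to the subrequest
        local res = ngx.location.capture("/kafka_partition/" .. partition, {
            method = ngx.HTTP_POST,
        })

        -- assumption: any 4xx/5xx subrequest status is treated as a failure
        if res.status >= 400 then
            ngx.status = ngx.HTTP_BAD_GATEWAY
            ngx.say("kafka subrequest failed with status ", res.status)
            return
        end

        -- pass the subrequest's status and body back to the client
        ngx.status = res.status
        ngx.print(res.body)
    }
}
```

With this variant the message payload comes from the request itself, for example: curl -d 'my event' 'host:port/send_to_kafka?route_key=helloworld'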
This is working for me, thanks.
It would be nice to have a more dynamic configuration for this, where kafka_partition could accept an nginx variable rather than a fixed number. This would simplify the nginx locations and the Lua code. I would be open to doing the work and creating a PR if you could help me understand the technical blockers to this approach.
@dkinon The PR you mentioned would be welcome. You could start by reading the source code and learning the basics of nginx module development. We can discuss technical questions in this issue or in the PR.