confluent-kafka-go

Support partitioner callback in producer

Open 0x1997 opened this issue 7 years ago • 18 comments

0x1997 avatar Nov 22 '16 07:11 0x1997

Out of curiosity, can you share your custom partitioner? It might be generic enough to be integrated.

edenhill avatar Nov 30 '16 13:11 edenhill

@edenhill It's application specific. Basically custom_hash(static_cast<State*>(msg_opaque)) % partition_cnt in C++.

0x1997 avatar Dec 01 '16 05:12 0x1997

Okay, what you'll need to do in the meantime is get the partition count (with GetMetadata()) for your topic(s) and then run your partitioner prior to calling Produce() and setting Message.Partition accordingly.

edenhill avatar Dec 11 '16 18:12 edenhill

The problem with implementing partitioner_cb support in high-level language bindings is that the partitioner callback may be called from an internal librdkafka thread, and this isn't trivial to handle in cgo, CPython, et al. This should be fixed in librdkafka rather than in the bindings, but that isn't trivial either, which is why this functionality is currently missing from our bindings.

But here's a dumb idea: what if you, as a Go app developer, implemented the partitioner in C (cgo) and the Go client provided an API to set the C partitioner_cb? You wouldn't be allowed to call any Go methods from this callback, but since partitioners are pretty minimal by design this might be okay.

It would look something like this:

/*
#include <librdkafka/rdkafka.h>

static int32_t my_partitioner (rd_kafka_topic_t *rkt, ..) {
    ..some custom hasher goes here..
    return hash % partition_cnt;
}
*/
import "C"

...

conf := ConfigMap{..., "default.topic.config": &ConfigMap{"partitioner_cb": C.my_partitioner}}
p, err := NewProducer(conf)
...

I know it is ugly, but would it be a reasonable workaround for you until proper Go partitioners are supported?

edenhill avatar Dec 11 '16 18:12 edenhill

I wouldn't even need an entirely custom partitioner, I would just need the same key to always go to the same partition. Perhaps it would be possible to bake in something like that, for example by adding another constant besides PartitionAny... ?

tchap avatar Aug 16 '17 15:08 tchap

@tchap the default partitioner is Consistent-Random, which maps the same key to the same partition, so you should be fine. https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1606

edenhill avatar Aug 16 '17 15:08 edenhill

The next version of librdkafka (the underlying Kafka client) will expose the builtin partitioners as configuration properties, allowing you to change to an alternative builtin partitioner, such as the Java compatible murmur2_random partitioner.

Search for 'partitioner' here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Custom partitioners are not yet exposed in the Go client though.

edenhill avatar Jan 03 '18 07:01 edenhill

@edenhill

Custom partitioners are not yet exposed in the Go client though.

I have a use case that really needs this. I've been trying to find a workaround, and it doesn't seem to be easy. First, I tried the workaround mentioned here https://github.com/confluentinc/confluent-kafka-go/issues/16#issuecomment-266298255 , but I get the error:

Failed to create producer: Invalid value type unsafe.Pointer for key partitioner_cb (expected string,bool,int,ConfigMap)

Then I cast the pointer to a uintptr, cast it again to int, and ran into a runtime error saying:

Failed to create producer: Property "partitioner_cb" must be set through dedicated ..set..() function

At this point I stopped trying. It looks like it wants me to use rd_kafka_topic_conf_set_partitioner_cb() through some more cgo magic, but I gave up because I didn't know what to pass as the first parameter to that function, rd_kafka_topic_conf_t *topic_conf, and I don't know how to get hold of the rd_kafka_topic_conf_t from my Go program.

Do you have any tips for completing this workaround? Thanks!

billygout avatar Mar 12 '18 18:03 billygout

Due to the generic way configuration is passed from Go to C it is a bit tricky to add a special case for set_partitioner_cb(), so for the sake of proof-of-concepting I suggest you insert a call to ..set_partitioner_cb() with a hardcoded C-function callback here, right before rd_kafka_conf_set_default_topic_conf(): https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/config.go#L149

Do note that this callback may be called from internal librdkafka threads and it is not clear to me how to safely trigger a Go call from such a thread.

edenhill avatar Mar 12 '18 19:03 edenhill

Thank you, @edenhill! What I actually need is not a generic custom partitioner. Rather, I'm looking for a way to hash on something other than the Kafka key, since I'm using the key for other purposes. It looks like hashing on the msg_opaque would work for me, given the partitioner callback parameters:

    const rd_kafka_topic_t *rkt,
    const void *keydata,
    size_t keylen,
    int32_t partition_cnt,
    void *rkt_opaque,
    void *msg_opaque

That works as long as I can set the msg_opaque at the Go level, and it looks like I can (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/message.go#L79).

Would you accept a PR to add a new builtin partitioner to librdkafka, like this: consistent_opaque - CRC32 hash of msg_opaque (Empty and NULL msg_opaque are mapped to single partition)? I'm not sure, though, whether the CRC32 should be applied to the pointer address msg_opaque itself or to the whole data behind the pointer. Since there is no corresponding length argument, in the latter case msg_opaque would have to point to a C string so the CRC32 knows how far to read...

billygout avatar Mar 12 '18 20:03 billygout

I strongly advise you to stick to the existing semantics of keys in Kafka: they are used for partitioning and compaction. If you need additional data with your message you can either create a richer message payload (using, for example, Avro and schema-registry), or use message headers to "tag along" arbitrary data to your liking.

We will not accept a PR that does partitioning on something other than the key, sorry.

Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.

edenhill avatar Mar 12 '18 20:03 edenhill

Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.

Ah, that would certainly kill the idea.

billygout avatar Mar 12 '18 20:03 billygout

I strongly advise you to stick to the existing semantics of keys in Kafka, they are used for partitioning and compaction.

That ship sailed a while ago at my company. In the future, when we upgrade to Kafka 0.11, we'll probably put this metadata in the 0.11+ headers. Additionally, we have no use for compaction and have it turned off for our use case.

billygout avatar Mar 12 '18 20:03 billygout

or use message headers to "tag along" arbitrary data to your liking.

This is a possibility too. I will revisit that. It was my first option, but I ran into social problems :)

billygout avatar Mar 12 '18 20:03 billygout

We would love to have this available in the producer, as we're moving from Sarama to confluent-kafka-go, but we still need to support Sarama's default partitioner, which uses the 32-bit FNV-1a hashing algorithm (part of hash/fnv in Go).

For now we will follow @edenhill's advice (i.e. get topic metadata, run custom Go partitioner prior to Produce and set Message.Partition), but it would still be nice to be able to have custom partitioners be supported in some form in the Producer API (either Go or C). Although, I may just try my hand at adding the FNV-1a algo to librdkafka...

Manicben avatar Feb 13 '20 11:02 Manicben

Yeah, the simplest alternative is to add it as a builtin partitioner to librdkafka. Looks like the fnv1a code is very simple, so it should be fairly straightforward.

Find all occurrences of murmur2_random in the librdkafka/src code and add fnv1a_random counterparts. https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_msg.c#L870

edenhill avatar Feb 13 '20 12:02 edenhill

Agree with @Manicben :) It would be great to have that included in the producer, rather than having to run a custom Go partitioner prior to Produce. Is there a chance of seeing that feature in the future? By the way, @edenhill, thanks for all the good work 👍

maeglindeveloper avatar Feb 14 '20 12:02 maeglindeveloper

@edenhill Hello. I want messages with key="a" to go to partition 0, messages with key="b" to go to partition 1, and so on for other keys. After debugging, it doesn't work. My partitioner_cb code:

```c
static int32_t partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata,
                              size_t keylen, int32_t partition_cnt,
                              void *rkt_opaque, void *msg_opaque) {
    /* this is a simple example */
    int32_t partition = 0;

    if (keylen == 0)
        return partition;

    if (strncmp(keydata, "a", 1) == 0)
        partition = 0;
    else if (strncmp(keydata, "b", 1) == 0)
        partition = 1;
    else if (strncmp(keydata, "c", 1) == 0)
        partition = 2;
    else if (strncmp(keydata, "d", 1) == 0)
        partition = 3;

    return partition;
}

int err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                           (void *)buf, len, (void *)key, strlen(key), NULL);
```

Could you help me? Thanks.

caojun97 avatar Nov 13 '23 09:11 caojun97