
"consistent_random" and "murmur2_random" partitioners treat empty keys differently

Open Quuxplusone opened this issue 1 year ago • 3 comments

(Reporting this as an issue mainly so that there is a canonical place it's documented, for internal reference and for posterity.)

librdkafka provides several partitioners out-of-the-box:

  • rd_kafka_msg_partitioner_murmur2_random
  • rd_kafka_msg_partitioner_fnv1a_random
  • rd_kafka_msg_partitioner_consistent_random

They have the following behaviors:

  • murmur2_random: If key is null, pick a random partition. Otherwise, partition based on murmur2(key, keylen).
  • fnv1a_random: If key is null, pick a random partition. Otherwise, partition based on fnv1a(key, keylen).
  • consistent_random: If key is null or keylen is zero, pick a random partition. Otherwise, partition based on crc32(key, keylen).
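The difference in empty-key handling can be sketched as follows. This is a simplified model, not librdkafka's actual source: the real hash functions are replaced with a stand-in, since only the null/empty-key dispatch matters here.

```cpp
#include <cstddef>
#include <cstdint>
#include <random>

namespace sketch {

std::int32_t random_partition(std::int32_t partition_cnt) {
    static std::mt19937 rng{std::random_device{}()};
    return std::uniform_int_distribution<std::int32_t>{0, partition_cnt - 1}(rng);
}

// Stand-in for murmur2/crc32: any deterministic hash illustrates the point.
std::uint32_t stub_hash(const void* key, std::size_t len) {
    std::uint32_t h = 2166136261u;
    const auto* p = static_cast<const unsigned char*>(key);
    for (std::size_t i = 0; i < len; ++i)
        h = (h ^ p[i]) * 16777619u;
    return h;
}

// murmur2_random: only a NULL key is randomized; a zero-length key
// still gets hashed, so all empty keys land on one fixed partition.
std::int32_t murmur2_random(const void* key, std::size_t len, std::int32_t cnt) {
    if (key == nullptr)
        return random_partition(cnt);
    return static_cast<std::int32_t>(stub_hash(key, len) % cnt);
}

// consistent_random: a NULL key *and* a zero-length key are both randomized.
std::int32_t consistent_random(const void* key, std::size_t len, std::int32_t cnt) {
    if (key == nullptr || len == 0)
        return random_partition(cnt);
    return static_cast<std::int32_t>(stub_hash(key, len) % cnt);
}

} // namespace sketch
```

Under this model, `murmur2_random("", 0, 30)` always returns the same partition, while `consistent_random("", 0, 30)` varies from call to call, which is exactly the behavior change described below.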

Notice the inconsistency between consistent_random (the default) and murmur2_random (which various Internet folks recommend switching to: 1, 2). consistent_random will distribute zero-length keys among random partitions; murmur2_random will send all zero-length keys to a single partition. There are two ways this could bite someone who switches partitioning schemes:

  • The way it bit us in real life: Suppose we're currently using consistent_random, and storing the Kafka key in a std::string. (If there's no key — our common case — then the std::string is just empty). We unconditionally call rd_kafka_producev(..., RD_KAFKA_V_KEY(key.data(), key.size()), ...). When the key is absent, key.size() == 0, and consistent_random distributes our data to random partitions. Tomorrow, we switch to murmur2_random — and suddenly all our keyless data ends up in the same single partition! (Partition 21 mod 30, in our case.) To restore the behavior we depended on before, we must change our calling code to rd_kafka_producev(..., RD_KAFKA_V_KEY(key.empty() ? nullptr : key.data(), key.size()), ...).

  • Or, vice versa: Suppose we're currently using murmur2_random or fnv1a_random. We always provide a key, and sometimes the key is zero-length. (Maybe we use "customer name" as the key, and one smart-aleck customer set their name to the empty string, I dunno.) Using murmur2_random, all records with a zero-length key get hashed to one consistent partition. Tomorrow, we switch to the default partitioner consistent_random — and suddenly all those records are distributed randomly, such that they aren't in the same partition anymore! To restore the old behavior, we must change our calling code to check for the zero-length key and map it to something non-zero-length.
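One defensive pattern for the first scenario is to normalize the key arguments in one place, so an absent key is always passed as NULL and means "random partition" under every partitioner. `kafka_key_args` below is a hypothetical helper, not part of librdkafka's API:

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical helper: map an empty std::string to a NULL key pointer,
// so "no key" behaves identically under consistent_random and
// murmur2_random (both treat a NULL key as "pick a random partition").
std::pair<const void*, std::size_t> kafka_key_args(const std::string& key) {
    if (key.empty())
        return {nullptr, 0};  // absent key -> random partition everywhere
    return {key.data(), key.size()};
}

// With librdkafka, the call site would then look like (not compiled here):
//   auto [kptr, klen] = kafka_key_args(key);
//   rd_kafka_producev(rk, ..., RD_KAFKA_V_KEY(kptr, klen), ...);
```

This only helps if "empty key" and "no key" genuinely mean the same thing in your data model; in the second scenario above, where an empty key is a legitimate value, you would instead map it to some reserved non-empty sentinel before producing.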

Because changing the behavior in either direction has the potential to break someone, I don't think there's any way for librdkafka to fix this inconsistency between how consistent_random and murmur2_random handle empty keys. I just wish we had known about it before it bit us.
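For anyone weighing such a switch: the partitioner is selected with the `partitioner` topic configuration property (to the best of my reading of librdkafka's CONFIGURATION.md, `consistent_random` is the default and `murmur2_random` matches the Java client), so the change itself is a one-line config edit, and the empty-key caveat above is the only subtle part:

```ini
# librdkafka topic configuration.
# consistent_random is the default; murmur2_random matches the Java client,
# but changes where zero-length (non-NULL) keys land.
partitioner=murmur2_random
```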

  • [x] librdkafka version: v1.8.2

Quuxplusone avatar Nov 17 '23 15:11 Quuxplusone

@Quuxplusone thanks for sharing this. Hopefully it will help people be aware of the changes they need to apply before switching to murmur2_random, which we cannot set as the default for the same reason.

emasab avatar Nov 17 '23 15:11 emasab

Could you make murmur2_random the default and bump the major version of librdkafka, noting this as a breaking change and warning users to explicitly choose consistent_random if they were relying on it until now (for the reasons listed here)?

I appreciate not wanting to surprise existing users. But for new users, it would be great if they inherited murmur2_random by default, so they don't run into issues as soon as they start sharing a topic between Java and non-Java producers. See https://www.confluent.io/blog/standardized-hashing-across-java-and-non-java-producers/

benissimo avatar Nov 22 '23 12:11 benissimo

@benissimo we have a list of breaking changes that we want to make in a planned major release, and this one is already included. We should also deprecate the old behavior before removing it in the next major version, to give developers time to learn about it. If we do a major release, we should also backport fixes to the previous major version until it goes out of support.

2.x was needed earlier than planned because of the upgrade of OpenSSL to 3.x, but otherwise it's compatible.

This is to say that at some point it will be needed, but we have to plan it, and we're currently focused on these features: KIP-848, KIP-714, and KIP-951.

emasab avatar Nov 24 '23 09:11 emasab