librdkafka
Multi-Threaded rd_jitter Broken on Windows
Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ
Description
This has been discussed in the past (#2795), but the output of rand() is predictable: an unseeded generator produces the same sequence every time. This is usually mitigated by seeding with srand(), or by calling rand_r() with a caller-owned seed.
While POSIX defined rand_r() to provide a reentrant PRNG, Microsoft's solution to this problem was to make rand() and similar C functions thread-safe by keeping their CRT state in thread-local storage. As a result, srand() only seeds the thread that calls it. Given two threads, A and B, if thread A calls srand() and thread B then calls rand(), thread B still uses the default PRNG seed and produces the same fixed sequence.
In short, this means that on Windows librdkafka cannot guarantee that the calling application may call any of the API functions from any of its own threads at any time: every thread other than the one that called srand() starts from the default seed.
How to reproduce
Reproduction steps below assume Windows:
- Enable debug output
- Call rd_kafka_new and obtain an rd_kafka_t handle on thread A
- In a loop (more than once to validate), spawn a new thread (thread B), passing this handle to the new thread
- Produce a single message to the configured topic on thread B
- Analyze the logging output to see messages like the following:
Debug : [thrd:main]: TOPIC [5] is the new sticky partition
In a production scenario where this was spotted, the particular topic had 12 partitions, and partition 5 was the one that was selected every single time. To demonstrate this without using librdkafka, you can look at the following example:
// Compile with:
// cl randtest.c
//
// Given PARTITION_CNT == 12, this will output:
//
// 0 0
// 1 0
// 2 0
// 3 0
// 4 0
// 5 1000
// 6 0
// 7 0
// 8 0
// 9 0
// 10 0
// 11 0
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define WIN32_LEAN_AND_MEAN
#include <windows.h>

#define PARTITION_CNT 12

long RANDVALS[PARTITION_CNT];

// rd_jitter: the rd_jitter definition from v1.2.0 (effectively the same from v1.8.0 without rand_r)
static inline int rd_jitter(int low, int high)
{
    return (low + (rand() % ((high - low) + 1)));
}

// randtest: makes one rd_jitter draw on a fresh thread whose rand() state was never seeded
DWORD WINAPI randtest(LPVOID p)
{
    (void)p;
    RANDVALS[rd_jitter(0, PARTITION_CNT - 1) % PARTITION_CNT]++;
    return 0;
}

int main(int argc, char **argv)
{
    memset(RANDVALS, 0, sizeof RANDVALS);
    srand((unsigned int)time(NULL)); // only seeds the main thread
    for (int i = 0; i < 1000; i++) {
        HANDLE thrd = CreateThread(NULL, 0, randtest, NULL, 0, NULL);
        if (thrd == NULL)
            return 1;
        WaitForSingleObject(thrd, INFINITE);
        CloseHandle(thrd); // release the thread handle to avoid leaking it
    }
    for (int i = 0; i < PARTITION_CNT; i++) {
        printf("%d %ld\n", i, RANDVALS[i]);
    }
    return 0;
}
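The program above needs Windows. As a portable cross-check of why partition 5 in particular comes up, the sketch below models each new thread's CRT rand() state explicitly. It assumes the commonly cited MSVC rand() constants (multiplier 214013, increment 2531011, result taken from bits 16 to 30) and a default seed of 1 for a thread that never called srand(). The names crt_thread_state, model_rand, model_jitter and tally_default_seed_threads are hypothetical, for illustration only; they are not CRT or librdkafka APIs.

```c
#include <assert.h>

#define PARTITION_CNT 12

/* Hypothetical model of one thread's CRT rand() state (assumes the
 * commonly cited MSVC LCG constants and a default seed of 1). */
struct crt_thread_state {
    unsigned int holdrand;
};

/* One rand() call under the assumed MSVC LCG: returns 0..0x7fff. */
int model_rand(struct crt_thread_state *st)
{
    st->holdrand = st->holdrand * 214013u + 2531011u;
    return (int)((st->holdrand >> 16) & 0x7fff);
}

/* Same arithmetic as the v1.2.0 rd_jitter, applied to the modeled state. */
int model_jitter(struct crt_thread_state *st, int low, int high)
{
    assert(low <= high);
    return low + (model_rand(st) % ((high - low) + 1));
}

/* Simulates thread_cnt fresh threads, each making one rd_jitter-style
 * draw without ever calling srand(), and tallies the chosen partitions. */
void tally_default_seed_threads(long counts[PARTITION_CNT], int thread_cnt)
{
    for (int i = 0; i < PARTITION_CNT; i++)
        counts[i] = 0;
    for (int t = 0; t < thread_cnt; t++) {
        struct crt_thread_state st = { 1u }; /* fresh thread: default seed */
        counts[model_jitter(&st, 0, PARTITION_CNT - 1)]++;
    }
}
```

Under these assumptions, the first draw on every fresh thread is (1 * 214013 + 2531011) >> 16 = 41, and 41 % 12 = 5, which is consistent with the 1000 in row 5 of the expected output and with the production observation.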
Checklist
IMPORTANT: We will close issues where the checklist has not been completed.
Please provide the following information:
- [x] librdkafka version (release number or git tag): 4faeb8132521da70b6bcde14423a14eb7ed5c55e
- [x] Apache Kafka version: 2.6.2
- [x] librdkafka client configuration: enable.random.seed=true (only relevant config here)
- [x] Operating system: Any Windows version
- [ ] Provide logs (with debug=.. as necessary) from librdkafka
- [ ] Provide broker log excerpts
- [ ] Critical issue
For this fix, I have considered two potential solutions.
First Proposal
The first proposal is to call srand() inside rd_jitter, with a thread-local flag ensuring that srand() is called only once per thread, like so:
unsigned int rd_get_seed_value(void)
{
    /* Seed with time+thread id */
    unsigned int seed = 0;
    struct timeval tv;
    rd_gettimeofday(&tv, NULL);
    seed = (unsigned int)(tv.tv_usec / 1000);
    seed ^= (unsigned int)(intptr_t)thrd_current();
    return seed;
}

int rd_jitter (int low, int high) {
    int rand_num;
#if HAVE_RAND_R
    static RD_TLS unsigned int seed = 0;
    if (unlikely(seed == 0)) {
        seed = rd_get_seed_value();
    }
    rand_num = rand_r(&seed);
#else
    static RD_TLS unsigned int srand_called = 0;
    if (unlikely(!srand_called)) { /* seed only on this thread's first call */
        /* Set initial seed for use cases when rd_kafka_new and other rd_kafka APIs
         * may be called by user from different threads. */
        srand(rd_get_seed_value());
        srand_called = 1;
    }
    rand_num = rand();
#endif
    return (low + (rand_num % ((high-low)+1)));
}
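The HAVE_RAND_R branch above can be exercised on a POSIX system to check the once-per-thread seeding. The sketch below is an approximation under stated assumptions: C11 _Thread_local stands in for RD_TLS, pthreads stand in for librdkafka's thread wrappers, and sketch_seed_value is a hypothetical stand-in for rd_get_seed_value that mixes the current millisecond with the address of a thread-local variable instead of thrd_current(). It uses an explicit seeded flag rather than treating a zero seed as uninitialized, so a seed that happens to be 0 is not re-seeded. The #else branch cannot be reproduced this way, because glibc's rand() state is shared by the whole process rather than kept per thread.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

#define SKETCH_MAX_THREADS 64
#define SKETCH_CALLS_PER_THREAD 100

/* Counts per-thread seed initializations so "once per thread" can be checked. */
static atomic_int seed_init_count;

/* Hypothetical stand-in for rd_get_seed_value(): current millisecond
 * XORed with the address of a thread-local variable. */
static unsigned int sketch_seed_value(void)
{
    static _Thread_local char tls_marker;
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    unsigned int seed = (unsigned int)(ts.tv_nsec / 1000000);
    seed ^= (unsigned int)(uintptr_t)&tls_marker;
    return seed;
}

/* rd_jitter-style draw in [low, high] using a lazily seeded per-thread seed. */
int sketch_jitter(int low, int high)
{
    static _Thread_local int seeded = 0;
    static _Thread_local unsigned int seed;
    assert(low <= high);
    if (!seeded) { /* first call on this thread */
        seed = sketch_seed_value();
        seeded = 1;
        atomic_fetch_add(&seed_init_count, 1);
    }
    return low + (rand_r(&seed) % ((high - low) + 1));
}

/* Worker: makes several draws and flags any value outside [0, 11]. */
static void *sketch_worker(void *arg)
{
    int *out_of_range = arg;
    for (int i = 0; i < SKETCH_CALLS_PER_THREAD; i++) {
        int v = sketch_jitter(0, 11);
        if (v < 0 || v > 11)
            *out_of_range = 1;
    }
    return NULL;
}

/* Runs n worker threads (n <= SKETCH_MAX_THREADS) and returns the number of
 * seed initializations, or -1 on error. *any_out_of_range is set to 1 if any
 * draw fell outside the requested range. */
int sketch_run_threads(int n, int *any_out_of_range)
{
    pthread_t tids[SKETCH_MAX_THREADS];
    int flags[SKETCH_MAX_THREADS] = {0}; /* one slot per thread: no shared writes */
    if (n < 0 || n > SKETCH_MAX_THREADS)
        return -1;
    atomic_store(&seed_init_count, 0);
    for (int i = 0; i < n; i++) {
        if (pthread_create(&tids[i], NULL, sketch_worker, &flags[i]) != 0) {
            for (int j = 0; j < i; j++)
                pthread_join(tids[j], NULL);
            return -1;
        }
    }
    *any_out_of_range = 0;
    for (int i = 0; i < n; i++) {
        pthread_join(tids[i], NULL);
        if (flags[i])
            *any_out_of_range = 1;
    }
    return atomic_load(&seed_init_count);
}
```

Running eight workers should report eight initializations, one per worker, no matter how many draws each worker makes.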
Second Proposal
From what I can see, most of the POSIX functions that don't exist on Windows have a Win32 -> POSIX translation (e.g. rd_gettimeofday). Since rand_r doesn't exist on Windows and this issue has arisen, it seems reasonable to create an rd_rand_r function that is a passthrough to rand_r on POSIX systems and a bespoke implementation on Windows. Based on the musl C standard library's version of rand_r, I suggest something like this:
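// rd_rand_r: proposed rand_r implementation (rdwin32.h only), based on musl's rand_r
static RD_INLINE int rd_rand_r(unsigned int *seed)
{
    unsigned int x;
    *seed = *seed * 1103515245 + 12345; /* LCG step on the caller-owned seed */
    x = *seed;
    /* Tempering, as in musl */
    x ^= x >> 11;
    x ^= (x << 7) & 0x9D2C5680;
    x ^= (x << 15) & 0xEFC60000;
    x ^= x >> 18;
    return (int)(x / 2); /* keep the result non-negative */
}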
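// ...
int rd_jitter(int low, int high) {
    /* Per-thread seed, initialized once from time + thread id, so that
     * consecutive calls continue the sequence instead of reseeding. */
    static RD_TLS unsigned int seed = 0;
    int rand_num;
    if (unlikely(seed == 0)) {
        struct timeval tv;
        rd_gettimeofday(&tv, NULL);
        seed = (unsigned int)(tv.tv_usec / 1000);
        seed ^= (unsigned int)(intptr_t)thrd_current();
    }
    rand_num = rd_rand_r(&seed);
    return (low + (rand_num % ((high - low) + 1)));
}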
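Because the proposed rd_rand_r is plain integer arithmetic, it can be checked on any platform. The sketch below restates it as a standalone function (assuming a 32-bit unsigned int, as musl does) and adds a hypothetical helper, tally_partitions, that feeds it through the rd_jitter formula for 12 partitions from a single seed. Unlike the default-seed behavior described in this issue, draws from a seed that is carried between calls should spread across all partitions.

```c
#include <assert.h>

#define PARTITION_CNT 12

/* musl-style tempering of the raw LCG state (assumes 32-bit unsigned int). */
static unsigned int rd_rand_r_temper(unsigned int x)
{
    x ^= x >> 11;
    x ^= (x << 7) & 0x9D2C5680u;
    x ^= (x << 15) & 0xEFC60000u;
    x ^= x >> 18;
    return x;
}

/* Proposed rd_rand_r: advance the caller-owned seed, then temper it.
 * Dividing by 2 keeps the result in [0, 0x7fffffff]. */
int rd_rand_r(unsigned int *seed)
{
    *seed = *seed * 1103515245u + 12345u;
    return (int)(rd_rand_r_temper(*seed) / 2);
}

/* Hypothetical helper: applies the rd_jitter formula to `draws`
 * successive rd_rand_r values from one seed and tallies the partitions. */
void tally_partitions(unsigned int seed, int draws, long counts[PARTITION_CNT])
{
    const int low = 0, high = PARTITION_CNT - 1;
    assert(draws >= 0);
    for (int i = 0; i < PARTITION_CNT; i++)
        counts[i] = 0;
    for (int i = 0; i < draws; i++)
        counts[low + (rd_rand_r(&seed) % ((high - low) + 1))]++;
}
```

Because the seed is passed by pointer, each caller (or each thread, via a thread-local seed) owns its own sequence, which is the property the Windows CRT's rand() lacks here.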