Question regarding blocked signals in threads

Open azat opened this issue 1 year ago • 8 comments

I recently came across this commit (https://github.com/confluentinc/librdkafka/commit/62165e6d09082454138948922b96b54bd55058a8). What is the problem with signals? Could you please clarify?

Also posted as a comment: https://github.com/confluentinc/librdkafka/issues/78/#issuecomment-1858177682

Cc: @edenhill

azat, Jan 02 '24 10:01

The signal mask is set to block all signals. A new thread is then created, which inherits this all-blocking signal mask. Then the original signal mask is restored in the calling thread. This prevents signal handlers from being executed in librdkafka threads. A safer way would be to use pthread_attr_setsigmask_np when creating the thread, but this isn't portable (_np) and is only supported by GNU pthreads.
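The block, create, restore sequence can be sketched roughly as follows. This is a minimal illustration using only POSIX calls, not the actual librdkafka code; `worker_main`, `spawn_with_signals_blocked`, and `demo_spawn` are hypothetical names, and SIGUSR1 is used only as a probe signal.

```c
#define _POSIX_C_SOURCE 200809L /* expose the POSIX signal/thread API */
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <stddef.h>

/* Hypothetical worker: records whether SIGUSR1 is blocked in its own mask. */
static void *worker_main(void *arg) {
    int *blocked = arg;
    sigset_t cur;

    /* With a NULL new set, pthread_sigmask only reads the current mask. */
    pthread_sigmask(SIG_SETMASK, NULL, &cur);
    *blocked = sigismember(&cur, SIGUSR1);
    return NULL;
}

/* Create a thread that starts with all signals blocked while leaving the
 * caller's mask as it was. Returns 0 or the pthread_create error code. */
static int spawn_with_signals_blocked(pthread_t *thr,
                                      void *(*fn)(void *), void *arg) {
    sigset_t all, oldset;
    int err;

    sigfillset(&all);
    pthread_sigmask(SIG_SETMASK, &all, &oldset); /* block all, save old mask */
    err = pthread_create(thr, NULL, fn, arg);    /* child inherits the mask */
    pthread_sigmask(SIG_SETMASK, &oldset, NULL); /* caller's mask restored */
    return err;
}

/* Returns 1 if the worker saw SIGUSR1 blocked, 0 if not, -1 on error. */
static int demo_spawn(void) {
    pthread_t thr;
    int worker_blocked = 0;
    sigset_t before, after;

    pthread_sigmask(SIG_SETMASK, NULL, &before);
    if (spawn_with_signals_blocked(&thr, worker_main, &worker_blocked) != 0)
        return -1;
    pthread_join(thr, NULL);
    pthread_sigmask(SIG_SETMASK, NULL, &after);

    /* The caller's view of SIGUSR1 is the same before and after. */
    assert(sigismember(&before, SIGUSR1) == sigismember(&after, SIGUSR1));
    return worker_blocked;
}
```

Calling `demo_spawn()` from a thread should report that the worker started with SIGUSR1 blocked, whatever the caller's own mask was.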

emasab, Feb 13 '24 14:02

> Then the original signal mask is restored

IIRC, the problem was that some threads are created from a thread whose signal mask already blocks all signals, so the mask that gets restored is this all-blocked mask.

> This prevents signal handlers from being executed in librdkafka threads

But what is the problem with calling signal handlers from librdkafka threads?

The problem I ran into is obtaining stack traces for each thread in ClickHouse (https://github.com/ClickHouse/ClickHouse/pull/57907). This is done by sending an RT signal to all threads, but it does not work for librdkafka threads.
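As a rough illustration of why such a signal never reaches a handler in a thread whose mask blocks everything, the sketch below sends a signal to the calling thread while it is blocked and checks that it only becomes pending. It is an assumption-laden demo, not ClickHouse or librdkafka code: it uses SIGUSR1 instead of an RT signal for portability, and `on_signal` and `blocked_signal_stays_pending` are hypothetical names.

```c
#define _POSIX_C_SOURCE 200809L /* expose the POSIX signal/thread API */
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t handler_runs = 0;

/* Hypothetical handler standing in for a stack-trace collector. */
static void on_signal(int signo) {
    (void)signo;
    handler_runs = 1;
}

/* Returns 1 if a signal sent to a fully blocked thread stayed pending
 * and the handler did not run. */
static int blocked_signal_stays_pending(void) {
    struct sigaction sa, old_sa;
    sigset_t all, only_usr1, oldset, pending;
    int is_pending, consumed = 0;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = on_signal;
    sigemptyset(&sa.sa_mask);
    sigaction(SIGUSR1, &sa, &old_sa);

    /* Put the calling thread in the same state as a thread created
     * with all signals blocked. */
    sigfillset(&all);
    pthread_sigmask(SIG_SETMASK, &all, &oldset);

    /* Direct the signal at this thread specifically. */
    pthread_kill(pthread_self(), SIGUSR1);

    sigpending(&pending);
    is_pending = sigismember(&pending, SIGUSR1);

    /* Consume the pending signal synchronously so that it is not
     * delivered to the handler once the mask is restored. */
    sigemptyset(&only_usr1);
    sigaddset(&only_usr1, SIGUSR1);
    sigwait(&only_usr1, &consumed);
    assert(consumed == SIGUSR1);

    pthread_sigmask(SIG_SETMASK, &oldset, NULL);
    sigaction(SIGUSR1, &old_sa, NULL);

    return is_pending == 1 && handler_runs == 0;
}
```

A thread that wanted to receive such a signal would have to unblock it in its own mask, for example with `pthread_sigmask(SIG_UNBLOCK, ...)`.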

The original commit contains a comment:

   /* Block all signals in newly created thread.
    * To avoid race condition we block all signals in the calling
    * thread, which the new thread will inherit its sigmask from,
    * and then restore the original sigmask of the calling thread when
    * we're done creating the thread. */

What race condition is possible without blocking signals? Can you shed some light on this?

azat, Feb 15 '24 11:02

It restores the original sigmask, not the one with all signals blocked. Check these lines: https://github.com/confluentinc/librdkafka/blob/62165e6d09082454138948922b96b54bd55058a8/rdkafka.c#L922 https://github.com/confluentinc/librdkafka/blob/62165e6d09082454138948922b96b54bd55058a8/rdkafka.c#L950

The race condition Magnus was referring to happens if the signal handler is executed in a librdkafka thread instead of the application thread it was meant to run in. That's why signals are masked in the created librdkafka thread.

emasab, Feb 15 '24 16:02

> It restores the original sigmask, not the one with all signals blocked. Check these lines

Not always. For instance, the broker thread is created with the signals blocked:

https://github.com/confluentinc/librdkafka/blob/62165e6d09082454138948922b96b54bd55058a8/rdkafka.c#L915-L950

And while creating it, it also blocks signals but then restores the mask to all-blocked, since the signals had already been blocked above:

https://github.com/confluentinc/librdkafka/blob/62165e6d09082454138948922b96b54bd55058a8/rdkafka_broker.c#L3414

> The race condition Magnus was referring to happens if the signal handler is executed in a librdkafka thread instead of the application thread it was meant to run in. That's why signals are masked in the created librdkafka thread.

But what is the race condition exactly? Maybe you can give an example?

azat, Feb 15 '24 19:02

> And while creating it, it also blocks signals but then restores the mask to all-blocked, since the signals had already been blocked above:

No, they are restored to oldset, that is, the value they had before calling pthread_sigmask. It could be any value you had set, not necessarily all blocked.

The documentation says:

> If the argument oset is not a null pointer, the previous mask shall be stored in the location pointed to by oset.

https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_sigmask.html

emasab, Mar 05 '24 12:03

Yes, you are talking about the behavior of pthread_sigmask, but the problem is that it has already inherited the all-blocked sigmask.

rd_kafka_new() -> blocks all signals -> rd_kafka_broker_add() ->
    get old sigmask (all signals blocked) -> create thread -> restore sigmask
restore sigmask
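The nesting in this flow can be reproduced in isolation. The sketch below is only a model of the call sequence described above, not librdkafka code: `inner_save_and_restore` and `outer_then_inner` are hypothetical stand-ins for rd_kafka_broker_add() and rd_kafka_new(), thread creation is elided, and SIGUSR1 is used as a probe.

```c
#define _POSIX_C_SOURCE 200809L /* expose the POSIX signal/thread API */
#include <assert.h>
#include <pthread.h>
#include <signal.h>
#include <stddef.h>

/* Stand-in for the inner routine: block all signals, create a thread
 * (elided here), then restore whatever mask was saved on entry.
 * Returns 1 if the saved mask already had SIGUSR1 blocked, else 0. */
static int inner_save_and_restore(void) {
    sigset_t all, saved;

    sigfillset(&all);
    pthread_sigmask(SIG_SETMASK, &all, &saved);
    /* pthread_create() would go here. */
    pthread_sigmask(SIG_SETMASK, &saved, NULL);
    return sigismember(&saved, SIGUSR1);
}

/* Stand-in for the outer routine: it blocks all signals itself before
 * calling the inner routine, so the inner routine saves and restores an
 * all-blocked mask. The outer restore then puts the caller's mask back. */
static int outer_then_inner(void) {
    sigset_t all, oldset;
    int inner_saved_blocked;

    sigfillset(&all);
    pthread_sigmask(SIG_SETMASK, &all, &oldset);
    inner_saved_blocked = inner_save_and_restore();
    pthread_sigmask(SIG_SETMASK, &oldset, NULL);
    return inner_saved_blocked;
}
```

When called from a thread that has SIGUSR1 unblocked, `inner_save_and_restore()` on its own saves a mask without SIGUSR1, while `outer_then_inner()` reports that the inner routine saved a mask with SIGUSR1 blocked.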

So I meant at least something like this (though I'm not sure this is the only place):

diff --git a/src/rdkafka.c b/src/rdkafka.c
index e30faa6c..0138dfe3 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -2417,12 +2417,6 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
          * @warning `goto fail` is prohibited past this point
          */

-        rdk_thread_mutex_lock(&rk->rk_internal_rkb_lock);
-       rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
-                                                 RD_KAFKA_PROTO_PLAINTEXT,
-                                                 "", 0, RD_KAFKA_NODEID_UA);
-        rdk_thread_mutex_unlock(&rk->rk_internal_rkb_lock);
-
        /* Add initial list of brokers from configuration */
        if (rk->rk_conf.brokerlist) {
                if (rd_kafka_brokers_add0(rk, rk->rk_conf.brokerlist) == 0)
@@ -2430,6 +2424,12 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
                                        "No brokers configured");
        }

+        rdk_thread_mutex_lock(&rk->rk_internal_rkb_lock);
+       rk->rk_internal_rkb = rd_kafka_broker_add(rk, RD_KAFKA_INTERNAL,
+                                                 RD_KAFKA_PROTO_PLAINTEXT,
+                                                 "", 0, RD_KAFKA_NODEID_UA);
+        rdk_thread_mutex_unlock(&rk->rk_internal_rkb_lock);
+
 #ifndef _WIN32
        /* Restore sigmask of caller */
        pthread_sigmask(SIG_SETMASK, &oldset, NULL);

azat, Mar 05 '24 13:03