No connection timeout

Open nick-adjust opened this issue 5 years ago • 3 comments

If there is no connection to the Kafka broker, an attempt to establish one hangs indefinitely and doesn't check for interrupts.

Unfortunately I don't have a full backtrace, but here's the librdkafka part of it:

#0  0x00007f127898d96a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f1279e61e59 in cnd_timedwait (cond=cond@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, ts=ts@entry=0x7ffe3d7ca300) at tinycthread.c:462
#2  0x00007f1279e62273 in cnd_timedwait_abs (cnd=cnd@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, tspec=tspec@entry=0x7ffe3d7ca300) at tinycthread_extra.c:100
#3  0x00007f1279e2bdff in rd_kafka_q_serve (rkq=rkq@entry=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, 
    callback=callback@entry=0x7f1279df67f0 <rd_kafka_consume_cb>, opaque=opaque@entry=0x7ffe3d7ca410) at rdkafka_queue.h:475
#4  0x00007f1279df6f51 in rd_kafka_consume_callback0 (rkq=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=<optimized out>, consume_cb=<optimized out>, opaque=<optimized out>)
    at rdkafka.c:2617

nick-adjust, Sep 18 '19 13:09

The full stacktrace:

#0  0x00007f018c4b196a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f018b10ee59 in cnd_timedwait (cond=cond@entry=0x55d599512918, 
    mtx=mtx@entry=0x55d5995128f0, ts=ts@entry=0x7fffd3c3ea50) at tinycthread.c:462
#2  0x00007f018b10f273 in cnd_timedwait_abs (cnd=cnd@entry=0x55d599512918, 
    mtx=mtx@entry=0x55d5995128f0, tspec=tspec@entry=0x7fffd3c3ea50)
    at tinycthread_extra.c:100
#3  0x00007f018b0d8dff in rd_kafka_q_serve (rkq=rkq@entry=0x55d5995128f0, 
    timeout_ms=timeout_ms@entry=100, max_cnt=max_cnt@entry=0, 
    cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK, 
    callback=callback@entry=0x7f018b0a7090 <rd_kafka_poll_cb>, opaque=opaque@entry=0x0)
    at rdkafka_queue.h:475
#4  0x00007f018b0a637e in rd_kafka_query_watermark_offsets (rk=<optimized out>, 
    topic=<optimized out>, partition=partition@entry=0, low=low@entry=0x7fffd3c3ecc0, 
    high=high@entry=0x7fffd3c3ecc8, timeout_ms=timeout_ms@entry=1000) at rdkafka.c:3102
#5  0x00007f018c7d00ff in kafka_get_watermarks (fcinfo=0x55d5995c6310) at src/utils.c:72
#6  0x000055d597c0ef1f in ExecMakeFunctionResultSet (fcache=0x55d5995c62a0, 
    econtext=econtext@entry=0x55d5995c5860, argContext=<optimized out>, 
    isNull=<optimized out>, isDone=isDone@entry=0x55d5995c6288) at execSRF.c:604
#7  0x000055d597c2a7ba in ExecProjectSRF (node=node@entry=0x55d5995c5750, 
    continuing=continuing@entry=false) at nodeProjectSet.c:175
#8  0x000055d597c2a875 in ExecProjectSet (pstate=0x55d5995c5750) at nodeProjectSet.c:105
#9  0x000055d597c0570b in ExecProcNode (node=0x55d5995c5750)
    at ../../../src/include/executor/executor.h:247
#10 ExecutePlan (execute_once=<optimized out>, dest=0x55d5995d3a60, 
    direction=<optimized out>, numberTuples=0, sendTuples=<optimized out>, 
    operation=CMD_SELECT, use_parallel_mode=<optimized out>, planstate=0x55d5995c5750, 
    estate=0x55d5995c5540) at execMain.c:1723
#11 standard_ExecutorRun (queryDesc=0x55d5995bf7e0, direction=<optimized out>, count=0, 
    execute_once=<optimized out>) at execMain.c:364
#12 0x00007f018c8e0fc5 in pgss_ExecutorRun (queryDesc=0x55d5995bf7e0, 
    direction=ForwardScanDirection, count=0, execute_once=<optimized out>)
    at pg_stat_statements.c:892
#13 0x000055d597d5026b in PortalRunSelect (portal=portal@entry=0x55d599587fe0, 
    forward=forward@entry=true, count=0, count@entry=9223372036854775807, 
    dest=dest@entry=0x55d5995d3a60) at pquery.c:932
#14 0x000055d597d5171e in PortalRun (portal=portal@entry=0x55d599587fe0, 
    count=count@entry=9223372036854775807, isTopLevel=isTopLevel@entry=true, 
    run_once=run_once@entry=true, dest=dest@entry=0x55d5995d3a60, 
    altdest=altdest@entry=0x55d5995d3a60, completionTag=0x7fffd3c3f340 "") at pquery.c:773
#15 0x000055d597d4d5e7 in exec_simple_query (
    query_string=0x55d5994eb150 "select kafka_get_watermarks('kafka_7_cpc'::regclass);")
    at postgres.c:1145
#16 0x000055d597d4ee5e in PostgresMain (argc=<optimized out>, 
    argv=argv@entry=0x55d599520df8, dbname=<optimized out>, username=<optimized out>)
    at postgres.c:4182
#17 0x000055d597cdd036 in BackendRun (port=0x55d59951d7a0) at postmaster.c:4358
#18 BackendStartup (port=0x55d59951d7a0) at postmaster.c:4030
#19 ServerLoop () at postmaster.c:1707
#20 0x000055d597cddf07 in PostmasterMain (argc=5, argv=0x55d5994e5910) at postmaster.c:1380
#21 0x000055d597a6f636 in main (argc=5, argv=0x55d5994e5910) at main.c:228

zilder, Sep 23 '19 09:09

I think this might be a bug in librdkafka, but I can't yet say so with full confidence. From my perspective, it looks like rd_kafka_query_watermark_offsets() does not interpret the result of rd_kafka_q_serve() correctly. rd_kafka_q_serve() should return the number of messages that have been processed, but the result is treated as if it were an rd_kafka_op_res_t (the result value of the specified callback function, rd_kafka_poll_cb). Either that doesn't make sense or I'm missing something.

In our case rd_kafka_q_serve() returns 0 because it times out. The caller compares the result with RD_KAFKA_OP_RES_YIELD (which equals 3 when converted to an integer) and proceeds to the next iteration. This repeats again and again, and execution gets stuck in an infinite loop. I prepared a rough draft of a patch:

-        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
-               rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                                rd_kafka_poll_cb, NULL) !=
-               RD_KAFKA_OP_RES_YIELD)
-                ;
+        while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+        {
+                if (rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL) == 0)
+                {
+                        state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+                }
+        }

It worked in our case. I'll double-check it soon and create a PR to librdkafka.
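
To make the loop behaviour described above easier to see in isolation, here is a minimal standalone sketch. It is not librdkafka code: mock_q_serve, wait_original, wait_patched and the two enums are hypothetical stand-ins, and the mock assumes that a serve call returns the number of ops handled and 0 on timeout. The original loop shape is given an artificial iteration cap (max_iters) only so that the demonstration terminates.

```c
#include <assert.h>

/* Hypothetical stand-ins for the librdkafka names discussed above. */
enum op_res { OP_RES_PASS, OP_RES_HANDLED, OP_RES_KEEP, OP_RES_YIELD }; /* YIELD == 3 */
enum resp_err { ERR_NO_ERROR, ERR_IN_PROGRESS, ERR_TIMED_OUT };

/* Mock queue-serve call (assumption): returns the number of ops served,
 * which is 0 on timeout. With no broker reachable, nothing ever arrives. */
static int mock_q_serve(int timeout_ms)
{
    (void)timeout_ms;
    return 0;
}

/* Original loop shape: the op count is compared against an enum value.
 * A count of 0 never equals OP_RES_YIELD (3), so only the artificial
 * max_iters cap ends the loop. Returns the number of iterations run. */
int wait_original(enum resp_err *err, int max_iters)
{
    int iters = 0;

    assert(max_iters > 0);
    while (*err == ERR_IN_PROGRESS && iters < max_iters &&
           mock_q_serve(100) != OP_RES_YIELD)
        iters++;
    return iters;
}

/* Draft-patch loop shape: a serve call that handled nothing is treated
 * as a timeout, which changes *err and ends the loop. */
int wait_patched(enum resp_err *err)
{
    int iters = 0;

    while (*err == ERR_IN_PROGRESS) {
        iters++;
        if (mock_q_serve(100) == 0)
            *err = ERR_TIMED_OUT;
    }
    return iters;
}
```

Starting from ERR_IN_PROGRESS, wait_original(&err, 1000) runs all 1000 iterations and leaves err unchanged, while wait_patched(&err) returns after a single iteration with err set to ERR_TIMED_OUT. Note that, like the draft patch, this sketch gives up after the first empty 100 ms poll and does not take the caller's overall timeout_ms into account.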

zilder, Sep 23 '19 16:09

Posting it here for visibility: https://github.com/edenhill/librdkafka/pull/2535

nick-adjust, Oct 01 '19 09:10