kafka_fdw
No connection timeout
If there is no connection to the Kafka broker, an attempt to establish one hangs indefinitely and doesn't check for interrupts.
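As a sketch of what I'd expect instead, the wait could be done in short slices with an interrupt check between them. The helper below is hypothetical (not kafka_fdw code, the name is made up), and it only helps if librdkafka actually honors its timeout_ms argument, which, as shown further down, the affected version does not:

/* Hypothetical helper: wait in short slices and check for interrupts
 * between them, so a cancelled query can escape the wait.  Assumes
 * rd_kafka_query_watermark_offsets() honors its timeout_ms argument. */
#include "postgres.h"
#include "miscadmin.h"              /* CHECK_FOR_INTERRUPTS() */
#include <librdkafka/rdkafka.h>

static rd_kafka_resp_err_t
interruptible_watermarks(rd_kafka_t *rk, const char *topic,
                         int32_t partition, int64_t *low, int64_t *high,
                         int total_timeout_ms)
{
    const int   slice_ms = 100;     /* short slice keeps us responsive */
    int         waited_ms = 0;
    rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;

    while (waited_ms < total_timeout_ms)
    {
        err = rd_kafka_query_watermark_offsets(rk, topic, partition,
                                               low, high, slice_ms);
        if (err != RD_KAFKA_RESP_ERR__TIMED_OUT)
            break;                  /* success or a hard error */

        CHECK_FOR_INTERRUPTS();     /* lets the backend cancel the query */
        waited_ms += slice_ms;
    }
    return err;
}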
Unfortunately I don't have a full backtrace for that case, but here's the librdkafka part of it:
#0 0x00007f127898d96a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007f1279e61e59 in cnd_timedwait (cond=cond@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, ts=ts@entry=0x7ffe3d7ca300) at tinycthread.c:462
#2 0x00007f1279e62273 in cnd_timedwait_abs (cnd=cnd@entry=0x55cab072d898, mtx=mtx@entry=0x55cab072d870, tspec=tspec@entry=0x7ffe3d7ca300) at tinycthread_extra.c:100
#3 0x00007f1279e2bdff in rd_kafka_q_serve (rkq=rkq@entry=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN,
callback=callback@entry=0x7f1279df67f0 <rd_kafka_consume_cb>, opaque=opaque@entry=0x7ffe3d7ca410) at rdkafka_queue.h:475
#4 0x00007f1279df6f51 in rd_kafka_consume_callback0 (rkq=0x55cab072d870, timeout_ms=<optimized out>, max_cnt=<optimized out>, consume_cb=<optimized out>, opaque=<optimized out>)
at rdkafka.c:2617
The full stacktrace (this one is from a kafka_get_watermarks() call):
#0 0x00007f018c4b196a in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00007f018b10ee59 in cnd_timedwait (cond=cond@entry=0x55d599512918,
mtx=mtx@entry=0x55d5995128f0, ts=ts@entry=0x7fffd3c3ea50) at tinycthread.c:462
#2 0x00007f018b10f273 in cnd_timedwait_abs (cnd=cnd@entry=0x55d599512918,
mtx=mtx@entry=0x55d5995128f0, tspec=tspec@entry=0x7fffd3c3ea50)
at tinycthread_extra.c:100
#3 0x00007f018b0d8dff in rd_kafka_q_serve (rkq=rkq@entry=0x55d5995128f0,
timeout_ms=timeout_ms@entry=100, max_cnt=max_cnt@entry=0,
cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK,
callback=callback@entry=0x7f018b0a7090 <rd_kafka_poll_cb>, opaque=opaque@entry=0x0)
at rdkafka_queue.h:475
#4 0x00007f018b0a637e in rd_kafka_query_watermark_offsets (rk=<optimized out>,
topic=<optimized out>, partition=partition@entry=0, low=low@entry=0x7fffd3c3ecc0,
high=high@entry=0x7fffd3c3ecc8, timeout_ms=timeout_ms@entry=1000) at rdkafka.c:3102
#5 0x00007f018c7d00ff in kafka_get_watermarks (fcinfo=0x55d5995c6310) at src/utils.c:72
#6 0x000055d597c0ef1f in ExecMakeFunctionResultSet (fcache=0x55d5995c62a0,
econtext=econtext@entry=0x55d5995c5860, argContext=<optimized out>,
isNull=<optimized out>, isDone=isDone@entry=0x55d5995c6288) at execSRF.c:604
#7 0x000055d597c2a7ba in ExecProjectSRF (node=node@entry=0x55d5995c5750,
continuing=continuing@entry=false) at nodeProjectSet.c:175
#8 0x000055d597c2a875 in ExecProjectSet (pstate=0x55d5995c5750) at nodeProjectSet.c:105
#9 0x000055d597c0570b in ExecProcNode (node=0x55d5995c5750)
at ../../../src/include/executor/executor.h:247
#10 ExecutePlan (execute_once=<optimized out>, dest=0x55d5995d3a60,
direction=<optimized out>, numberTuples=0, sendTuples=<optimized out>,
operation=CMD_SELECT, use_parallel_mode=<optimized out>, planstate=0x55d5995c5750,
estate=0x55d5995c5540) at execMain.c:1723
#11 standard_ExecutorRun (queryDesc=0x55d5995bf7e0, direction=<optimized out>, count=0,
execute_once=<optimized out>) at execMain.c:364
#12 0x00007f018c8e0fc5 in pgss_ExecutorRun (queryDesc=0x55d5995bf7e0,
direction=ForwardScanDirection, count=0, execute_once=<optimized out>)
at pg_stat_statements.c:892
#13 0x000055d597d5026b in PortalRunSelect (portal=portal@entry=0x55d599587fe0,
forward=forward@entry=true, count=0, count@entry=9223372036854775807,
dest=dest@entry=0x55d5995d3a60) at pquery.c:932
#14 0x000055d597d5171e in PortalRun (portal=portal@entry=0x55d599587fe0,
count=count@entry=9223372036854775807, isTopLevel=isTopLevel@entry=true,
run_once=run_once@entry=true, dest=dest@entry=0x55d5995d3a60,
altdest=altdest@entry=0x55d5995d3a60, completionTag=0x7fffd3c3f340 "") at pquery.c:773
#15 0x000055d597d4d5e7 in exec_simple_query (
query_string=0x55d5994eb150 "select kafka_get_watermarks('kafka_7_cpc'::regclass);")
at postgres.c:1145
#16 0x000055d597d4ee5e in PostgresMain (argc=<optimized out>,
argv=argv@entry=0x55d599520df8, dbname=<optimized out>, username=<optimized out>)
at postgres.c:4182
#17 0x000055d597cdd036 in BackendRun (port=0x55d59951d7a0) at postmaster.c:4358
#18 BackendStartup (port=0x55d59951d7a0) at postmaster.c:4030
#19 ServerLoop () at postmaster.c:1707
#20 0x000055d597cddf07 in PostmasterMain (argc=5, argv=0x55d5994e5910) at postmaster.c:1380
#21 0x000055d597a6f636 in main (argc=5, argv=0x55d5994e5910) at main.c:228
I think this might be a bug in librdkafka, but I cannot yet assert this with full confidence. From my perspective it looks like the result of rd_kafka_q_serve() is not interpreted correctly in rd_kafka_query_watermark_offsets(). It should return the number of messages that have been processed, but it's treated as if it were an rd_kafka_op_res_t (the result value of the supplied callback function, rd_kafka_poll_cb). Either that doesn't make sense or I'm missing something.
In our case rd_kafka_q_serve() returns 0 because it times out. The caller compares that result with RD_KAFKA_OP_RES_YIELD (equal to 3 when converted to an integer) and proceeds to the next iteration. This repeats over and over, and execution gets stuck in an infinite loop. I prepared a rough draft of a patch:
- while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS &&
-        rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
-                         rd_kafka_poll_cb, NULL) !=
-        RD_KAFKA_OP_RES_YIELD)
-     ;
+ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS)
+ {
+     if (rd_kafka_q_serve(rkq, 100, 0, RD_KAFKA_Q_CB_CALLBACK,
+                          rd_kafka_poll_cb, NULL) == 0)
+     {
+         state.err = RD_KAFKA_RESP_ERR__TIMED_OUT;
+     }
+ }
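To make the suspected confusion concrete, here's a toy standalone program (not librdkafka source; all names are made up) showing why an ops count of 0 compared against the enum value 3 can never terminate the loop:

/* Toy model of the loop above: q_serve() stands in for
 * rd_kafka_q_serve() timing out with zero ops processed. */
#include <stdio.h>

typedef enum { OP_RES_PASS, OP_RES_HANDLED, OP_RES_KEEP, OP_RES_YIELD } op_res_t;

static int q_serve(void) { return 0; }   /* ops served: none */

int main(void)
{
    int in_progress = 1;    /* state.err stays ..._IN_PROGRESS: no reply ever arrives */
    int spins = 0;

    /* 0 != OP_RES_YIELD (3) is always true; the spins guard exists
     * only so this demo terminates. */
    while (in_progress && q_serve() != (int) OP_RES_YIELD && ++spins < 10)
        ;

    printf("spun %d times; without the guard this never exits\n", spins);
    return 0;
}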
It worked in our case. I'll double-check it soon and create a PR against librdkafka.
Posting it here for visibility: https://github.com/edenhill/librdkafka/pull/2535