kafka
Stale consumers after commit offsets
Internally raised issue
**Describe the bug**
Committing an offset as a member of a consumer group while that group is rebalancing can cause the consumer to become stale, so that it no longer receives messages.

**To Reproduce**
The following scripts can be used to reproduce the issue. Note that `localhost:port` must be replaced to match your Kafka installation.
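stale_con.q:
```q
/ load the kfk library here, e.g. \l kfk.q (path depends on your installation)
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
/ commit the highest offset received so far on each partition of test1
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ; }
/ on a failed commit: log it, then replace commit with a no-op so no further commits are attempted
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; 0N!"offsetcb not success" ; OFFSET_LOG,::enlist(cid;err;offsets) ; `commit set { } ]; }
/ store every received message (without its payload) together with a receive timestamp
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data ; `MSG set x }
/ consumer settings: replace localhost:port with your broker address
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:port");
  (`bootstrap.servers;`$"localhost:port");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000)
  );
.kfk.Consumer cfg
/ subscribe client 0i to test1 (partition unassigned, offset END)
.kfk.Sub[0i;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ]
```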
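other_cons.q:
```q
/ load the kfk library here, as in stale_con.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
system"sleep 2"
/ commit the highest offset received so far on each partition of test1
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ; }
/ on a failed commit: log it (commits stay enabled in this script)
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; 0N!"offsetcb not success" ; OFFSET_LOG,::enlist(cid;err;offsets) ]; }
/ store every received message (without its payload) together with a receive timestamp
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data ; `MSG set x ; }
/ same consumer group as stale_con.q; replace localhost:port with your broker address
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:port");
  (`bootstrap.servers;`$"localhost:port");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000)
  );
/ create 10 consumers in the group and subscribe each of them to test1
clients:{ .kfk.Consumer cfg } each til 10
{ .kfk.Sub[x;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ] } each clients
```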
Steps to reproduce:
- Have a process producing on the topic `test1`
- Start `stale_con.q`
- Start `other_cons.q` once the stale one is up and running
- Manually run `commit[]` in the `stale_con.q` process several times in quick succession
- If "offsetcb not success" is not printed, restart the `other_cons.q` process and try again
- After `offsetcb` reports "Offset commit failed - Specified group generation id is not valid", the consumer no longer consumes any messages.
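Rather than typing `commit[]` repeatedly, the q timer can issue it at a fixed interval. This is a minimal sketch, assuming it is entered in the `stale_con.q` session after `other_cons.q` has started; the 100 ms interval is an arbitrary choice, not part of the original report. Because `.z.ts` looks `commit` up by name on every tick, once `stale_con.q` replaces `commit` with a no-op after a failed commit, the timer simply stops committing.

```q
/ Hypothetical helper for the "quick succession" step: call commit[] every 100 ms.
/ Run in the stale_con.q session; stop the timer again with \t 0.
.z.ts:{commit[]}
\t 100
```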
**Expected behavior**
If an offset commit fails, the consumer should be able to retry the commit, or it should be possible to configure it to do so.
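A minimal sketch of the retry described above, assuming the `commit`, `OFFSET_LOG` and `MSGS` definitions from `stale_con.q` are already loaded. Instead of replacing `commit` with a no-op, the callback sets a flag (`RETRY`, a hypothetical name) and the q timer re-issues the commit on its next tick. This is untested against a broker: whether a retried commit succeeds after a rebalance depends on the kfk library and librdkafka, which is exactly what this issue is about.

```q
/ Hypothetical retry variant of the stale_con.q offsetcb; RETRY is an illustrative name.
/ Assumes commit, OFFSET_LOG and MSGS are defined as in stale_con.q.
RETRY:0b
/ on a failed commit: log it and flag a retry instead of disabling commit[]
.kfk.offsetcb:{[cid;err;offsets] if[not err like "Success"; 0N!"offsetcb not success, retrying on next timer tick"; OFFSET_LOG,::enlist(cid;err;offsets); RETRY::1b]; }
/ timer handler: re-issue the commit at most once per tick
.z.ts:{if[RETRY; RETRY::0b; commit[]]}
\t 1000
```

Driving the retry from the timer rather than calling `commit[]` inside `offsetcb` keeps the callback short and limits the retry rate to one attempt per tick.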
**Desktop**
```
q).kfk.VersionSym[]
`1.4.2
Kdb: 4.0 2020.10.02
Kx kafka release: v1.4.0
```
For me this happens regardless of whether `commit[]` is executed. Even launching a second `stale_con.q` stops the original process from receiving messages. Is any particular configuration required on the producer side or on the broker?
The issue cannot be reproduced because of a different problem. The steps are below:
- Start a process producing on the topic `test1` with `examples/test_producer.q`
- Start `stale_con.q`
- Start `other_cons.q` once the stale one is up and running
- Regardless of whether the `commit` function is executed, the `stale_con.q` process stops receiving messages. Once the `other_cons.q` processes exit, the lone process resumes receiving messages.