Stale consumers after commit
Describe the bug
Committing an offset as a member of a consumer group while the group is rebalancing can cause the consumer to become stale.
To Reproduce
cat stale_con.q
\l kfk.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ; }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; .ms.sys.message "offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ; `commit set { } ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data ; `MSG set x }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`bootstrap.servers;`$"localhost:9092");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000)
  );
.kfk.Consumer cfg
.kfk.Sub[0i;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ]
cat other_cons.q
\l kfk.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
system"sleep 2"
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ; }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; .ms.sys.message "offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data ; `MSG set x ; }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`bootstrap.servers;`$"localhost:9092");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000)
  );
clients:{ .kfk.Consumer cfg } each til 10
{ .kfk.Sub[x;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ] } each clients
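For readers unfamiliar with the query inside commit, here is a small worked example of what it hands to .kfk.CommitOffsets: the highest offset seen per partition. The table demo and its values are made up for illustration; the real MSGS table is built by .kfk.consumecb and has more columns.

```q
/ illustrative data only: two partitions, two messages each
demo:([] partition:0 0 1 1i; offset:10 11 7 8)

/ same expression as in commit, applied to demo instead of MSGS
r:exec partition!offset from demo where offset=(max;offset) fby partition

/ r is the dictionary 0 1i!11 8, i.e. partition 0 -> 11, partition 1 -> 8
```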
Steps:
- Have a process producing to the topic (test1 in the scripts above).
- Start stale_con.q.
- Start other_cons.q once stale_con.q is up and running.
- Manually run commit[] on the stale_con.q process several times in quick succession.
- If "offsetcb not success" is not seen, restart the other_cons.q process and try again.
- After offsetcb reports "Offset commit failed - Specified group generation id is not valid", the stale_con.q consumer does not consume any more messages.
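Running commit[] by hand can be replaced with the q timer, which may make it easier to hit the rebalance window. This is a minimal sketch, assuming stale_con.q has already been loaded so that commit, MSGS and OFFSET_LOG exist; the 100 ms interval is an arbitrary choice, not something taken from the original report. Load it from a script file rather than pasting it into the console, since the function spans several lines.

```q
/ call commit[] on every timer tick until offsetcb logs a failure
.z.ts:{
  / OFFSET_LOG is only appended to when a commit fails
  if[count OFFSET_LOG; system"t 0"; -1"commit failure logged, timer stopped"; :(::)];
  / skip the commit until at least one message has arrived
  if[count MSGS; commit[]];
  }

/ fire the timer every 100 ms (assumed interval)
\t 100
```

Start other_cons.q while the timer is running. If the timer never stops, restart other_cons.q as described in the steps above.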
Expected behavior
Consumer groups should continue to consume messages from an appropriate location following all rebalancing events.
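One way to confirm that the consumer has gone stale, rather than simply being idle, is to compare the receive time of the last message with the current time while the producer is still publishing. The sketch below assumes MSGS is populated by .kfk.consumecb as in stale_con.q, so it has an rcvtime column; the names lag and isStale and the 30 second threshold are hypothetical.

```q
/ time elapsed since the last message arrived; null timespan if none yet
lag:{$[count MSGS; .z.p - last MSGS`rcvtime; 0Nn]}

/ 1b when nothing has arrived within the given timespan
/ (an empty MSGS yields 0b, because a null lag never exceeds the threshold)
isStale:{[threshold] threshold < lag[]}

isStale 0D00:00:30
```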