franz-go
OnPartitionsAssigned called twice for the same partitions
Hi!
I've run into an issue where OnPartitionsAssigned is called twice with the same set of partitions.
Here are the franz-go logs with my comments (newest entries first, so read from bottom to top):
2024-04-11 11:28:29.115 {"how":0,"input":{"pb_metering_resource":{"0":{"At":9679,"Epoch":-1,"CurrentEpoch":0},"3":{"At":50126,"Epoch":-1,"CurrentEpoch":0}}},"level":"info","msg":"assigning partitions","time":"2024-04-11T08:28:29Z","why":"newly fetched offsets for group resource_tracker"}
2024-04-11 11:28:22.650 {"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.650 {"added":{"pb_metering_resource":[0,3]},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.650 {"assigned":{"pb_metering_resource":[0,3,4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.648 {"group":"resource_tracker","level":"info","msg":"syncing","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.647 {"group":"resource_tracker","level":"info","msg":"joining group","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.646 {"level":"info","msg":"immediate metadata update triggered","time":"2024-04-11T08:28:22Z","why":"waitmeta after heartbeat error"}
2024-04-11 11:28:22.646 {"group":"resource_tracker","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.646 {"err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"resource_tracker","level":"info","msg":"heartbeat errored","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:07.711 {"how":0,"input":{"pb_metering_resource":{"10":{"At":34472,"Epoch":-1,"CurrentEpoch":0},"11":{"At":13108,"Epoch":-1,"CurrentEpoch":0},"4":{"At":83302,"Epoch":-1,"CurrentEpoch":0},"7":{"At":726686,"Epoch":-1,"CurrentEpoch":0}}},"level":"info","msg":"assigning partitions","time":"2024-04-11T08:28:07Z","why":"newly fetched offsets for group resource_tracker"}
2024-04-11 11:28:07.653 {"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:28:07Z"}
here OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) is called again
2024-04-11 11:28:07.653 {"added":{},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:07.653 {"assigned":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:07.653 {"group":"resource_tracker","level":"info","msg":"syncing","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:03.828 {"group":"resource_tracker","level":"info","msg":"joining group","time":"2024-04-11T08:28:03Z"}
2024-04-11 11:28:03.828 {"group":"resource_tracker","level":"info","msg":"fetch offsets failed due to context cancelation","time":"2024-04-11T08:28:03Z"}
here OnPartitionsRevoked({}) is called
2024-04-11 11:28:03.827 {"group":"resource_tracker","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","time":"2024-04-11T08:28:03Z"}
2024-04-11 11:28:01.640 {"level":"info","msg":"immediate metadata update triggered","time":"2024-04-11T08:28:01Z","why":"waitmeta after heartbeat error"}
2024-04-11 11:28:01.640 {"err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"resource_tracker","level":"info","msg":"heartbeat errored","time":"2024-04-11T08:28:01Z"}
2024-04-11 11:27:52.639 {"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:27:52Z"}
here OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) is called
2024-04-11 11:27:52.639 {"added":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:27:52Z"}
2024-04-11 11:27:52.639 {"assigned":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:27:52Z"}
The problem is that sometimes, after a rebalance, OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]})
is called twice in a row, with no revoke of those partitions in between.
I'd appreciate any help.
franz-go version: v1.15.2
Client initialisation:
kafka, err := kgo.NewClient(
	kgo.SeedBrokers(...),
	kgo.ConsumeTopics(...),
	kgo.ConsumerGroup(...),
	kgo.ClientID(fmt.Sprintf("%s-%s", params.KafkaParams.AppName, uuid.Must(uuid.NewV4()))),
	kgo.SessionTimeout(12 * time.Second),
	kgo.HeartbeatInterval(3 * time.Second),
	// Offset commit settings
	kgo.DisableAutoCommit(),
	kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
	// Network settings
	kgo.DialTimeout(3 * time.Second),
	kgo.FetchMaxBytes(16 << 20),       // 16 MiB
	kgo.BrokerMaxReadBytes(16 << 20),  // 16 MiB
	// Partition assignment and rebalancing settings
	kgo.BlockRebalanceOnPoll(),
	kgo.OnPartitionsAssigned(c.onPartitionsAssigned),
	kgo.AdjustFetchOffsetsFn(c.adjustFetchOffsetsFn),
	kgo.OnPartitionsRevoked(c.onPartitionsRevoked),
	// The same handler is used for both revoked and lost partitions.
	kgo.OnPartitionsLost(c.onPartitionsRevoked),
)
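To confirm the duplicate callback from the application side, something like the tracker below could be called from the assigned and revoked callbacks. This is only a sketch: assignmentTracker, newAssignmentTracker, Assigned and Revoked are hypothetical names of my own and not part of franz-go. The only assumption taken from the library is that the callbacks receive the partitions as a map[string][]int32.

```go
package main

import (
	"sort"
	"sync"
)

// assignmentTracker is a hypothetical helper (not part of franz-go) that
// remembers which partitions the application currently believes it owns.
type assignmentTracker struct {
	mu    sync.Mutex
	owned map[string]map[int32]struct{}
}

func newAssignmentTracker() *assignmentTracker {
	return &assignmentTracker{owned: make(map[string]map[int32]struct{})}
}

// Assigned records the given partitions as owned and returns the ones that
// were already owned, i.e. partitions handed over twice without a revoke.
func (t *assignmentTracker) Assigned(assigned map[string][]int32) map[string][]int32 {
	t.mu.Lock()
	defer t.mu.Unlock()

	dups := make(map[string][]int32)
	for topic, parts := range assigned {
		if t.owned[topic] == nil {
			t.owned[topic] = make(map[int32]struct{})
		}
		for _, p := range parts {
			if _, ok := t.owned[topic][p]; ok {
				dups[topic] = append(dups[topic], p)
				continue
			}
			t.owned[topic][p] = struct{}{}
		}
		if d := dups[topic]; len(d) > 1 {
			// Sort so that logged duplicates are stable and easy to compare.
			sort.Slice(d, func(i, j int) bool { return d[i] < d[j] })
		}
	}
	return dups
}

// Revoked forgets the given partitions. An empty map, as in the
// OnPartitionsRevoked({}) call in the logs above, changes nothing.
func (t *assignmentTracker) Revoked(revoked map[string][]int32) {
	t.mu.Lock()
	defer t.mu.Unlock()

	for topic, parts := range revoked {
		for _, p := range parts {
			delete(t.owned[topic], p)
		}
	}
}
```

In c.onPartitionsAssigned I would call Assigned with the map passed to the callback and log any non-empty result, and in c.onPartitionsRevoked I would call Revoked. With the sequence from the logs (assign [4,7,10,11], revoke {}, assign [4,7,10,11] again), the second Assigned call would report all four partitions as duplicates.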