
OnPartitionsAssigned called twice for the same partitions

Status: Open. iimos opened this issue 2 months ago (4 comments).

Hi!

I've encountered an issue where OnPartitionsAssigned is called twice with the same set of partitions.

Here are the franz-go logs with my comments (read from bottom to top):

2024-04-11 11:28:29.115	{"how":0,"input":{"pb_metering_resource":{"0":{"At":9679,"Epoch":-1,"CurrentEpoch":0},"3":{"At":50126,"Epoch":-1,"CurrentEpoch":0}}},"level":"info","msg":"assigning partitions","time":"2024-04-11T08:28:29Z","why":"newly fetched offsets for group resource_tracker"}
2024-04-11 11:28:22.650	{"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.650	{"added":{"pb_metering_resource":[0,3]},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.650	{"assigned":{"pb_metering_resource":[0,3,4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.648	{"group":"resource_tracker","level":"info","msg":"syncing","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.647	{"group":"resource_tracker","level":"info","msg":"joining group","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.646	{"level":"info","msg":"immediate metadata update triggered","time":"2024-04-11T08:28:22Z","why":"waitmeta after heartbeat error"}
2024-04-11 11:28:22.646	{"group":"resource_tracker","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.646	{"err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"resource_tracker","level":"info","msg":"heartbeat errored","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:07.711	{"how":0,"input":{"pb_metering_resource":{"10":{"At":34472,"Epoch":-1,"CurrentEpoch":0},"11":{"At":13108,"Epoch":-1,"CurrentEpoch":0},"4":{"At":83302,"Epoch":-1,"CurrentEpoch":0},"7":{"At":726686,"Epoch":-1,"CurrentEpoch":0}}},"level":"info","msg":"assigning partitions","time":"2024-04-11T08:28:07Z","why":"newly fetched offsets for group resource_tracker"}
2024-04-11 11:28:07.653	{"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:28:07Z"}

Here OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) is called again.

2024-04-11 11:28:07.653	{"added":{},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:07.653	{"assigned":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:07.653	{"group":"resource_tracker","level":"info","msg":"syncing","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:03.828	{"group":"resource_tracker","level":"info","msg":"joining group","time":"2024-04-11T08:28:03Z"}
2024-04-11 11:28:03.828	{"group":"resource_tracker","level":"info","msg":"fetch offsets failed due to context cancelation","time":"2024-04-11T08:28:03Z"}

Here OnPartitionsRevoked({}) is called.

2024-04-11 11:28:03.827	{"group":"resource_tracker","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","time":"2024-04-11T08:28:03Z"}
2024-04-11 11:28:01.640	{"level":"info","msg":"immediate metadata update triggered","time":"2024-04-11T08:28:01Z","why":"waitmeta after heartbeat error"}
2024-04-11 11:28:01.640	{"err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"resource_tracker","level":"info","msg":"heartbeat errored","time":"2024-04-11T08:28:01Z"}
2024-04-11 11:27:52.639	{"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:27:52Z"}

Here OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) is called.

2024-04-11 11:27:52.639	{"added":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:27:52Z"}
2024-04-11 11:27:52.639	{"assigned":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:27:52Z"}
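When reading these logs, note that with the cooperative-sticky protocol each "new group session begun" line reports "added" and "lost" relative to the previous assignment. The sketch below models that as a plain per-topic set difference; this is an assumption for illustration, not franz-go's internal code, and `diffAssignment` is a hypothetical helper. Fed the assignments from the logs, it reproduces the empty "added" at 11:28:07 and "added":[0,3] at 11:28:22.

```go
package main

import (
	"fmt"
	"sort"
)

// diffAssignment is a hypothetical helper: for each topic it returns the
// partitions that appear in next but not in prev, sorted ascending.
// Topics with nothing new are left out of the result.
func diffAssignment(prev, next map[string][]int32) map[string][]int32 {
	out := make(map[string][]int32)
	for topic, parts := range next {
		had := make(map[int32]bool, len(prev[topic]))
		for _, p := range prev[topic] {
			had[p] = true
		}
		var diff []int32
		for _, p := range parts {
			if !had[p] {
				diff = append(diff, p)
			}
		}
		if len(diff) > 0 {
			sort.Slice(diff, func(i, j int) bool { return diff[i] < diff[j] })
			out[topic] = diff
		}
	}
	return out
}

func main() {
	const topic = "pb_metering_resource"
	// Assignment synced at 11:27:52 and again at 11:28:07.
	before := map[string][]int32{topic: {4, 7, 10, 11}}
	// Assignment synced at 11:28:22.
	after := map[string][]int32{topic: {0, 3, 4, 7, 10, 11}}

	fmt.Println("11:28:07 added:", diffAssignment(before, before)) // map[]
	fmt.Println("11:28:22 added:", diffAssignment(before, after))  // map[pb_metering_resource:[0 3]]
	fmt.Println("11:28:22 lost:", diffAssignment(after, before))   // map[]
}
```

Under this reading, the client itself reports nothing newly added in the 11:28:07 session, which is what makes the repeated OnPartitionsAssigned call for [4,7,10,11] stand out.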

The problem is that sometimes, after a rebalance, OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) is called twice in a row.
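Until the root cause is clear, one defensive option on the application side is to make the assignment handling idempotent: remember which partitions are already owned, start work only for partitions not yet owned, and forget partitions only when they are actually revoked or lost. The sketch below is stdlib-only and the `ownedPartitions` type is hypothetical; in a real consumer its methods would be called from the functions passed to `kgo.OnPartitionsAssigned` and `kgo.OnPartitionsRevoked`/`kgo.OnPartitionsLost`, which receive a `map[string][]int32`. It works around the symptom and does not change the client's behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// ownedPartitions (hypothetical) records which topic partitions the
// application is currently processing, so that a repeated assignment
// callback for partitions that are already owned becomes a no-op.
type ownedPartitions struct {
	mu    sync.Mutex
	owned map[string]map[int32]bool
}

func newOwnedPartitions() *ownedPartitions {
	return &ownedPartitions{owned: make(map[string]map[int32]bool)}
}

// Assign marks partitions as owned and returns only those that were not
// already owned; the caller starts per-partition work for these alone.
func (o *ownedPartitions) Assign(assigned map[string][]int32) map[string][]int32 {
	o.mu.Lock()
	defer o.mu.Unlock()
	fresh := make(map[string][]int32)
	for topic, parts := range assigned {
		if o.owned[topic] == nil {
			o.owned[topic] = make(map[int32]bool)
		}
		for _, p := range parts {
			if !o.owned[topic][p] {
				o.owned[topic][p] = true
				fresh[topic] = append(fresh[topic], p)
			}
		}
	}
	return fresh
}

// Release forgets revoked or lost partitions so that a later assignment of
// the same partitions is treated as new again. Deleting from a nil map is a no-op.
func (o *ownedPartitions) Release(revoked map[string][]int32) {
	o.mu.Lock()
	defer o.mu.Unlock()
	for topic, parts := range revoked {
		for _, p := range parts {
			delete(o.owned[topic], p)
		}
	}
}

func main() {
	const topic = "pb_metering_resource"
	o := newOwnedPartitions()

	// First OnPartitionsAssigned (11:27:52): everything is new.
	fmt.Println(o.Assign(map[string][]int32{topic: {4, 7, 10, 11}})) // map[pb_metering_resource:[4 7 10 11]]
	// OnPartitionsRevoked({}) (11:28:03): nothing to release.
	o.Release(map[string][]int32{})
	// Repeated OnPartitionsAssigned (11:28:07): nothing new to start.
	fmt.Println(o.Assign(map[string][]int32{topic: {4, 7, 10, 11}})) // map[]
}
```

The mutex is a precaution in case the callbacks and the processing code touch the tracker from different goroutines.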

I'd really appreciate any help.


franz-go version: v1.15.2

client initialisation:

kafka, err := kgo.NewClient(
	kgo.SeedBrokers(...),
	kgo.ConsumeTopics(...),
	kgo.ConsumerGroup(...),
	kgo.ClientID(fmt.Sprintf("%s-%s", params.KafkaParams.AppName, uuid.Must(uuid.NewV4()))),
	kgo.SessionTimeout(12 * time.Second),
	kgo.HeartbeatInterval(3 * time.Second),

	// Offset commit settings
	kgo.DisableAutoCommit(),
	kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),

	// Network settings
	kgo.DialTimeout(3 * time.Second),
	kgo.FetchMaxBytes(16 << 20),      // 16 MiB
	kgo.BrokerMaxReadBytes(16 << 20), // 16 MiB

	// Partition assignment and rebalancing settings
	kgo.BlockRebalanceOnPoll(),
	kgo.OnPartitionsAssigned(c.onPartitionsAssigned),
	kgo.AdjustFetchOffsetsFn(c.adjustFetchOffsetsFn),
	kgo.OnPartitionsRevoked(c.onPartitionsRevoked),
	kgo.OnPartitionsLost(c.onPartitionsRevoked),
)

iimos, Apr 11 '24 17:04