
Stick to certain workers (feature request?)

Open lospejos opened this issue 7 months ago • 1 comments

Hi!

I'm looking into how I can use the pond library in my current project, and I ran into what is probably a missing feature. My scenario is: select data for processing, create a pool of workers, and process each row using this pool. Each row has a key value, and I want to ensure that rows with the same key are processed by the same worker from the pool, to maintain data order. This is how Kafka does it, for example, where the same consumer in a group processes all events with the same key.

Unfortunately, I cannot find a way to do this with pond. Did I miss something when searching the library, or does pond really lack this functionality?

Thanks!

lospejos avatar May 12 '25 09:05 lospejos

Hey @lospejos!

Sorry for the late response. I think I get what you are trying to achieve, but yeah, there is no built-in mechanism to implement the pattern you want. Pond's worker pools do not have the concept of a "task/message key" as Kafka does. In Kafka the key is hashed to pick a partition, which guarantees that messages with the same key end up in the same partition (and thus read order is guaranteed).

One way this could be implemented using pond is with subpools of size 1 (a max of 1 goroutine per subpool). This is a relatively recent feature that lets you subdivide a worker pool into smaller pools, each with a fraction of the concurrency limit of its "parent" pool. E.g. you can have a worker pool of size 10 and create any number of subpools with a smaller limit, such as 2, and have both limits honored at the same time.

pool := pond.NewPool(0) // unlimited concurrency limit

subpools := make(map[int]pond.Pool)

for _, task := range tasks {
  partition := hashKey(task.Key()) // TODO: implement a hashing function that always maps the same key to the same partition
  if _, ok := subpools[partition]; !ok {
    subpools[partition] = pool.NewSubpool(1)
  }

  subpools[partition].Submit(task) // a size-1 subpool runs this partition's tasks one at a time
}

pool.StopAndWait() // Optional: wait for all tasks to complete and stop the pool

Would something like this work for your use case? Each subpool will scale down to 0 automatically if no tasks are submitted for that partition for a while, and scale back up to 1 if more are sent later on.

Doing this, your max concurrency will equal the number of partitions, but that is always the case if you need to ensure messages with the same key are processed in order.
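
For the hashing step left as a TODO above, a minimal sketch could look like the following. It assumes string keys and a fixed partition count; the `numPartitions` constant and its value of 8 are made up for illustration, and FNV-1a from the Go standard library is just one reasonable choice of hash, not something pond requires.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// numPartitions is a hypothetical upper bound on the number of subpools,
// and therefore on the overall concurrency.
const numPartitions = 8

// hashKey maps a key to a partition index in [0, numPartitions).
// The same key always yields the same partition.
func hashKey(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % numPartitions)
}

func main() {
	// "user-1" appears twice and lands in the same partition both times.
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> partition %d\n", key, hashKey(key))
	}
}
```

Different keys may share a partition, which only means they are serialized together; the per-key ordering still holds. Raising `numPartitions` trades more subpools for more parallelism.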

alitto avatar May 23 '25 12:05 alitto