numaflow
Batch Mapping & Sink
Summary
I'm looking for the ability for a mapper and sink to take a batch of items at once. Today these take a single item at a time, regardless of `readBatchSize`, which limits the ability to take advantage of bulk APIs/processing.
Use Cases
I have three main use cases that interest me. All of these need to operate in near real time, and the messages aren't necessarily required to be ordered for histogram/statistic-type metrics. Thus, horizontal scaling should be easy for my use cases.
- Bundling: I have a use case receiving 100k+ messages per sec, but each message is very small, < 1k. For throughput purposes it would make sense to take batches of those messages (say up to 1000) and group them into a single bundle. The overall MB of messages may not change, but from my rough experimentation I can get more ISB throughput when operating on this smaller quantity of messages. Implementation of this bundling is of course user-centric, but being able to receive multiple messages at once would facilitate this.
- Input would be N messages with 1-N responses (typically 1 in my personal use-case).
- Bundled transactions: Some operations can be generally more efficient by sending multiple data items through at once vs a single call for each. Ex. Elasticsearch BatchAPI, or pykafka producer (ignoring numaflow's built-in kafka sink).
- Typically more of a 'sink' use case.
- Batching for performance: In a language like python, if you're doing the same operation on a lot of datasets using numpy/scipy/etc, it's useful to group them together and call the math logic once. If we could pull a batch of items at once, we can organize them for faster processing, and then write back out the results as appropriate.
- This specific case would still be N messages in, N messages out, but could be combined with Bundling
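To make the Bundling case concrete, here is a minimal sketch of what user code could do once it receives several messages at once. The names `bundle_messages` and `unbundle` and the JSON-array bundle format are assumptions for illustration only; they are not part of numaflow or its SDKs.

```python
import json
from typing import Iterable, Iterator, List


def bundle_messages(payloads: Iterable[bytes], max_bundle: int = 1000) -> Iterator[bytes]:
    """Group small UTF-8 payloads into JSON-array bundles of at most max_bundle items.

    Hypothetical helper: N messages in, roughly N / max_bundle messages out.
    """
    current: List[str] = []
    for payload in payloads:
        current.append(payload.decode("utf-8"))
        if len(current) == max_bundle:
            yield json.dumps(current).encode("utf-8")
            current = []
    # Emit whatever is left over as a final, smaller bundle.
    if current:
        yield json.dumps(current).encode("utf-8")


def unbundle(bundle: bytes) -> List[bytes]:
    """Reverse of bundle_messages for a single bundle."""
    return [item.encode("utf-8") for item in json.loads(bundle)]
```

A downstream vertex would call `unbundle` on each message to recover the original items. The total number of bytes is roughly unchanged; only the message count drops.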
If there's a way to accomplish any of this today, I'm happy to adopt that, but I don't think the reduce capability quite fits my expectations, particularly with the time based windows and limitations of horizontal scaling in order to meet the ordering expectations.
Message from the maintainers:
If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.
### Tasks
- [ ] https://github.com/numaproj/numaflow/issues/1786
- [ ] https://github.com/numaproj/numaflow/issues/1787
- [ ] https://github.com/numaproj/numaflow/issues/1788
- [ ] Release SDKs with BatchMap
- [ ] https://github.com/numaproj/numaflow/issues/1789
- [ ] https://github.com/numaproj/numaflow/issues/1827
- [ ] https://github.com/numaproj/numaflow/issues/1810
- [ ] https://github.com/numaproj/numaflow/issues/1809
- [ ] https://github.com/numaproj/numaflow/issues/1812
- [ ] #1817
- [ ] https://github.com/numaproj/numaflow/issues/1820
- [ ] https://github.com/numaproj/numaflow/issues/1819
Since I'm interested in this, I wanted to take a first attempt at implementing it following the patterns that currently exist. I expect to have some PRs up over this week for the first few items I put on the checklist. This will be my first contribution to this community, so hopefully I'll be able to work well with everyone through the details.
I made some tasks to help track how I plan to commit items. My initial focus is on Python as it's my primary use case, though if I can I will try to do other languages as well.
@magelisk - For sinks, the batch operation is already there in the interface exposed to developers, with data streamed from the gRPC clients. I don't expect much performance improvement from having the client send the data in a batch with a unary call.
I think we should do batching on top of gRPC streaming; we have an issue to track this: https://github.com/numaproj/numaflow/issues/1564
Thanks for the input. I hadn't used the streaming sink, so I wasn't familiar with its interface taking the iterator; that's very useful.
Following that, and the recommendation that this be built on top of the streaming interface, I think I've wrapped my head around this. I see that #1564 was already assigned to @yhl25. I don't know if any work has been done on this on his part (I couldn't find anything in his accessible repos), so I did an initial draft as a personal learning opportunity. I don't want to step on anyone's toes, so feel free to kick it away, but hopefully I can contribute something to help.
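For readers who have not used an iterator-based sink either, this is roughly how I picture a handler draining the stream in chunks and handing each chunk to a bulk API. It is a sketch under assumptions: `sink_handler`, `chunked`, and the `bulk_write` callable are hypothetical names, plain `bytes` stand in for the SDK's datum type, and none of this is the actual pynumaflow interface.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def chunked(items: Iterable[bytes], size: int) -> Iterator[List[bytes]]:
    """Yield lists of up to `size` items until the input iterator is exhausted."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def sink_handler(
    datums: Iterable[bytes],
    bulk_write: Callable[[List[bytes]], None],
    chunk_size: int = 500,
) -> int:
    """Consume a stream of payloads and flush them through a bulk writer.

    `bulk_write` stands in for something like a bulk-index or batch-produce call.
    Returns the number of payloads written.
    """
    written = 0
    for chunk in chunked(datums, chunk_size):
        bulk_write(chunk)
        written += len(chunk)
    return written
```

Because the handler only pulls from the iterator as it goes, it never needs the whole batch in memory at once, which is the main appeal of building on the streaming interface.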
- protodef: https://github.com/numaproj/numaflow-go/pull/129
- numaflow core: https://github.com/numaproj/numaflow/pull/1707
- python client: https://github.com/numaproj/numaflow-python/pull/163
The biggest question of these: I did this with a NEW gRPC endpoint, so now the mapping streamer has two Fn handlers. This is a deviation from existing patterns but allows maintaining backwards compatibility. I'm not sure how the numaflow team hopes to handle these kinds of breakages, generally speaking.
@magelisk, thank you for being a contributor to Numaflow. We (the Numaflow team) would like to know how the community and you are using the platform, learn about the use cases, and take some feedback. Would you be open for a chat with us based on your availability?
@syayi would be happy to talk at some point. I'm generally pretty free, and US EST based
Thanks @magelisk ! Sure, drop me a note here or on slack based on what works best for you, and happy to schedule a call
We have a conversation tomorrow, but so that there's a written record I'll put this here now to solicit and document thoughts.
The current implementation I did is one that adds a new gRPC call within the existing service. This works, though it breaks the 'function-only' method as-is. So I think there are a few options:
- Remove/replace the existing map function, which takes a single datum, and let it take multiple, requiring the caller to iterate over the stream explicitly
- Consistent with the user-defined sink pattern
- Not quite consistent with a `map` handler function that's typically used
- A new Service for this which contains single endpoint.
- A fair bit of additional overhead/new classes that are nearly duplicates of existing. Could share common classes between protos, but again, this isn't something that's currently done so would need to balance DRY vs compartmentalization
- Should maintain full backwards compatibility
- Use as-is in the current MR (`MapFn` and `BatchMapFn`), and find some other workaround, or adjust the `interface` definition to have styles that still work with the function-only pattern
- I think this is more complex, with more moving parts, but it should maintain existing backwards compatibility (I haven't prototyped this to see what the final impact on the interface classes is)
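As a rough illustration of how a function-only handler could coexist with a batch-style one, the sketch below wraps a per-datum function so it satisfies a batch signature. Everything here is hypothetical: `Datum`, `Response`, and `as_batch_map` are illustrative names, and the `MapFn` / `BatchMapFn` type aliases only borrow the names from the MR; they are not the definitions in the proto or the SDK.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List


@dataclass
class Datum:
    datum_id: str   # hypothetical correlation id for matching responses to requests
    value: bytes


@dataclass
class Response:
    datum_id: str
    values: List[bytes]   # zero or more output payloads for this datum


# Hypothetical shapes of the two handler styles discussed above.
MapFn = Callable[[Datum], List[bytes]]
BatchMapFn = Callable[[Iterable[Datum]], Iterator[Response]]


def as_batch_map(map_fn: MapFn) -> BatchMapFn:
    """Adapt a single-datum handler to the batch-style signature."""

    def batch_map(datums: Iterable[Datum]) -> Iterator[Response]:
        # Iterate the stream explicitly and tag each result with its datum id.
        for datum in datums:
            yield Response(datum_id=datum.datum_id, values=map_fn(datum))

    return batch_map
```

With an adapter like this, existing function-only users would keep writing a plain per-datum function, while batch-aware users would implement the batch signature directly.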
I haven't been able to pull this into my operation test bed yet, but local representative stuff looks good. Thanks.
We have included this in the 1.3.0 release.