async-zmq
Support for streaming multi-part messages
A problem with multipart messages in the current implementation is that senders need to allocate all of the message parts to a `VecDeque` before sending. Likewise, receivers must allocate message parts before they can be processed. Applications such as an XPUB/XSUB proxy would be very inefficient with this API.
Ideally, a sender should be able to stream parts from another source, without an intermediate allocation. Receivers should be able to start processing the first part, before the rest arrive.
The API that I am envisioning could look something like this:
- `socket.send(msg)` can send only single messages
- Use `socket.send_multipart(iter)` to send messages from an `IntoIterator`
- Use `socket.send_multipart_stream(stream)` to stream messages
- `socket.recv()` returns an error if the message is multipart
- `socket.recv_multipart()` returns a `Stream`. It works for either a multipart or a single message.
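To make the proposal concrete, here is a minimal synchronous sketch of that API shape. It is not the crate's implementation: `MockSocket`, `Frame` and `RecvError` are hypothetical names, the "wire" is an in-memory queue, and a plain `Iterator` stands in for `Stream` so the example needs only the standard library. It shows how `send_multipart` can forward each part as it is produced, using `peekable` to decide whether more parts follow, instead of collecting the parts first.

```rust
use std::collections::VecDeque;

/// One frame on the (mock) wire: a payload plus a "more parts follow" flag.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    data: Vec<u8>,
    more: bool,
}

#[derive(Debug, PartialEq)]
enum RecvError {
    Empty,
    UnexpectedMultipart,
}

/// Hypothetical in-memory socket, used only to illustrate the API shape.
#[derive(Default)]
struct MockSocket {
    wire: VecDeque<Frame>,
}

impl MockSocket {
    /// Send a single-part message.
    fn send(&mut self, msg: impl Into<Vec<u8>>) {
        self.wire.push_back(Frame { data: msg.into(), more: false });
    }

    /// Send parts from any `IntoIterator`, one at a time, without
    /// collecting them into an intermediate container first.
    fn send_multipart<I, T>(&mut self, parts: I)
    where
        I: IntoIterator<Item = T>,
        T: Into<Vec<u8>>,
    {
        let mut iter = parts.into_iter().peekable();
        while let Some(part) = iter.next() {
            // Every part except the last is flagged as "more to come".
            let more = iter.peek().is_some();
            self.wire.push_back(Frame { data: part.into(), more });
        }
    }

    /// Receive a single-part message; error out if it is multipart.
    fn recv(&mut self) -> Result<Vec<u8>, RecvError> {
        let is_multipart = match self.wire.front() {
            None => return Err(RecvError::Empty),
            Some(frame) => frame.more,
        };
        if is_multipart {
            return Err(RecvError::UnexpectedMultipart);
        }
        Ok(self.wire.pop_front().expect("front was Some").data)
    }

    /// Yield the parts of the next message one by one. The iterator
    /// stands in for the `Stream` of the proposal.
    fn recv_multipart(&mut self) -> impl Iterator<Item = Vec<u8>> + '_ {
        let mut done = false;
        std::iter::from_fn(move || {
            if done {
                return None;
            }
            let frame = self.wire.pop_front()?;
            done = !frame.more;
            Some(frame.data)
        })
    }
}

fn main() {
    let mut socket = MockSocket::default();
    socket.send_multipart(["topic", "body"]);
    // A multipart message is rejected by `recv` and left on the wire.
    assert_eq!(socket.recv(), Err(RecvError::UnexpectedMultipart));
    for part in socket.recv_multipart() {
        println!("{}", String::from_utf8_lossy(&part));
    }
}
```

In an async version, `recv_multipart` would presumably return something implementing `Stream`, and `send_multipart_stream` would take one; the flag handling would stay the same.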
This is quite a big change, which will impact the public API and internal structures. It will need some careful design, and my intention with opening this issue is to trigger some conversation first.
Back when I first needed to stream multipart messages, I thought a `VecDeque` would help. But as time went by, it felt more like overhead.
The proposed methods look reasonable. I also have an idea that maybe we could make the socket types generic, so they are able to send/receive messages that satisfy some trait bounds. This should be easier to refactor, and the interface would probably still be the same (or similar).
Btw, I do agree we should add more methods to the socket types. For instance, there could be a method for `xpub` to set the welcome message directly instead of having to get the original zmq socket.
I agree about adding a type parameter for the message type.
One slightly awkward thing is how ZMQ treats topics. It is reasonable for a topic to be of a different type than the rest of the message parts, but it's also possible for the topic to be the leading bytes of a message. For example, you could serialize structs so that the `id` field is guaranteed to be serialized first, so it can act as the topic without duplication.
So if I create a `Subscribe<T, M>`, I would still need to be able to express whether the topic should be discarded or kept as part of `M`.
Sorry for the late reply. It's been a busy week for me. Anyway, it is indeed unfortunate that it will have to do something like this. But I guess this is the trade-off. I would prefer to just stick with one generic to keep it simple, IMHO.
This is also why this crate provides a method to reach the inner zmq socket. If someone wants flexibility, they can still call the original socket methods. I'll run some experiments and see how it turns out in the next couple of days.
It definitely needs some thought to get the right balance between simplicity, safety and versatility.
One possibility is to have a sensible default for the topic, e.g. `Subscribe<M, T = [u8]>`. This seems to work well for things like the `HashMap` hasher in std, which mostly you don't change but you can if you want to.
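As a rough illustration of the default-parameter idea, the sketch below declares a hypothetical `Subscribe` type whose topic parameter falls back to `[u8]`, in the same way `std::collections::HashMap<K, V, S = RandomState>` defaults its hasher. The struct body, `new` and `topic_type_id` are invented for the example and say nothing about how the crate would store or decode topics; the `?Sized` bound is needed only because `[u8]` is an unsized type.

```rust
use std::any::TypeId;
use std::marker::PhantomData;

/// Hypothetical subscriber type: `M` is the message type and `T` the topic
/// type, which defaults to raw bytes when the caller does not name it.
struct Subscribe<M, T: ?Sized = [u8]> {
    _message: PhantomData<M>,
    _topic: PhantomData<T>,
}

impl<M, T: ?Sized> Subscribe<M, T> {
    fn new() -> Self {
        Subscribe { _message: PhantomData, _topic: PhantomData }
    }
}

/// Helper for the example only: report which topic type was selected.
fn topic_type_id<M, T: ?Sized + 'static>(_: &Subscribe<M, T>) -> TypeId {
    TypeId::of::<T>()
}

fn main() {
    // Most users name only the message type and get the default topic.
    let default_sub: Subscribe<String> = Subscribe::new();
    assert_eq!(topic_type_id(&default_sub), TypeId::of::<[u8]>());

    // Users who care can still override the topic type.
    let str_sub: Subscribe<String, str> = Subscribe::new();
    assert_eq!(topic_type_id(&str_sub), TypeId::of::<str>());
}
```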
Yeah, that sounds pretty great! I'm refactoring with a generic `<M>` with a trait bound of `IntoIterator` first. Then let's see what methods should be added once `<T>` is introduced.
We could also just make the decision to only support topics which are sent as a separate message part. I have no idea how common the other patterns are in the wild, but this would meet my use case. If users want more versatility then we can think about it then.
Right, let's do this for now, since this is also what I need most of the time.
Btw, I have an update on the `IntoIterator` generic. It seems the send method on the original zmq socket consumes the data, but `Pin<&mut Self>` is a reference, so the data can't be moved out of it. I have tried some workarounds, but it seems it will have to create another instance no matter what in the end.
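The constraint described here is a general Rust rule rather than anything specific to zmq: a value cannot be moved out from behind a `&mut` (pinned or not), only swapped for another value. A minimal sketch of the usual workaround follows, assuming a hypothetical `Sender` that buffers one pending message; `consume` is a stand-in for any send call that takes its argument by value. `Option::take` leaves a `None` behind, which is one form of "creating another instance".

```rust
use std::pin::Pin;

/// Hypothetical sender that buffers at most one outgoing message.
struct Sender {
    pending: Option<Vec<u8>>,
    sent: Vec<Vec<u8>>,
}

/// Stand-in for a send call that consumes its argument by value.
fn consume(msg: Vec<u8>, wire: &mut Vec<Vec<u8>>) {
    wire.push(msg);
}

impl Sender {
    fn flush(self: Pin<&mut Self>) {
        // `Sender` is `Unpin`, so a plain `&mut Sender` can be obtained.
        let this = self.get_mut();
        // Moving `this.pending` out directly would not compile, because the
        // field sits behind a mutable reference. `take` swaps in `None`
        // and hands back the owned value instead.
        if let Some(msg) = this.pending.take() {
            consume(msg, &mut this.sent);
        }
    }
}

fn main() {
    let mut sender = Sender { pending: Some(b"hello".to_vec()), sent: Vec::new() };
    Pin::new(&mut sender).flush();
    assert!(sender.pending.is_none());
    assert_eq!(sender.sent, vec![b"hello".to_vec()]);
}
```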
One trait bound that the original `zmq` crate has for `Message` is `Into<Message> + Clone`. If users send a `u8` slice, it can still avoid allocating memory. I'll probably take a slice with this bound on the sending side and return an owned vector as the multipart on the receiving side.
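A small sketch of what a sending side with that bound might look like. `Message` here is a local stand-in for `zmq::Message`, and `send_parts` is a hypothetical function, not part of either crate. The point is that when the parts are `&[u8]` slices, `clone()` copies only the reference, so no intermediate queue of owned buffers is built before each part is converted.

```rust
/// Local stand-in for `zmq::Message`, just enough for the example.
#[derive(Debug, PartialEq)]
struct Message(Vec<u8>);

impl From<&[u8]> for Message {
    fn from(bytes: &[u8]) -> Self {
        Message(bytes.to_vec())
    }
}

impl From<Vec<u8>> for Message {
    fn from(bytes: Vec<u8>) -> Self {
        Message(bytes)
    }
}

/// Hypothetical sending function: borrows a slice of parts and converts
/// each one only at the moment it is pushed onto the (mock) wire.
fn send_parts<T>(parts: &[T], wire: &mut Vec<Message>)
where
    T: Into<Message> + Clone,
{
    for part in parts {
        // For `T = &[u8]` this clone copies a reference, not the bytes.
        wire.push(part.clone().into());
    }
}

fn main() {
    let topic: &[u8] = b"topic";
    let body: &[u8] = b"body";
    let mut wire = Vec::new();
    send_parts(&[topic, body], &mut wire);
    assert_eq!(wire.len(), 2);
}
```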
The v0.3.0 release introduces two new types, `Multipart`/`MultipartIter`, to replace `MessageBuf`. Sending a multipart message no longer allocates additional memory.
However, I couldn't find a way to turn the multipart message queue into an iterator under the hood yet. Users will have to explicitly use `.into()` to turn their type into an iterator when calling `send` on a type that implements `Sink`.
I would prefer to leave this issue open in the hope of some enhancement in the future.