cloudstate icon indicating copy to clipboard operation
cloudstate copied to clipboard

P2P messaging

Open jroper opened this issue 6 years ago • 5 comments

We would like to add a P2P messaging pattern, that is, a protocol that user functions can use to do P2P messaging through Akka.

What is a peer?

It's important to define what we mean by peer. A peer is an abstract concept that, for each domain, is defined by the domain. It could be a human, or it could be a device - eg, an IoT device - or it could be an entity (eg, an event sourced entity that is pushing updates to pages in real time). If it is a human, they may be interacting through many devices, for example, I have Slack installed on multiple laptops and multiple mobile devices, when someone sends me a message, and I have Slack on all my devices open, I expect to receive that message in real time on all of my devices at once. Typically, a device will have a TCP connection (perhaps gRPC stream or WebSocket) to a serverless service from which it will receive P2P messages. That connection may be over an unreliable network, and when it fails, it will reconnect, but not necessarily back to the same node that it was originally connected to.

Example characteristics

While we probably can't address every possible use case, we want to come up with one or more solutions that cover a broad range of use cases. With that in mind, here are some different characteristics or requirements that some use cases might have.

The P2P messaging may in some cases be between more than 2 peers (eg, a chat room, or multiple IoT devices in a home), there may be multiple publishers for a single topic, and multiple subscribers for a single topic - this may expand the traditional definition of P2P, perhaps we really are talking about addressed communication, but note that address is not a machine or actor address, it is the abstract user/device as defined above.

Various use cases exist for a range of different delivery guarantees. At most once is useful when the current state is being sent, and new messages invalidate previous messages. For example, tracking the location of an IoT enabled vehicle. The other major useful guarantee is effectively once. In this case it's assumed that the device receiving updates can deduplicate (using a domain specific sequence number for example, or unique ids), but needs at least once delivery. Instant messaging is an example of this.

Delivery time guarantees for effectively once messaging vary too. The point of P2P messaging is to allow effectively instant delivery, ie the only latency comes from network, routing, and processing, and that should happen in the happy case. In failure scenarios however, in some use cases there should be a maximum time that it takes for the message to be delivered, in other cases it's ok for the dropped message to not be delivered until the next message is received.

Solutions

Currently, the only out the box solution that Akka provides to implement P2P messaging as described above is distributed pubsub. This can be combined with Akka persistence to achieve at least once delivery, by persisting messages first, then publishing them, and then using the sequence number to detect dropped messages, and the journal to recover.

Distributed pubsub however requires replicating the subscriber state to all nodes, and hence doesn't scale well when there are a very large number of topics being subscribed to.

Here are two other distributed P2P possibilities that we might want to consider. These ideas are very raw and not fully thought out, they may be terrible.

  • Sharded mediator. In this case, all messages go through a sharded mediator. Subscribers are required to register subscription with the mediator, and they are required to maintain that subscription, including in cases when the mediator is rebalanced. The mediator may tell the subscriber when it's handing off to another node to assist on this, but the subscriber can't rely on that, and the subscriber should periodically resubscribe to the mediator - the mediator will also expire subscribers that haven't resubscribed for a while. Akka cluster sharding could be used, or a consistent hashing router could be used - if the latter, cluster membership events might trigger resubscribing. This solution has reduced availability because it introduces a third node that needs to be available for communication to succeed.
  • Gossip subscription state among publishers. In this case, a sharded contact point might be used to initially discover publishers and subscribers, once discovered, publishers gossip the subscription state between themselves and the contact point. Subscribers could either be part of the gossip cluster themselves, and use that to keep themselves active, or they could regularly tell the sharded contact point that they are active. Because there are potentially many gossip clusters, to keep communication down, gossip intervals would need to be long. Message activity could be used to trigger a temporary increase in gossip frequency (or an immediate gossip to all nodes), and publishers could keep messages that they publish for a time in case they learn of any new subscribers during this period of increased gossip frequency.

jroper avatar Jun 17 '19 11:06 jroper

What approximate value would a "very large number" of topics be (that would impact the scalability of distributed pubsub)? Are there any use cases that may fit this scenario?

If this is an edge case and pubsub is the best overall option, perhaps there's a compromise to consider, such as a hard upper limit on topics that can be subscribed to within a specific boundary (I'm assuming this value is the total number of topics being subscribed to within a single service / bounded context?)

rocketpages avatar Jun 22 '19 15:06 rocketpages

I'm not actually sure, I'm just going off what I've been told, that is that distributed pubsub doesn't scale well for the P2P case. I think it's a case of having the right tools for the job, I imagine we'll offer a generalised protocol for messaging, and then the backend can be selected from a configuration option. Perhaps we may even include some options based on third party tech.

jroper avatar Jun 24 '19 00:06 jroper

Hi guys! Probably a gossip protocol would be more scalable eg. https://www.serf.io/docs/internals/gossip.html.

skonto avatar Sep 18 '19 17:09 skonto

Great issue; Use case: How to support 30,000,000 peers in 300,000 topics? This is very common for group chatting / live streamings / virtual meetings. Sharded mediator is what we are using now

He-Pin avatar Apr 04 '20 19:04 He-Pin

@hepin1989 I'd love to hear more about your sharded mediator setup, in particular, what are the biggest problems you've encountered with it? How well does it respond to elastic scaling - eg do you get any thundering herd problems when scaling up? How many nodes/how much memory/how much cpu is required to handle various amounts of load/topics/peers?

jroper avatar Apr 05 '20 23:04 jroper