Sync sessions and received blocks
Currently the synchronization between block receipt and sessions is not quite air-tight, leading to some race conditions.
Background
When IPFS wants a block it
- Checks the blockstore
- If the block is not in the blockstore, asks a Bitswap Session for the block
The Session
- Subscribes to a "notifier" for updates about the block
- Registers "interest" in the block with the SessionInterestManager
When Bitswap receives a block it
- Puts the block to the blockstore
- Notifies all Sessions that have subscribed to updates about the block (through the "notifier")
- Notifies all Sessions that have registered "interest" in the block
There are overlapping concerns handled in different places (SessionInterestManager vs "notifier") and non-atomic operations, and the code is complex and hard to follow.
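To make the problem concrete, here is a minimal sketch of one way such a race can arise when "check the blockstore" and "subscribe" are separate steps. The store, notifier, receiveBlock and racyGet names are hypothetical stand-ins rather than the actual Bitswap code, and the between hook exists only to make the interleaving reproducible.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a stand-in for the blockstore (hypothetical, not the real API).
type store struct {
	mu     sync.Mutex
	blocks map[string][]byte
}

func (s *store) has(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.blocks[key]
	return ok
}

func (s *store) put(key string, data []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.blocks[key] = data
}

// notifier is a stand-in for the pub/sub "notifier".
type notifier struct {
	mu   sync.Mutex
	subs map[string][]chan []byte
}

func (n *notifier) subscribe(key string) <-chan []byte {
	n.mu.Lock()
	defer n.mu.Unlock()
	ch := make(chan []byte, 1)
	n.subs[key] = append(n.subs[key], ch)
	return ch
}

func (n *notifier) publish(key string, data []byte) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, ch := range n.subs[key] {
		ch <- data
	}
	delete(n.subs, key)
}

// receiveBlock mirrors the receive path: put to the store, then notify.
func receiveBlock(s *store, n *notifier, key string, data []byte) {
	s.put(key, data)
	n.publish(key, data)
}

// racyGet checks the store and then subscribes as two separate steps.
// between runs inside the gap so the interleaving is reproducible.
func racyGet(s *store, n *notifier, key string, between func()) (<-chan []byte, bool) {
	if s.has(key) {
		return nil, true
	}
	between()
	return n.subscribe(key), false
}

func main() {
	s := &store{blocks: map[string][]byte{}}
	n := &notifier{subs: map[string][]chan []byte{}}
	ch, found := racyGet(s, n, "cid1", func() {
		receiveBlock(s, n, "cid1", []byte("data"))
	})
	select {
	case <-ch:
		fmt.Println("notified")
	default:
		fmt.Println("found:", found, "in store:", s.has("cid1"), "notified: false")
	}
}
```

In this interleaving the block lands in the store after the check but before the subscription exists, so the caller ends up waiting on a notification that was already sent.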
Proposal
Create a unified Notifier interface that can synchronize operations and ensure atomicity.
Notifier
NewSubscription(sesid uint64) Subscription
Publish(msg ResponseMessage) error
- Put blocks to blockstore
- Inform listeners that are interested in blocks / HAVEs / DONT_HAVEs
Subscription
Messages() <-chan ResponseMessage
Subscribe(keys ...cid.Cid)
- Listen for incoming messages with blocks / HAVEs / DONT_HAVEs for CIDs. When messages are received, send them on the output channel
- Check blockstore for each CID. For each CID that is found in the blockstore:
  - If there's still a listener for the CID, inform listener
  - If there's no listener (listener already received block) do nothing
Unsubscribe(keys ...cid.Cid)
- If there are no more sessions interested in the keys, send CANCEL to all peers
Remove()
- Remove all session subscriptions
- If there are no more sessions interested in the keys, send CANCEL to all peers
ResponseMessage is a struct with public fields for blocks, HAVEs and DONT_HAVEs.
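A rough sketch of the Publish / Subscribe half of this, to show where the atomicity could come from. Assumptions: Cid is a string stand-in for cid.Cid, the ResponseMessage field names are guesses, the blockstore is a map, and a single mutex replaces the listen-then-check ordering described above. Unsubscribe() and Remove() are left out here, and sesid is unused.

```go
package main

import (
	"fmt"
	"sync"
)

// Cid stands in for cid.Cid so the sketch has no external dependencies.
type Cid string

// ResponseMessage has public fields as described; the names are assumed.
type ResponseMessage struct {
	Blocks    map[Cid][]byte
	Haves     []Cid
	DontHaves []Cid
}

// notifier guards the store and the listener table with a single mutex.
type notifier struct {
	mu        sync.Mutex
	store     map[Cid][]byte                 // stand-in for the blockstore
	listeners map[Cid]map[*subscription]bool // who is waiting on each CID
}

type subscription struct {
	n   *notifier
	out chan ResponseMessage // buffered; a full buffer would stall Publish
}

func newNotifier() *notifier {
	return &notifier{
		store:     map[Cid][]byte{},
		listeners: map[Cid]map[*subscription]bool{},
	}
}

// NewSubscription does not use sesid in this sketch.
func (n *notifier) NewSubscription(sesid uint64) *subscription {
	return &subscription{n: n, out: make(chan ResponseMessage, 16)}
}

// send delivers m to every listener of c. Callers must hold n.mu.
func (n *notifier) send(c Cid, m ResponseMessage) {
	for s := range n.listeners[c] {
		s.out <- m
	}
}

// Publish stores blocks and informs listeners in one critical section.
func (n *notifier) Publish(msg ResponseMessage) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, c := range msg.Haves {
		n.send(c, ResponseMessage{Haves: []Cid{c}})
	}
	for _, c := range msg.DontHaves {
		n.send(c, ResponseMessage{DontHaves: []Cid{c}})
	}
	for c, data := range msg.Blocks {
		n.store[c] = data
		n.send(c, ResponseMessage{Blocks: map[Cid][]byte{c: data}})
		delete(n.listeners, c) // a delivered block ends interest in c
	}
	return nil
}

func (s *subscription) Messages() <-chan ResponseMessage { return s.out }

// Subscribe checks the store and registers the listener under the same
// lock, so a concurrent Publish runs entirely before or entirely after.
func (s *subscription) Subscribe(keys ...Cid) {
	s.n.mu.Lock()
	defer s.n.mu.Unlock()
	for _, c := range keys {
		if data, ok := s.n.store[c]; ok {
			s.out <- ResponseMessage{Blocks: map[Cid][]byte{c: data}}
			continue
		}
		if s.n.listeners[c] == nil {
			s.n.listeners[c] = map[*subscription]bool{}
		}
		s.n.listeners[c][s] = true
	}
}

func main() {
	n := newNotifier()
	sub := n.NewSubscription(1)
	sub.Subscribe("cid1")
	n.Publish(ResponseMessage{Blocks: map[Cid][]byte{"cid1": []byte("hello")}})
	msg := <-sub.Messages()
	fmt.Println(string(msg.Blocks["cid1"]))
}
```

Sending on the channel while holding the lock is only acceptable in a sketch. A real implementation would probably need a queue or a per-subscription goroutine so that a slow session cannot stall Publish().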
When IPFS asks a Bitswap Session for a block, the Session will call Subscribe() on the Notifier:
- When the Session gets HAVE it adds the peer to the session
- When the Session gets HAVE / DONT_HAVE it updates its want tables
- When Session gets a block it will send the block on the output channel
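The session side of this could look roughly like the sketch below. Several things are assumptions: the From field on ResponseMessage (the session needs to know which peer sent a HAVE), the shape of the want table (CID to peer to HAVE / DONT_HAVE), and dropping the want once the block arrives. The session type here is hypothetical and much simpler than the real Session.

```go
package main

import "fmt"

// Cid and PeerID are string stand-ins for cid.Cid and peer.ID.
type Cid string
type PeerID string

// ResponseMessage: From is an assumption; the proposal only names the
// blocks / HAVEs / DONT_HAVEs fields.
type ResponseMessage struct {
	From      PeerID
	Blocks    map[Cid][]byte
	Haves     []Cid
	DontHaves []Cid
}

// session is a hypothetical, simplified stand-in for a Bitswap Session.
type session struct {
	peers map[PeerID]bool         // peers added to the session
	wants map[Cid]map[PeerID]bool // want table: true = HAVE, false = DONT_HAVE
	out   chan []byte             // output channel for received blocks
}

func newSession() *session {
	return &session{
		peers: map[PeerID]bool{},
		wants: map[Cid]map[PeerID]bool{},
		out:   make(chan []byte, 16),
	}
}

// record notes whether peer p has (or does not have) the block for c.
func (s *session) record(c Cid, p PeerID, has bool) {
	if s.wants[c] == nil {
		s.wants[c] = map[PeerID]bool{}
	}
	s.wants[c][p] = has
}

// handle applies the three rules listed above to one incoming message.
func (s *session) handle(msg ResponseMessage) {
	for _, c := range msg.Haves {
		s.peers[msg.From] = true // HAVE: add the peer to the session
		s.record(c, msg.From, true)
	}
	for _, c := range msg.DontHaves {
		s.record(c, msg.From, false)
	}
	for c, data := range msg.Blocks {
		delete(s.wants, c) // assumption: the want is done once the block arrives
		s.out <- data
	}
}

func main() {
	s := newSession()
	s.handle(ResponseMessage{From: "peerA", Haves: []Cid{"cid1"}})
	s.handle(ResponseMessage{From: "peerB", DontHaves: []Cid{"cid1"}})
	fmt.Println(s.peers["peerA"], s.peers["peerB"], s.wants["cid1"]["peerB"])
	s.handle(ResponseMessage{From: "peerA", Blocks: map[Cid][]byte{"cid1": []byte("data")}})
	fmt.Println(string(<-s.out), len(s.wants))
}
```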
When a request is cancelled the Session will call Unsubscribe() for those keys.
When the Session shuts down it will call Unsubscribe() with the session id.
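The "send CANCEL only when no session is interested any more" rule amounts to per-CID reference counting. A minimal sketch, assuming a hypothetical interest type keyed by session id, with a cancels slice standing in for actually sending CANCEL to peers (no locking, so not safe for concurrent use as written):

```go
package main

import "fmt"

// Cid stands in for cid.Cid so the sketch has no external dependencies.
type Cid string

// interest tracks which sessions want which CIDs (hypothetical type).
type interest struct {
	wants   map[Cid]map[uint64]bool
	cancels []Cid // stand-in for CANCELs sent to all peers
}

func newInterest() *interest {
	return &interest{wants: map[Cid]map[uint64]bool{}}
}

// Subscribe records that session sesid is interested in keys.
func (i *interest) Subscribe(sesid uint64, keys ...Cid) {
	for _, c := range keys {
		if i.wants[c] == nil {
			i.wants[c] = map[uint64]bool{}
		}
		i.wants[c][sesid] = true
	}
}

// drop removes one session's interest in c and records a CANCEL if that
// was the last interested session.
func (i *interest) drop(sesid uint64, c Cid) {
	if !i.wants[c][sesid] {
		return
	}
	delete(i.wants[c], sesid)
	if len(i.wants[c]) == 0 {
		delete(i.wants, c)
		i.cancels = append(i.cancels, c)
	}
}

// Unsubscribe is what a session would call when a request is cancelled.
func (i *interest) Unsubscribe(sesid uint64, keys ...Cid) {
	for _, c := range keys {
		i.drop(sesid, c)
	}
}

// Remove is what a session would call on shutdown: drop everything.
func (i *interest) Remove(sesid uint64) {
	for c := range i.wants {
		i.drop(sesid, c)
	}
}

func main() {
	in := newInterest()
	in.Subscribe(1, "cid1", "cid2")
	in.Subscribe(2, "cid1")
	in.Unsubscribe(1, "cid1") // session 2 still wants cid1: no CANCEL
	in.Remove(1)              // cid2 has no sessions left: CANCEL
	in.Unsubscribe(2, "cid1") // last session for cid1: CANCEL
	fmt.Println(in.cancels)
}
```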
Subscribe(ctx context.Context, sesid uint64, keys ...cid.Cid) <-chan ResponseMessage
- Any reason for the session key?
- Will this work for a streaming GetBlocks?
I assume we'll have to change this to something like:
Subscribe(chan<- ResponseMessage, keys ...cid.Cid) (where keys can be nil to cancel).
Unsubscribe(sesid uint64, keys ...cid.Cid)
Having unsubscribe trigger a cancel but not having subscribe trigger a want is weird. I'm wondering if we should just make this interface session agnostic.
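For illustration, a session-agnostic version might look something like the sketch below, where the caller's channel is the only identity the notifier knows about. This is one reading of "keys can be nil to cancel" (calling Subscribe with no keys drops everything that channel was waiting on). The types are hypothetical, only blocks are handled, and no WANT / CANCEL messages are sent.

```go
package main

import (
	"fmt"
	"sync"
)

// Cid stands in for cid.Cid so the sketch has no external dependencies.
type Cid string

// ResponseMessage is trimmed to blocks only for this sketch.
type ResponseMessage struct {
	Blocks map[Cid][]byte
}

// notifier is keyed by the caller's channel, so it needs no session id.
type notifier struct {
	mu   sync.Mutex
	subs map[chan<- ResponseMessage]map[Cid]bool
}

func newNotifier() *notifier {
	return &notifier{subs: map[chan<- ResponseMessage]map[Cid]bool{}}
}

// Subscribe adds keys for ch. With no keys it cancels everything ch was
// waiting on (an assumed interpretation of "nil to cancel").
func (n *notifier) Subscribe(ch chan<- ResponseMessage, keys ...Cid) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if len(keys) == 0 {
		delete(n.subs, ch)
		return
	}
	if n.subs[ch] == nil {
		n.subs[ch] = map[Cid]bool{}
	}
	for _, c := range keys {
		n.subs[ch][c] = true
	}
}

// Publish sends each block to every channel that subscribed to its CID.
func (n *notifier) Publish(msg ResponseMessage) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for ch, keys := range n.subs {
		for c, data := range msg.Blocks {
			if keys[c] {
				delete(keys, c) // the block satisfies this subscription
				ch <- ResponseMessage{Blocks: map[Cid][]byte{c: data}}
			}
		}
	}
}

func main() {
	n := newNotifier()
	ch := make(chan ResponseMessage, 4)
	n.Subscribe(ch, "a", "b")
	n.Publish(ResponseMessage{Blocks: map[Cid][]byte{"a": []byte("x")}})
	fmt.Println(string((<-ch).Blocks["a"]))
	n.Subscribe(ch) // no keys: cancel the remaining subscription to "b"
	n.Publish(ResponseMessage{Blocks: map[Cid][]byte{"b": []byte("y")}})
	fmt.Println("pending after cancel:", len(ch))
}
```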
Note that because operations on the blockstore are slow
Note 1: there should already be a cache. IMO, we should let the application do this for us.
Note 2: we may need to improve some contention issues in the cache, but I'm not sure if it's actually an issue.
I updated the original post to change the interfaces a little.
I think we can tackle streaming GetBlocks() separately, but it shouldn't be a big lift to change it.
I'll put together a PR to see what this looks like in practice and post it as a WIP.