go-bitswap

Sync sessions and received blocks

Open dirkmc opened this issue 5 years ago • 2 comments

Currently, the synchronization between block receipt and sessions is not airtight, which leads to race conditions.

Background

When IPFS wants a block, it:

  • Checks the blockstore
  • If the block is not in the blockstore, asks a Bitswap Session for the block. The Session:
    • Subscribes to a "notifier" for updates about the block
    • Registers "interest" in the block with the SessionInterestManager
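To make the want-side steps above concrete, here is a minimal Go sketch. Everything in it is hypothetical and only for illustration: `Cid` stands in for `cid.Cid`, `memBlockstore` for a real blockstore, and the two `session` maps for the "notifier" subscription and the SessionInterestManager entry. None of these names come from go-bitswap.

```go
package main

import "sync"

// Cid is a hypothetical stand-in for cid.Cid (github.com/ipfs/go-cid).
type Cid string

// Block is a hypothetical stand-in for a block (CID plus raw data).
type Block struct {
	Cid  Cid
	Data []byte
}

// memBlockstore is a hypothetical in-memory blockstore.
type memBlockstore struct {
	mu     sync.Mutex
	blocks map[Cid]Block
}

// Get returns the block for c and whether it was found.
func (bs *memBlockstore) Get(c Cid) (Block, bool) {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	b, ok := bs.blocks[c]
	return b, ok
}

// session models the two separate registrations a Session makes.
// It is not safe for concurrent use; it only illustrates the steps.
type session struct {
	id         uint64
	subscribed map[Cid]chan Block // hypothetical "notifier" subscription
	interested map[Cid]bool       // hypothetical SessionInterestManager entry
}

// getBlock checks the blockstore and, on a miss, asks the session for the
// block. On a local hit it returns the block and a nil channel.
func getBlock(bs *memBlockstore, s *session, c Cid) (Block, <-chan Block) {
	if b, ok := bs.Get(c); ok {
		return b, nil
	}
	// A block for c could be stored between the Get above and the two
	// registrations below; nothing in this flow would then notify the session.
	ch := make(chan Block, 1)
	s.subscribed[c] = ch
	s.interested[c] = true
	return Block{}, ch
}
```

The comment inside `getBlock` marks the window in which the check and the registrations are not atomic.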

When Bitswap receives a block, it:

  • Puts the block to the blockstore
  • Notifies all Sessions that have subscribed to updates about the block (through the "notifier")
  • Notifies all Sessions that have registered "interest" in the block
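The receive side can be sketched the same way. This hypothetical model keeps the blockstore, the notifier listeners and the interest table in separate maps and updates them in separate critical sections, loosely mirroring how the work is split across components. The names (`receiver`, `receiveBlock`, `subscribe`, `has`) are illustrative assumptions, not go-bitswap APIs.

```go
package main

import "sync"

// Cid is a hypothetical stand-in for cid.Cid.
type Cid string

// Block is a hypothetical stand-in for a block.
type Block struct {
	Cid  Cid
	Data []byte
}

// receiver is a hypothetical model of the receive path: the blockstore,
// the "notifier" listeners and the SessionInterestManager are separate.
type receiver struct {
	mu        sync.Mutex
	store     map[Cid]Block
	listeners map[Cid][]chan Block
	interest  map[Cid]map[uint64]bool
}

func newReceiver() *receiver {
	return &receiver{
		store:     map[Cid]Block{},
		listeners: map[Cid][]chan Block{},
		interest:  map[Cid]map[uint64]bool{},
	}
}

// has reports whether the blockstore holds c (the check IPFS makes first).
func (r *receiver) has(c Cid) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.store[c]
	return ok
}

// subscribe registers a notifier listener for c without looking at the store.
func (r *receiver) subscribe(c Cid) <-chan Block {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan Block, 1) // each listener receives at most one block
	r.listeners[c] = append(r.listeners[c], ch)
	return ch
}

// receiveBlock runs the three steps in separate critical sections, so the
// sequence as a whole is not atomic. It returns the interested session IDs.
func (r *receiver) receiveBlock(b Block) []uint64 {
	r.mu.Lock()
	r.store[b.Cid] = b // 1. put the block to the blockstore
	r.mu.Unlock()

	r.mu.Lock()
	for _, ch := range r.listeners[b.Cid] { // 2. notify subscribed listeners
		ch <- b
	}
	delete(r.listeners, b.Cid)
	r.mu.Unlock()

	r.mu.Lock()
	defer r.mu.Unlock()
	var ids []uint64
	for id := range r.interest[b.Cid] { // 3. notify interested sessions
		ids = append(ids, id)
	}
	return ids
}
```

With this model, calling `has`, then `receiveBlock`, then `subscribe` for the same CID leaves the subscriber waiting forever: the blockstore check missed, and the notification went out before anyone was listening.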

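Overlapping concerns are handled in different places (the SessionInterestManager vs. the "notifier"), several operations are not atomic, and the code is complex and hard to follow. For example, if a block arrives after IPFS has checked the blockstore but before the Session has subscribed, the Session may never be notified about it.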

Proposal

Create a unified Notifier interface that can synchronize operations and ensure atomicity.

Notifier

  • NewSubscription(sesid uint64) Subscription
  • Publish(msg ResponseMessage) error
    • Put blocks to blockstore
    • Inform listeners that are interested in blocks / HAVEs / DONT_HAVEs

Subscription

  • Messages() <-chan ResponseMessage
  • Subscribe(keys ...cid.Cid)
    • Listen for incoming messages with blocks / HAVEs / DONT_HAVEs for the CIDs. When messages are received, send them on the output channel
    • Check the blockstore for each CID. For each CID that is found in the blockstore:
      • If there's still a listener for the CID, inform the listener
      • If there's no listener (the listener already received the block), do nothing
  • Unsubscribe(keys ...cid.Cid)
    • If there are no more sessions interested in the keys, send CANCEL to all peers
  • Remove()
    • Remove all session subscriptions
    • If there are no more sessions interested in the keys, send CANCEL to all peers
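The ordering in Subscribe (register the listener first, then check the blockstore, then deliver only if the listener is still registered) is what closes the race, and it can be sketched as follows. This is a hypothetical, trimmed-down illustration, not the go-bitswap implementation: `Cid` and `Block` are stand-ins, `ResponseMessage` carries only blocks, the blockstore is an in-memory map with its own lock, and Unsubscribe/Remove are left out.

```go
package main

import "sync"

// Cid and Block are hypothetical stand-ins for cid.Cid and a block.
type Cid string

type Block struct {
	Cid  Cid
	Data []byte
}

// ResponseMessage is trimmed to blocks only for this sketch.
type ResponseMessage struct {
	Blks []Block
}

// notifier is a hypothetical sketch of the proposed Notifier.
type notifier struct {
	storeMu sync.Mutex // guards store (stands in for the blockstore)
	store   map[Cid]Block

	mu        sync.Mutex // guards listeners
	listeners map[Cid]map[*subscription]bool
}

// subscription is a hypothetical sketch of the proposed Subscription.
type subscription struct {
	n   *notifier
	out chan ResponseMessage
}

func newNotifier() *notifier {
	return &notifier{store: map[Cid]Block{}, listeners: map[Cid]map[*subscription]bool{}}
}

func (n *notifier) get(c Cid) (Block, bool) {
	n.storeMu.Lock()
	defer n.storeMu.Unlock()
	b, ok := n.store[c]
	return b, ok
}

// NewSubscription uses a buffered channel so small examples do not block;
// a real implementation would need a policy for slow readers.
func (n *notifier) NewSubscription() *subscription {
	return &subscription{n: n, out: make(chan ResponseMessage, 16)}
}

// Publish puts each block to the store before informing (and dropping)
// the listeners for its CID.
func (n *notifier) Publish(msg ResponseMessage) error {
	for _, b := range msg.Blks {
		n.storeMu.Lock()
		n.store[b.Cid] = b
		n.storeMu.Unlock()

		n.mu.Lock()
		for s := range n.listeners[b.Cid] {
			s.out <- ResponseMessage{Blks: []Block{b}}
		}
		delete(n.listeners, b.Cid)
		n.mu.Unlock()
	}
	return nil
}

// Messages returns the channel on which matching messages are delivered.
func (s *subscription) Messages() <-chan ResponseMessage { return s.out }

// Subscribe registers listeners first and only then checks the store, so a
// block published in between is delivered once, by one path or the other.
func (s *subscription) Subscribe(keys ...Cid) {
	n := s.n
	n.mu.Lock()
	for _, c := range keys {
		if n.listeners[c] == nil {
			n.listeners[c] = map[*subscription]bool{}
		}
		n.listeners[c][s] = true
	}
	n.mu.Unlock()

	for _, c := range keys {
		b, ok := n.get(c) // store lookup without holding n.mu
		if !ok {
			continue
		}
		n.mu.Lock()
		if n.listeners[c][s] { // still a listener: inform it
			delete(n.listeners[c], s)
			s.out <- ResponseMessage{Blks: []Block{b}}
		} // no listener: Publish already delivered the block, do nothing
		n.mu.Unlock()
	}
}
```

Because Publish stores a block before notifying, a listener registered after the notification always finds the block in the store, and a listener notified by Publish is already gone when the store check runs.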

ResponseMessage is a struct with public fields for blocks, HAVEs and DONT_HAVEs.
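Written as Go declarations, the proposed types might look like this. The method sets follow the lists above; the `ResponseMessage` field names, the `From` peer field (needed so a Session can tell which peer sent a HAVE), and the `Cid`/`Block` stand-ins are assumptions for illustration.

```go
package main

// Cid is a hypothetical stand-in for cid.Cid from github.com/ipfs/go-cid.
type Cid string

// Block is a hypothetical stand-in for a Bitswap block.
type Block struct {
	Cid  Cid
	Data []byte
}

// ResponseMessage holds what one incoming message carried.
// Field names are illustrative, not taken from go-bitswap.
type ResponseMessage struct {
	From      string  // hypothetical peer identifier
	Blks      []Block // blocks received
	Haves     []Cid   // CIDs the peer says it has
	DontHaves []Cid   // CIDs the peer says it does not have
}

// Notifier mirrors the methods listed in the proposal.
type Notifier interface {
	NewSubscription(sesid uint64) Subscription
	Publish(msg ResponseMessage) error
}

// Subscription mirrors the methods listed in the proposal.
type Subscription interface {
	Messages() <-chan ResponseMessage
	Subscribe(keys ...Cid)
	Unsubscribe(keys ...Cid)
	Remove()
}
```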

When IPFS asks a Bitswap Session for a block, the Session will call Subscribe() on the Subscription it obtained from the Notifier, and then handle incoming messages:

  • When the Session gets a HAVE, it adds the peer to the session
  • When the Session gets a HAVE / DONT_HAVE, it updates its want tables
  • When the Session gets a block, it sends the block on the output channel
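The three reactions above could look roughly like this inside a Session's message loop. The `session` type, its `peers`, `wants` and `out` fields, the `wantState` values, and the `ResponseMessage` fields (including `From`) are hypothetical names chosen for illustration.

```go
package main

// Hypothetical stand-ins for cid.Cid, a block and ResponseMessage.
type Cid string

type Block struct {
	Cid  Cid
	Data []byte
}

type ResponseMessage struct {
	From      string
	Blks      []Block
	Haves     []Cid
	DontHaves []Cid
}

// wantState is a hypothetical per-peer want-table entry.
type wantState int

const (
	peerHas wantState = iota + 1
	peerDoesNotHave
)

// session is a hypothetical, single-goroutine model of a Session.
type session struct {
	peers map[string]bool              // peers in the session
	wants map[Cid]map[string]wantState // want table: CID -> peer -> state
	out   chan Block                   // blocks returned to the caller
}

// handle applies one message received from Subscription.Messages().
func (s *session) handle(msg ResponseMessage) {
	for _, c := range msg.Haves {
		s.peers[msg.From] = true // HAVE: add the peer to the session
		s.setWant(c, msg.From, peerHas)
	}
	for _, c := range msg.DontHaves {
		s.setWant(c, msg.From, peerDoesNotHave)
	}
	for _, b := range msg.Blks {
		s.out <- b // block: send it on the output channel
	}
}

func (s *session) setWant(c Cid, p string, st wantState) {
	if s.wants[c] == nil {
		s.wants[c] = map[string]wantState{}
	}
	s.wants[c][p] = st
}

// run consumes messages until the channel is closed.
func (s *session) run(msgs <-chan ResponseMessage) {
	for msg := range msgs {
		s.handle(msg)
	}
}
```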

When a request is cancelled, the Session will call Unsubscribe() for those keys. When the Session shuts down, it will call Remove().
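The rule "send CANCEL only when no session is interested in a key any more" amounts to reference counting interest per CID. A hypothetical table-level sketch, where the session ID is passed explicitly (a Subscription created with NewSubscription(sesid) could wrap it) and `sendCancel` is an assumed hook standing in for sending CANCEL to all peers:

```go
package main

import (
	"sort"
	"sync"
)

// Cid is a hypothetical stand-in for cid.Cid.
type Cid string

// interestTable tracks which sessions want each CID.
type interestTable struct {
	mu         sync.Mutex
	interested map[Cid]map[uint64]bool
	sendCancel func(keys []Cid) // hypothetical hook: CANCEL to all peers
}

func newInterestTable(sendCancel func([]Cid)) *interestTable {
	return &interestTable{interested: map[Cid]map[uint64]bool{}, sendCancel: sendCancel}
}

// Subscribe records that sesid is interested in keys.
func (t *interestTable) Subscribe(sesid uint64, keys ...Cid) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, c := range keys {
		if t.interested[c] == nil {
			t.interested[c] = map[uint64]bool{}
		}
		t.interested[c][sesid] = true
	}
}

// Unsubscribe drops sesid's interest and cancels keys nobody wants any more.
func (t *interestTable) Unsubscribe(sesid uint64, keys ...Cid) {
	t.mu.Lock()
	var cancels []Cid
	for _, c := range keys {
		sessions, ok := t.interested[c]
		if !ok || !sessions[sesid] {
			continue
		}
		delete(sessions, sesid)
		if len(sessions) == 0 {
			delete(t.interested, c)
			cancels = append(cancels, c)
		}
	}
	t.mu.Unlock()
	if len(cancels) > 0 {
		t.sendCancel(cancels) // called outside the lock
	}
}

// Remove unsubscribes sesid from every key it is still interested in.
func (t *interestTable) Remove(sesid uint64) {
	t.mu.Lock()
	var keys []Cid
	for c, sessions := range t.interested {
		if sessions[sesid] {
			keys = append(keys, c)
		}
	}
	t.mu.Unlock()
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	t.Unsubscribe(sesid, keys...)
}
```

Calling `sendCancel` after releasing the lock keeps network work out of the critical section; the sort in `Remove` only makes the CANCEL order deterministic.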

dirkmc, May 07 '20 19:05

Subscribe(ctx context.Context, sesid uint64, keys ...cid.Cid) <-chan ResponseMessage

  • Any reason for the session key?
  • Will this work for a streaming GetBlocks?

I assume we'll have to change this to something like:

Subscribe(chan<- ResponseMessage, keys ...cid.Cid) (where keys can be nil to cancel).
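One possible reading of this suggestion, sketched hypothetically: the caller supplies the output channel, subscriptions are keyed by that channel rather than by a session ID, and calling Subscribe with no keys cancels the channel's subscription. `Cid`, the single-key `ResponseMessage`, `pubsub` and `publish` are stand-ins, not go-bitswap types.

```go
package main

import "sync"

// Cid is a hypothetical stand-in for cid.Cid.
type Cid string

// ResponseMessage is simplified to a single key for this sketch.
type ResponseMessage struct {
	Key Cid // hypothetical: the CID this message is about
}

// pubsub is a hypothetical, session-agnostic notifier keyed by channel.
type pubsub struct {
	mu   sync.Mutex
	subs map[chan<- ResponseMessage]map[Cid]bool
}

func newPubsub() *pubsub {
	return &pubsub{subs: map[chan<- ResponseMessage]map[Cid]bool{}}
}

// Subscribe adds keys for ch; with no keys it cancels ch's subscription.
func (p *pubsub) Subscribe(ch chan<- ResponseMessage, keys ...Cid) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(keys) == 0 {
		delete(p.subs, ch)
		return
	}
	if p.subs[ch] == nil {
		p.subs[ch] = map[Cid]bool{}
	}
	for _, c := range keys {
		p.subs[ch][c] = true
	}
}

// publish delivers msg to every channel subscribed to msg.Key. Sends happen
// under the lock, so channels need buffer room in this sketch.
func (p *pubsub) publish(msg ResponseMessage) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for ch, keys := range p.subs {
		if keys[msg.Key] {
			ch <- msg
		}
	}
}
```

Because the channel, not a session ID, identifies the subscriber, a streaming GetBlocks could keep adding keys to one channel as it goes.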

Unsubscribe(sesid uint64, keys ...cid.Cid)

Having unsubscribe trigger a cancel but not having subscribe trigger a want is weird. I'm wondering if we should just make this interface session-agnostic.

Note that because operations on the blockstore are slow

Note 1: there should already be a cache. IMO, we should let the application handle this for us. Note 2: we may need to fix some contention issues in the cache, but I'm not sure they're actually a problem.

Stebalien, May 07 '20 21:05

I updated the original post to change the interfaces a little.

I think we can tackle streaming GetBlocks() separately, but it shouldn't be a big lift to change it.

I'll put together a PR to see what this looks like in practice and post it as a WIP.

dirkmc, May 07 '20 22:05