go-muxrpc icon indicating copy to clipboard operation
go-muxrpc copied to clipboard

stream processing blocking

Open cryptix opened this issue 6 years ago • 2 comments
trafficstars

This is really annoying.... The creator of rpc streams has to take care that they don't deadlock each other. This means: a caller needs to drain the whole stream and not start new rpc calls while reading that stream and block for that new answer before reading more of the other stream... Sounds convoluted but imagine this:

src, err:= client.Source(/* call with lot's of replies*/)


for {
  v, err := src.Next(ctx)
  if eof { break }

  // process v

  // if the backlog of objects in the pipe for src is too big, the reply for this call does not arrive.
  resp, _ := client.Async()

}

A work around for this can be to read all messages from source into ram and then process them as a next step, like this:

var msgs []interface{}
snk := luigi.NewSliceSink(msgs)

src, err:= client.Source(/* call with lot's of replies*/)
// check(err)

err = luigi.Pump(ctx, snk, src)
// check(err)

for _, v:= range msgs {
  // process v
  

  resp, _ := client.Async()
}

cryptix avatar Oct 07 '19 09:10 cryptix

An increased bufSize just hides the problem in the garbage collector.

https://github.com/cryptoscope/go-muxrpc/blob/b76bb2cd2c270b2ab6964926d167b97806586a5d/rpc.go#L54

This can be especially nasty in the case of ssb-blobs requests where whole files can hide in this area.

cryptix avatar Oct 07 '19 09:10 cryptix

a bit more background: there is a single read-loop (rpc.Serve) that reads new packets, spawns requests and pours packets to existing requests via luigi.Pipe()s. These have a buffer size, the bufSize above. This size is not in bytes but in messags, which can be codec.Bodys with quite large bodies.

cryptix avatar Oct 07 '19 09:10 cryptix