pycapnp
Streaming example
Hi, I'm having a hard time implementing a soft real-time streaming server.
Specifically, I would like to have the following behavior:
- Server is up
- Client connects to server
- Client initiates a call to the server to receive a byte stream, using a callback interface
- Server loops over data to stream, calling back client on every iteration
- Client receives data from the server on every iteration of the server loop
- Server indicates to the client that the stream is finished
- Client disconnects from server
I've not been able to figure out how to do step 4 (the server calling back the client on every iteration), as promises are pipelined and the calls are only made once all data has been processed.
My main issue is that I keep running into this error: "failed: expected !loop.running; XXX() is not allowed from within event callbacks."
Attached is my attempt to code this behavior.
If I can get this working I'll make a pull request with the working example ;-)
Thanks!
Cython==0.29.21
pycapnp==1.0.0b2
I'm not very familiar with the capnp streaming api. Can you wait on data inside the callback in C++ as well?
(Removing the .wait() on sendChunk and the code works for me)
> I'm not very familiar with the capnp streaming api. Can you wait on data inside the callback in C++ as well?
I haven't tried using capnp in C++, and I'm a bit rusty in that language. The official docs are a bit ambiguous as to the behavior.
> (Removing the .wait() on sendChunk and the code works for me)
Yes, it will work in the sense that there are no errors, but in my example it will wait the 5 seconds until all promises are gathered to make the call to the client.
This is great in most cases (it limits the number of back-and-forth network calls), but in this specific case I need each call to be sent as it is evaluated on the server side. I'm sending images from a camera for calibration, so it does no good to have all images sent back at once.
As a workaround, I could send back the images on a separate connection (using UDP for example), and just use capnp to start and stop the streaming. This would add extra complexity though; ideally I would use capnp for all exchanges.
The problem here is that you're blocking the event loop. You need control to return to the event loop in order for progress to be made. When you do sleep(1), the event loop can't make progress, because the event loop doesn't run while sleeping. Whereas when you call .wait() on a promise, that explicitly runs the event loop -- but you aren't allowed to call .wait() from inside a callback that was itself called by the event loop.
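A rough illustration of the difference, using plain asyncio as a stand-in for the Cap'n Proto event loop. This is an analogy only, not pycapnp code, and the names heartbeat, produce and run are made up for the sketch. The heartbeat task represents the work the event loop needs to do in the background (such as actually sending queued calls); it only gets to run when the producer hands control back to the loop.

```python
import asyncio
import time


async def heartbeat(ticks, stop):
    # Stand-in for background work the event loop must do (e.g. sending calls).
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def produce(blocking, steps=3, delay=0.05):
    # Hypothetical producer; blocking=True mimics sleep() inside a callback.
    for _ in range(steps):
        if blocking:
            time.sleep(delay)           # the loop is frozen for the whole delay
        else:
            await asyncio.sleep(delay)  # control returns to the loop


async def run(blocking):
    ticks, stop = [], asyncio.Event()
    task = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)              # let the heartbeat start once
    await produce(blocking)
    stop.set()
    await task
    return len(ticks)


blocked_ticks = asyncio.run(run(blocking=True))
yielding_ticks = asyncio.run(run(blocking=False))
print(blocked_ticks, yielding_ticks)
```

With the blocking producer the heartbeat only runs once, before the producer starts; with the yielding producer it keeps ticking between steps.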
What you really need to do here is use .then() to register a callback to call when the client is ready for the next chunk.
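The shape of that pattern can be sketched with plain asyncio futures instead of pycapnp promises. Everything here is an assumption made for illustration: add_done_callback plays the role of .then(), and send_chunk is a made-up stand-in for the RPC call to the client, not a pycapnp function. Each completed send schedules the next one, and nothing ever blocks or waits inside a callback.

```python
import asyncio


def send_chunk(loop, received, chunk):
    # Hypothetical stand-in for an RPC call to the client: returns a future
    # that resolves once the "client" has accepted the chunk.
    fut = loop.create_future()

    def deliver():
        received.append(chunk)
        fut.set_result(None)

    loop.call_later(0.01, deliver)
    return fut


def stream(loop, chunks, received, finished):
    # Send one chunk, then register a callback that sends the next one.
    try:
        chunk = next(chunks)
    except StopIteration:
        finished.set_result(len(received))  # signal end of stream
        return
    fut = send_chunk(loop, received, chunk)
    fut.add_done_callback(lambda _: stream(loop, chunks, received, finished))


async def main():
    loop = asyncio.get_running_loop()
    received = []
    finished = loop.create_future()
    stream(loop, iter([b"a", b"b", b"c"]), received, finished)
    count = await finished
    return count, received


count, received = asyncio.run(main())
print(count, received)
```

The chunks arrive one at a time and in order, because the next send is only started from the completion callback of the previous one.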
That makes sense, thanks for the explanation. I used sleep(1) to simulate some blocking process, in my case capturing an image from the camera.
I'm not understanding how to use then() though. Does the callback have to be run in a different thread or process, so as to be run apart from the main loop?
How about this streaming example, which reports progress from the server to the client? It builds on the async-calculator example.
schema:
longRunningOp @3 (progressCallback :ProgressReporter) -> (result :Int32);
# alternatively, we could choose to return a Value here, like 'evaluate' does (line 26 in original schema file)

interface ProgressReporter {
  # NEW: This streaming call features flow control!
  reportProgress @0 (percentage :Int8) -> stream;
  done @1 ();
}
on the server side:
#... (rest of async server example)
async def some_long_operation(callback, _context, promise_fulfiller):
    callback.reportProgress(0)
    await asyncio.sleep(0.2)
    callback.reportProgress(25)
    await asyncio.sleep(0.2)
    callback.reportProgress(50)
    await asyncio.sleep(0.2)
    callback.reportProgress(75)
    await asyncio.sleep(0.2)
    callback.reportProgress(100)
    # call 'done' as instructed in
    # https://capnproto.org/news/2020-04-23-capnproto-0.8.html#multi-stream-flow-control
    callback.done()
    # return the result to the client
    _context.results.result = 42
    # signal that we're done to our caller via Cap'n Proto's Promise object
    # (one would assume that the last two steps can be combined,
    # but I could not figure out how to achieve this using the pycapnp wrapper)
    promise_fulfiller.fulfill()

class CalculatorImpl(calculator_capnp.Calculator.Server):
    "Implementation of the Calculator Cap'n Proto interface."

    def evaluate(self, expression, _context, **kwargs):
        return evaluate_impl(expression).then(lambda value: setattr(_context.results, 'value', ValueImpl(value)))

    #...
    def longRunningOp(self, progressCallback, _context, **kwargs):
        # use a PromiseFulfiller pair to combine use of asyncio and capnproto event loop,
        # and signal completion from within an asyncio task to a Cap'n Proto Promise
        p = capnp.PromiseFulfillerPair()
        asyncio.create_task(some_long_operation(progressCallback, _context, p))
        return p.promise  # we need to return a capnproto promise here
    #...
client:
from tqdm import tqdm

class ProgressImpl(calculator_capnp.Calculator.ProgressReporter.Server):
    '''An implementation of the ProgressReporter interface. Note that we're
    implementing this on the client side and will pass a reference to the
    server. The server will then be able to make calls back to the client.'''

    def __init__(self):
        self.pbar = tqdm(total=100)

    def reportProgress(self, percentage, **kwargs):
        '''Note the **kwargs. This is very necessary to include, since
        protocols can add parameters over time. Also, by default, a _context
        variable is passed to all server methods, but you can also return
        results directly as python objects, and they'll be added to the
        results struct in the correct order'''
        n = self.pbar.n
        self.pbar.update(percentage - n)

    def done(self, **kwargs):
        self.pbar.close()

#... (rest of async client example)
async def main(host):
    #...
    prgs = ProgressImpl()
    eval_promise = calculator.longRunningOp(progressCallback=prgs)
    result = await eval_promise.a_wait()
    print("received: ", result)
Hope this helps,
Niek