[HOW-TO] Integrate capture callbacks with asyncio

Open simon-staal opened this issue 2 years ago • 11 comments

I have an image detection pipeline running on a Pi which communicates with a server via an asyncio TCP socket. In order to ensure that messages from the server are processed properly, I'm using the non-blocking form of picam2.capture_array() by specifying a signal_function containing a callback. Once I receive the image, I perform the actual processing in a separate process to ensure this doesn't block image capture either.

Ideally, I'd like to do the following (with the global executor being replaced by a functools.partial application):

async def on_complete(job):
    global executor
    img = job.get_result()
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process
    await on_result(result) # sends data to server

async def image_detection_task():
    global executor
    picam2 = setup_camera() # configures picam2 object
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    while True:
        job = picam2.capture_array(signal_function=on_complete)
        await job
    executor.shutdown()

Currently, I've considered the following work-around:

def on_complete(job):
    global is_job_complete, img
    img = job.get_result()
    is_job_complete = True

async def image_detection_task():
    global img, is_job_complete
    picam2 = setup_camera() # configures picam2 object
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        while True:
            is_job_complete = False
            job = picam2.capture_array(signal_function=on_complete)
            while not is_job_complete:
                await asyncio.sleep(0.100)
            loop = asyncio.get_event_loop()
            data = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process
            await send_data(data) # sends data to server

This seems to work, but isn't ideal. Additionally, the on_complete function can interrupt tasks even when they're not being awaited, which is also not optimal. Is there any way I can achieve what I want in a way similar to the desired implementation?

simon-staal avatar Jun 11 '23 13:06 simon-staal

Hi, thanks for the question. I think I might need a little more explanation to understand what's going on here...

Referring to your original example, you want to capture an image, and then pass this to on_complete(). This function schedules the event loop to do the image processing (loop.run_in_executor()) and then also waits for that to complete (the await on that same line). Finally the results are sent to the server (on_result()) and you wait for that to complete too (the await on that line again). Have I got that right?

I'm wondering whether we're doing a lot of waiting directly in the camera thread there (which is where the signal_function runs), which will block camera activity completely. Might this even prevent parallelism altogether?

In the second example, I don't really see any benefit in calling picam2.capture_array() asynchronously. I think img = picam2.capture_array() might be more straightforward, and you don't need the while not is_job_complete: loop (again, if I've interpreted that correctly).

Thereafter, is it again waiting for the image processing to complete and for the data to be sent, before going round the loop again and capturing the next image? Again, my apologies if I haven't understood this correctly.

davidplowman avatar Jun 12 '23 15:06 davidplowman

Hi David, happy to try and provide more context. You are correct that in both cases we want to wait for image processing to be complete and for data to be sent before proceeding into the next iteration of the loop. The reason why I'm looking to call picam2 asynchronously is that I'd like the image_detection_task to block execution as little as possible, as it will be running in parallel with the client managing the connection with the server (also using asyncio). I'm concerned that the blocking picam2.capture_array() (which takes ~100ms to execute as I'm taking high resolution images) may degrade the performance of the client which is handling the connection, as it can't be scheduled in to handle incoming messages from the server. Since tasks in asyncio can only be scheduled in when the currently run function awaits, any responses to the server would be stuck until the camera finishes taking a picture (from my understanding), which seems less than ideal.

I was not aware that signal_function runs in the camera thread, separate from the main thread, you're totally right that it doesn't make sense to run tasks in there. I guess a correction on my request above would be an asynchronous version of picam2.capture_array() which can be awaited until the img is ready, like so:

async def image_detection_task():
    picam2 = setup_camera() # configures picam2 object
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        while True:
            img = await picam2.capture_array() # Releases asyncio to process other tasks while camera is taking image
            loop = asyncio.get_event_loop()
            data = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process
            await send_data(data) # sends data to server

Hopefully this makes a bit more sense?

simon-staal avatar Jun 12 '23 18:06 simon-staal

Hi, thanks for the update. That certainly makes more sense, though there are probably still a few things to think about.

Firstly, this line:

            img = await picam2.capture_array() # Releases asyncio to process other tasks while camera is taking image

I don't really understand how this is different from

            img = picam2.capture_array()

In both cases, the copying of the camera memory into your numpy array happens in the camera thread, and then the calling thread is signalled. So I think these are basically doing the same thing, though the first version is probably poking more threads and doing some extra signalling? (There are ways to do the copy not-in-the-camera-thread, but maybe let's make something simple work first!)

Next we have

            data = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process

Am I right in thinking that this is waiting for the image processing to finish? If so, it seems to me like that might be breaking the parallelism, as we wait for every task to finish as soon as we create it? Or is there some kind of parallelism going on inside the img_processing function, though it's not clear to me how that would help.

My assumption is that you want to run several img_processing operations in parallel on consecutive frames, and as each one finishes (in the right order), then you send the results to the server? Is that an accurate description? Thanks!
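A minimal sketch of the pipeline described in that last paragraph: several frames are processed at once, and results are sent in the order the frames arrived. Everything here is an assumption for illustration: `img_processing` is a toy stand-in, `process_in_order` and `demo` are hypothetical names, and a `ThreadPoolExecutor` is used only to keep the sketch portable (for CPU-bound work a `ProcessPoolExecutor`, as in the thread, can usually be swapped in).

```python
import asyncio
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def img_processing(frame: int) -> int:
    """Hypothetical stand-in for the real detection step."""
    return frame * frame


async def process_in_order(frames, send, max_in_flight: int = 4) -> None:
    """Keep up to max_in_flight frames processing at once; send results in capture order."""
    loop = asyncio.get_running_loop()
    pending = deque()
    with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        for frame in frames:
            pending.append(loop.run_in_executor(executor, img_processing, frame))
            if len(pending) == max_in_flight:
                # Pool is full: wait for the oldest frame before taking more.
                await send(await pending.popleft())
        while pending:  # drain whatever is still in flight
            await send(await pending.popleft())


async def demo() -> list:
    sent = []

    async def send(result):  # stand-in for send_data()
        sent.append(result)

    await process_in_order(range(6), send)
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because results are always taken from the left of the deque, a slow frame delays the frames behind it but never reorders them.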

davidplowman avatar Jun 13 '23 08:06 davidplowman

Essentially, the (heavily simplified) code running on the Pi will look something like this:

class SensorClient:
    def connect(self):
        # Connect to server
        # Do some other admin work
        # Run server communication task in parallel with image detection
        self.tasks = asyncio.gather(*[image_detection_task(), server_communication_task()])

So the use of await in the image_detection_task is not to make that function itself work in parallel, but to allow it to work in parallel with other tasks running on the Pi (in the same process). My concern is that without the use of await for picam2.capture_array(), although the actual processing is happening in a separate thread, the asyncio event loop is not signalled that it may schedule in another task as it is waiting to complete, keeping execution stuck in the image_detection_task(), which causes real-time tasks (server communication) to stall. This is why I was using the workaround of the non-blocking version of capture_array(), which then brings us to our asyncio.sleep loop where other async tasks can be run while we wait for the camera to finish taking a picture. However, given that the signal function appears to be executed as soon as the image is ready, and it's unclear to me how the camera thread operates with the GIL, this function may also interrupt execution of code in a different task at an unexpected time (i.e. not during an await statement), which might lead to subtle bugs. So essentially, my 2 main questions are:

  • If we use the blocking version of capture_array(), does it signal to the asyncio event loop that other tasks may be scheduled in (similar to awaiting an async function)? If this is not the case, what would be the best way to accomplish this?
  • Assuming the best way is through the use of a signal function and an asyncio.sleep loop, how would this interfere with the rest of the system assuming that a different function (not image_detection_task()) is currently scheduled in by asyncio when the signal function triggers?
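On the first question, a plain blocking call made from inside a coroutine does not yield to the event loop, whatever other threads are doing. The sketch below illustrates this without a camera. `blocking_capture` is a hypothetical stand-in that just sleeps for 100 ms, `heartbeat` plays the role of the server communication task, and `asyncio.to_thread` (Python 3.9+) is one general-purpose way to offload a blocking call, not something the thread confirms for Picamera2 specifically.

```python
import asyncio
import time


def blocking_capture(duration: float = 0.1) -> str:
    """Hypothetical stand-in for a blocking capture call."""
    time.sleep(duration)  # blocks the calling thread
    return "frame"


async def heartbeat(ticks: list) -> None:
    """Stand-in for the server task: records a tick roughly every 10 ms."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def ticks_during_capture(offload: bool) -> int:
    """Count heartbeat ticks that happen while one capture is in progress."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    before = len(ticks)
    if offload:
        await asyncio.to_thread(blocking_capture)  # event loop stays free
    else:
        blocking_capture()  # event loop is stuck until this returns
    during = len(ticks) - before
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return during


if __name__ == "__main__":
    print("blocking :", asyncio.run(ticks_during_capture(offload=False)))
    print("offloaded:", asyncio.run(ticks_during_capture(offload=True)))
```

In the blocking case the heartbeat gets no turns at all during the capture; in the offloaded case it keeps ticking.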

simon-staal avatar Jun 17 '23 10:06 simon-staal

Hi again, I'm afraid these are all things I've never tried for myself so it's difficult to give you definitive answers. Maybe the best thing is to instrument the code to check that you're getting the behaviour that you want.

Nonetheless, I can try and explain the behaviour of the camera system. I think it's pretty straightforward so it ought to be possible to find out how it interacts with other things.

When you're running in a script (rather than a Qt application), there's a background camera thread. When you do capture_array(), it blocks on a socket which becomes readable when a camera frame is available. The camera thread then copies the frame for you and signals your script thread. I assume that Python, while one thread is waiting on a socket, is quite happy to go and run other threads.

If you prefer not to let the camera thread do the image copy, you can use this idiom.

request = picam2.capture_request()
image = request.make_array('main')
request.release()

You could even do capture_request() in your main thread, and then send the request off to other threads for copying (and don't forget to release the request after), image processing and so on.
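A rough sketch of that hand-off, assuming only the `make_array()` and `release()` calls shown above. `copy_and_release` and `FakeRequest` are hypothetical names; the fake lets the pattern be tried without a camera, and with real hardware the request would come from `picam2.capture_request()`.

```python
from concurrent.futures import ThreadPoolExecutor


def copy_and_release(request, stream: str = "main"):
    """Copy one stream out of a request, then always hand the buffer back."""
    try:
        return request.make_array(stream)
    finally:
        request.release()  # runs even if the copy raises


class FakeRequest:
    """Hypothetical stand-in for a Picamera2 request (no camera needed)."""

    def __init__(self):
        self.released = False

    def make_array(self, stream):
        return [[0, 0], [0, 0]]  # placeholder for the numpy array

    def release(self):
        self.released = True


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as pool:
        request = FakeRequest()  # with a camera: picam2.capture_request()
        image = pool.submit(copy_and_release, request).result()
        print(image, request.released)
```

Putting the release in a `finally` block keeps a failed copy from holding on to a camera buffer.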

There's nothing to stop you even calling capture_array() or capture_request() from different threads, though of course at that point you lose track of the order in which frames arrived. It was never quite clear to me whether the data you send to your server needs to happen in the order in which the frames arrived or not.

Sorry not to be more help with this, but things are getting a little abstract for me to be certain of what I'm saying. If you had some simple self-contained test cases that show particular behaviour then I'd be happy to try and run them.

davidplowman avatar Jun 19 '23 09:06 davidplowman

I have similar requirements. (Let's leave image processing out of the picture to avoid over-complicating things. It can be done independently after capture using loop.run_in_executor.)

This is what I have so far, but I couldn't yet make it work: (Maybe someone else has an idea?)

import asyncio
import numpy as np
from picamera2 import Picamera2


class AsyncCamera:
    def __init__(self) -> None:
        self.cam = Picamera2()

        config = self.cam.create_still_configuration()
        self.cam.configure(config)

        self.cam.start()

    # Conceptually, we would like something like this, but this obviously doesn't work
    async def get_image(self) -> np.ndarray:
        return await self.cam.capture_array()

    # I tried this, but this somehow blocks
    # (I didn't find the time to investigate this.)
    async def get_image(self) -> np.ndarray:
        # Create an asyncio future that can be awaited
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        # Resolve the future once the arrival of an image is signaled
        def signal_function(job: picamera2.job.Job):
            try:
                result = job.get_result()
                future.set_result(result)
            except Exception as e:
                future.set_exception(e)

        # Start the capture of one image in the background
        self.cam.capture_array("main", signal_function=signal_function)

        # Await the future. (This gives other coroutines the chance to execute in the meantime.)
        return await future


Some context on asyncio:

  • Unlike threading or multiprocessing, which involve parallel execution, asyncio uses a single-threaded, single-process approach to achieve concurrency by managing an event loop.
  • Instead, coroutines are used. These are special functions defined with async def that can pause and resume their execution with await. When that happens, control is passed back to the event loop which can then run the next coroutine waiting to be executed.
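The two points above can be seen in a few lines. This is a toy illustration only; `worker` and `main` are made-up names. Two coroutines take turns at each `await`, and every step runs on the same thread.

```python
import asyncio
import threading


async def worker(name: str, log: list) -> None:
    for step in range(2):
        log.append((name, step, threading.current_thread().name))
        await asyncio.sleep(0)  # pause here and let the event loop run something else


async def main() -> list:
    log: list = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

The log alternates between `a` and `b` because each coroutine only gives up control at its `await`.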

moi90 avatar Jun 07 '25 06:06 moi90

Unfortunately I don't know anything about asyncio, so it seems a bit confusing to me at the moment! However, if you could provide the world's simplest example that should work but doesn't, along with the code that calls it (but gets stuck, or whatever), then I'd be happy to take a look.

If it's any help, you can call Picamera2 functions asynchronously, which returns a "job" that you can wait for. The "job" is a thin wrapper round a future, which is held inside the "job".
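For context, the standard library can bridge a thread-style future into asyncio with `asyncio.wrap_future`. The sketch below shows this with an ordinary `ThreadPoolExecutor`. It is an assumption that the future inside a job is a `concurrent.futures.Future`, and the thread does not say how to reach it from a job, so no Picamera2 call is used here. `slow_square` is a made-up example function.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    """Made-up blocking function that runs in a worker thread."""
    return x * x


async def main() -> int:
    with ThreadPoolExecutor(max_workers=1) as pool:
        cf_future = pool.submit(slow_square, 7)      # concurrent.futures.Future
        return await asyncio.wrap_future(cf_future)  # awaitable from a coroutine


if __name__ == "__main__":
    print(asyncio.run(main()))
```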

davidplowman avatar Jun 09 '25 09:06 davidplowman

The above is the simplest example I can think of ;)

> If it's any help, you can call Picamera2 functions asynchronously, which returns a "job" that you can wait for. The "job" is a thin wrapper round a future, which is held inside the "job".

Yes, I use that in the above example. ;)

I discovered that it is necessary to use loop.call_soon_threadsafe(future.set_result, result) because we're interacting with the asyncio.Future from a different thread.

So this works (for anyone who is using a similar setup):

async def get_image(self) -> np.ndarray:
    # Create an asyncio future that can be awaited
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Resolve the future once the arrival of an image is signaled
    # Use `loop.call_soon_threadsafe` because `job_done_callback` runs in the camera thread.
    def job_done_callback(job: "picamera2.job.Job"):
        try:
            result = job.get_result()
        except Exception as e:
            loop.call_soon_threadsafe(future.set_exception, e)
        else:
            loop.call_soon_threadsafe(future.set_result, result)

    # Start the capture of one image in the background
    self.cam.capture_array("main", signal_function=job_done_callback)

    # Await the future. (This gives other coroutines the chance to execute in the meantime.)
    return await future

I think this solution also solves this issue. (And @simon-staal probably moved on since 2023?)

moi90 avatar Jun 13 '25 11:06 moi90

Thanks for that. I guess I'm not sure I 100% understand, but it sort of makes sense. You call the Picamera2 function asynchronously, and set your job_done_callback to be the thing that gets called when it finishes. Then I suppose asyncio has its own flavour of future so you have to make one of those and signal that.

I suppose it's simple enough if you understand... though the necessity for call_soon_threadsafe is a bit over my head.
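On why `call_soon_threadsafe` is needed: asyncio futures are documented as not thread-safe, so another thread should not call `set_result` on one directly. `loop.call_soon_threadsafe` instead queues the call to run on the event loop's own thread and wakes the loop up. A small camera-free sketch, where a plain `threading.Thread` stands in for the camera thread and all names are made up:

```python
import asyncio
import threading


async def wait_for_thread_result() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def worker() -> None:
        # Runs in a different thread, like a signal_function would.
        name = threading.current_thread().name
        # Ask the loop to resolve the future on the loop's own thread.
        loop.call_soon_threadsafe(future.set_result, name)

    threading.Thread(target=worker, name="camera-thread-stand-in").start()
    # The timeout only guards the demo against hanging forever.
    return await asyncio.wait_for(future, timeout=1.0)


if __name__ == "__main__":
    print(asyncio.run(wait_for_thread_result()))
```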

Beyond this, is there anything Picamera2 could do to make this easier? Would it be helpful for Picamera2 to have an import picamera2.asyncio module that could provide such functionality? I'm always keen to try and help folks!

davidplowman avatar Jun 13 '25 13:06 davidplowman

> Would it be helpful for Picamera2 to have an import picamera2.asyncio module that could provide such functionality?

That would be awesome for my use case. However, I don't know how many people would actually use it.

moi90 avatar Jun 14 '25 14:06 moi90

Obviously I don't know too much about asyncio, but I wonder a bit what such support would look like.

Maybe there's a wrapper, based on the code above, that takes the function we want to "asyncio-ify", and we can use that to implement the methods called by users.

Maybe we have in Picamera2.__init__():

    self.asyncio = Picamera2.Asyncio(self)

and then a file asyncio.py where we implement

class Asyncio:
    # ...
    def asyncioify(self, func, **kwargs):
        # Basically the code you have above, but calling func instead of capture_array

and then a load of methods that call this wrapper so that users can do

    array = picam2.asyncio.capture_array(...)
    request = picam2.asyncio.capture_request(...)
What do you think? Not that I'm volunteering to do anything like this just yet, but trying to gather ideas!
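One possible shape for such a wrapper, sketched as a free function rather than a method. It assumes only what the thread shows: the wrapped function accepts a `signal_function` keyword, and the job passed to it has `get_result()`. `FakeJob` and `FakeCamera` are hypothetical stand-ins so the sketch can run without a camera; nothing here is an existing Picamera2 API.

```python
import asyncio
import threading


async def asyncioify(func, *args, **kwargs):
    """Await a function that takes signal_function and reports back via a job."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def resolve(setter, value):
        if not future.done():  # e.g. the awaiting task was cancelled meanwhile
            setter(value)

    def on_done(job):
        # Called from another thread: hand the outcome to the loop thread.
        try:
            result = job.get_result()
        except Exception as exc:
            loop.call_soon_threadsafe(resolve, future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(resolve, future.set_result, result)

    func(*args, signal_function=on_done, **kwargs)
    return await future


class FakeJob:
    """Hypothetical stand-in for a job object."""

    def __init__(self, value):
        self._value = value

    def get_result(self):
        return self._value


class FakeCamera:
    """Hypothetical stand-in that calls signal_function from a timer thread."""

    def capture_array(self, name="main", signal_function=None):
        job = FakeJob(f"array from {name}")
        threading.Timer(0.01, signal_function, args=(job,)).start()
        return job


if __name__ == "__main__":
    print(asyncio.run(asyncioify(FakeCamera().capture_array, "main")))
```

With a real camera the call would presumably look like `array = await asyncioify(picam2.capture_array, "main")`, though that has not been tested here.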

davidplowman avatar Jun 16 '25 05:06 davidplowman

I stumbled across this post with the same question, and after some poking around, I figured out how to integrate captures with asyncio for anyone that wants it, combining the work of @moi90 and my own poking around.

But first, a couple of disclaimers

  1. This is my first time using asyncio. So there probably is a better way to do this.
  2. I used ChatGPT to help myself understand what I needed to do. It worked, so I'm thinking it counts.

Ok, on to what I got working:

So the issue is that the signal_function happens in a separate thread, and asyncio events are not thread safe. So we need to create a future, pass it and a reference to the loop through with loop.call_soon_threadsafe, and then await on that future:

import asyncio

from picamera2.job import Job
from picamera2.picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder  # type: ignore[attr-defined]

camera = Picamera2()

def _photo_signal(
    job: Job,
    loop: asyncio.AbstractEventLoop,
    future: asyncio.Future,
) -> None:
    """Signal function to set a future to indicate that taking a photo is done.

    Arguments:
        job: The picamera2 job for the photo taken.
        loop: The asyncio event loop that the future to set is in.
        future: The future to set the result for.

    """
    loop.call_soon_threadsafe(future.set_result, "Done")


async def take_photo() -> None:
    """Take a photo asynchronously, then save the JPEG and DNG"""
    # Create config for high res photo
    capture_config = camera.create_still_configuration(
        raw={},
        display=None,
        controls={
            "AeEnable": True,
            "ExposureValue": 0.0,
        },
    )
    loop = asyncio.get_running_loop()
    photo_done = loop.create_future()
    camera.start()
    job = camera.switch_mode_capture_request_and_stop(
        capture_config,
        signal_function=lambda j: _photo_signal(j, loop, photo_done),
    )
    await photo_done
    request = job.get_result()
    request.save("main", "test.jpg")
    request.save_dng("test.dng")
    request.release()

This will free up the event loop while the capture is in progress. One catch is that the saving logic will likely block the event loop. You might want to offload that to a separate thread/process.
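A sketch of that offloading step using `asyncio.to_thread` (Python 3.9+). It reuses the `save`, `save_dng` and `release` calls from the example above. `save_and_release`, `save_off_loop` and `FakeRequest` are hypothetical names, and whether saving from a worker thread suits a given setup is an assumption worth checking on real hardware.

```python
import asyncio


def save_and_release(request, jpeg_path: str, dng_path: str) -> None:
    """Blocking file writes; the request is released even if a save fails."""
    try:
        request.save("main", jpeg_path)
        request.save_dng(dng_path)
    finally:
        request.release()


async def save_off_loop(request, jpeg_path="test.jpg", dng_path="test.dng") -> None:
    """Run the blocking saves in a worker thread so the event loop keeps running."""
    await asyncio.to_thread(save_and_release, request, jpeg_path, dng_path)


class FakeRequest:
    """Hypothetical stand-in that only records calls (no camera needed)."""

    def __init__(self):
        self.calls = []

    def save(self, stream, path):
        self.calls.append(("save", stream, path))

    def save_dng(self, path):
        self.calls.append(("save_dng", path))

    def release(self):
        self.calls.append(("release",))


if __name__ == "__main__":
    request = FakeRequest()  # with a camera: request = job.get_result()
    asyncio.run(save_off_loop(request))
    print(request.calls)
```

In `take_photo()` this would replace the three lines after `request = job.get_result()` with a single `await save_off_loop(request)`.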

@davidplowman I could see that having an async-capable set of functions would probably be helpful for people trying to use the module. I'm not sure if having a separate picam2.asyncio module is the best way though. It might be possible to just have some async def methods on the existing Picamera2 class that wrap things similar to my example here.

marsfan avatar Oct 17 '25 07:10 marsfan