sqlx

Add PgListener::try_recv_buffered(), to support batch processing of notifications

Status: Open · chanks opened this issue 1 year ago · 4 comments

Hello! I wanted to support batch processing of notifications when there are a lot of them, so it made sense to grab whatever notifications might be readily available. try_next() is unsuitable for that, since if the buffer is empty, I don't want to wait for a new NOTIFY to happen before progressing with the batch. So it seemed to make sense to add a new non-async method that is essentially the first buffer check of try_recv(), and then call that in a loop to build up a batch until it returns None (or a batch limit is reached).
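As a rough sketch of that batching loop, the same pattern can be written with only the standard library, using std::sync::mpsc as a stand-in for the listener. recv_batch is a hypothetical helper, not part of sqlx: the blocking Receiver::recv plays the role of awaiting PgListener::try_recv(), and the non-blocking Receiver::try_recv plays the role of the proposed try_recv_buffered(). Note that std's try_recv never waits, unlike PgListener's method of the same name.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical helper: wait for the first item, then drain whatever is
/// already queued (without waiting) until the queue is empty or the batch
/// holds `limit` items. Returns an empty Vec if all senders have hung up.
fn recv_batch<T>(rx: &Receiver<T>, limit: usize) -> Vec<T> {
    let mut batch = Vec::new();
    // Blocking wait for the first item (analogous to awaiting try_recv()).
    match rx.recv() {
        Ok(first) => batch.push(first),
        Err(_) => return batch, // all senders dropped, nothing left to read
    }
    // Non-blocking drain (analogous to the proposed try_recv_buffered()).
    while batch.len() < limit {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    batch
}
```

If limit is 0 the batch still contains the first item, since the limit is only checked after the initial wait.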

I updated try_recv() to call the new function, just to make things as DRY as possible, but it isn't a huge improvement. Happy to revert that part if you prefer.

Also happy to bikeshed the new method's name/interface. A couple options I considered were:

```rust
// Similar to `read_to_end()` and `read_to_string()` in the Read trait
pub fn fill_notification_buffer(&mut self, buffer: &mut Vec<PgNotification>) -> usize {
    // Add all notifications in the buffer to the vec and return the number of notifications we added.
    // UnboundedReceiver doesn't support receiving multiple messages at the same time, though,
    // so there doesn't seem to be any performance benefit the way there would be with the Read trait.
    let start = buffer.len();
    while let Some(notification) = self.try_recv_buffered() {
        buffer.push(notification);
    }
    buffer.len() - start
}

// Just implement the loop behavior for the caller. This isn't anything they couldn't implement
// themselves on top of try_recv_buffered(), though, and it doesn't let them reuse a buffer vec.
pub async fn try_recv_multiple(&mut self, limit: usize) -> Result<Vec<PgNotification>, Error> {
    // Wait for the first notification; return an empty batch if try_recv() yields None.
    let mut vec = match self.try_recv().await? {
        Some(first) => vec![first],
        None => return Ok(vec![]),
    };
    // Then take whatever is already buffered, without waiting, until the buffer is
    // empty or the batch reaches `limit` (the batch always holds at least one item).
    while vec.len() < limit {
        match self.try_recv_buffered() {
            Some(n) => vec.push(n),
            None => break,
        }
    }
    Ok(vec)
}
```

Anyway, I ran the tests with cargo test -p sqlx-postgres --all-features. Please let me know if you think there's anything else worth testing or running.

Thanks!

chanks, Oct 13 '24 01:10

Changed the function name in response to feedback and got the tests passing. Frustratingly, calling try_recv() doesn't seem to be enough to fill the buffer with more than one notification, but running a simple SQL statement like 'SELECT 1' is. I'm not sure why that is.

chanks, Oct 13 '24 02:10

try_recv() only drains the buffer; it does not add to it. If it reads a notification from the connection, it returns it directly.

The buffer exists because the server may send notifications at almost any time, including while executing a query.
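To make that concrete, here is a toy model of the behavior described above. It is an illustration only, not sqlx's actual implementation: ToyListener, Message, and execute() are hypothetical, and the socket is modeled as a queue of messages the server has already sent. try_recv() drains the buffer or returns a freshly read notification directly, while running a query is what moves early-arriving notifications into the buffer.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for messages arriving from the server.
#[derive(Debug, PartialEq)]
enum Message {
    Notification(String),
    QueryResult(String),
}

/// Toy model for illustration only; not sqlx's actual implementation.
struct ToyListener {
    socket: VecDeque<Message>, // messages the server has sent but we haven't read
    buffer: VecDeque<String>,  // notifications set aside while a query was running
}

impl ToyListener {
    /// Mirrors the described try_recv(): pop from the buffer if possible,
    /// otherwise read from the socket and return the first notification
    /// directly. It never pushes anything onto the buffer.
    fn try_recv(&mut self) -> Option<String> {
        if let Some(n) = self.buffer.pop_front() {
            return Some(n);
        }
        while let Some(msg) = self.socket.pop_front() {
            if let Message::Notification(n) = msg {
                return Some(n);
            }
            // Other messages are ignored in this toy model.
        }
        None
    }

    /// Mirrors running a query such as `SELECT 1`: notifications that arrive
    /// before the query's result are moved into the buffer rather than lost.
    fn execute(&mut self) -> Option<String> {
        while let Some(msg) = self.socket.pop_front() {
            match msg {
                Message::Notification(n) => self.buffer.push_back(n),
                Message::QueryResult(r) => return Some(r),
            }
        }
        None
    }
}
```

In this model, a sequence of try_recv() calls alone leaves the buffer empty, which is consistent with the earlier observation that a query like 'SELECT 1' is what fills it.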

I don't have much issue with changing try_recv() to read all available messages into the buffer before popping the first one, but strictly speaking the exact behavior of the method has never been guaranteed. And in fact, the documentation only says:

Receives the next notification available from any of the subscribed channels.

Now that I think about it, I'm actually wary of changing this behavior because, if the server is constantly emitting notifications, the method could potentially never return and just keep filling the buffer forever. We'd have to stop reading messages at some arbitrary or configurable limit.
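A minimal sketch of that bounded alternative, under stated assumptions: recv_with_eager_fill and MAX_EAGER_READS are hypothetical names, and "messages already available on the socket" is modeled as an iterator, where an infinite iterator stands in for a server that never stops emitting. The cap is what guarantees the call returns.

```rust
use std::collections::VecDeque;

/// Hypothetical hard-coded cap on reads per call (100 is just an example value).
const MAX_EAGER_READS: usize = 100;

/// Move at most `cap` already-available notifications from `socket` into
/// `buffer`, then pop the oldest one. Without the cap, a server that never
/// stops sending would keep this call from ever returning.
fn recv_with_eager_fill<I, T>(socket: &mut I, buffer: &mut VecDeque<T>, cap: usize) -> Option<T>
where
    I: Iterator<Item = T>,
{
    buffer.extend(socket.by_ref().take(cap));
    buffer.pop_front()
}
```

In the real listener the socket read is asynchronous, so "already available" would mean a read that completes without waiting; the iterator hides that detail.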

I might end up re-thinking the PgListener API entirely, because there's a number of parts I'm not super happy with.

abonander, Oct 15 '24 23:10

Now that I think about this more, if you actually want it to try reading from the socket and just not wait, you would just use .try_recv().now_or_never(): https://docs.rs/futures/latest/futures/future/trait.FutureExt.html#method.now_or_never

I would hesitate to recommend it right now only because, if the connection is broken, try_recv() will try to reconnect, but since now_or_never() doesn't wait, it will just open a socket and then immediately close it. We'd need to spawn the Pool::acquire() call as a task and store the JoinHandle.
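To show why that is a problem, here is a std-only sketch of what now_or_never does. It is a minimal re-implementation for illustration, not the futures crate's code (in practice you would use futures::FutureExt::now_or_never): poll the future once with a no-op waker, and if it is not ready, drop it. Any work the future started before its first pending await point, such as opening a socket during a reconnect, is simply abandoned.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing; enough to poll a future exactly once.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Sketch of now_or_never semantics: poll once; if the future is not ready,
/// return None. The future is dropped either way when this function returns.
fn now_or_never<F: Future>(fut: F) -> Option<F::Output> {
    // SAFETY: the vtable functions are no-ops and never touch the data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut);
    match fut.poll(&mut cx) {
        Poll::Ready(out) => Some(out),
        Poll::Pending => None,
    }
}
```

In the usage below, the hypothetical "reconnect" future gets as far as its first step and is then dropped, which mirrors the open-then-immediately-close behavior described above.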

abonander, Oct 16 '24 00:10

Thanks @abonander. I didn't know about now_or_never(), but that approach seems to rely on implementation details a bit more than I'd like: it depends on the method remaining cancel-safe and on no other await point being introduced, in addition to the broken-connection issue you mention.

Having try_recv() read up to a hard-coded number of notifications (say, 100?) seems like a pretty straightforward and safe change to me, and it could be made configurable whenever the PgListener interface is rethought. I'm happy to make that change and update the PR if you're open to it.

chanks, Oct 17 '24 17:10

Before long, I want to make a significant refactor of the driver code so that the connection state machine runs in its own task. Then it will always eagerly read notifications and send them over a channel to be processed.
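A rough sketch of that architecture, using a std thread in place of an async task: spawn_connection_driver and drain_available are hypothetical names, and `incoming` stands in for the messages the connection would read from the server. Because the driver eagerly forwards every notification, batching on the consumer side reduces to draining the channel without waiting.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical driver: a dedicated thread eagerly forwards every notification
/// it "reads" to a channel, so the consumer never drives the socket itself.
fn spawn_connection_driver(incoming: Vec<String>) -> (Receiver<String>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for notification in incoming {
            // Stop forwarding if the consumer has gone away.
            if tx.send(notification).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which tells the receiver the connection closed.
    });
    (rx, handle)
}

/// With the driver running, a batch is whatever is already in the channel,
/// up to `limit` items; this never blocks.
fn drain_available(rx: &Receiver<String>, limit: usize) -> Vec<String> {
    rx.try_iter().take(limit).collect()
}
```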

abonander, Nov 27 '24 21:11