google-cloud-rust

RowIterator+AsyncIterator are not cancellation safe

Open • ivankelly opened this issue 2 years ago • 5 comments

To repro:

  • create a database and a change stream
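  • query the change stream in a tokio::select! loop like this:

    loop {
        tokio::select! {
            _ = some_tokio_interval.tick() => {
                do_periodic_task();
            }
            next_entry_res = iterator.next() => {
                // An error should propagate here, but it is lost if this
                // branch's future is dropped while the client is retrying.
                next_entry_res?;
            }
        }
    }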
  • wait until it's getting heartbeats from the iterator
  • delete the change stream

At this point, the iterator should return an error and we should exit the loop. However, the client goes into a retry, which gives the tick() future a chance to complete. The iterator.next() future is then cancelled, and the error is never returned to next_entry_res. The iterator is left in an error state, and in that state every next() call returns None. This is particularly bad for change streams, because None indicates that you've read everything from the stream, which may not be the case. So this loops forever.
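To make the failure mode concrete, here is a std-only model of it. This is a sketch under stated assumptions, not the real client code: MockIterator, State, YieldOnce and run_demo are hypothetical. The mock iterator flips into an error state and then hits another await point (standing in for the retry the client performs), and dropping the pending next() future plays the role of tokio::select! choosing the tick() branch. The call made after the dropped one reports Ok(None), which is indistinguishable from a clean end of stream.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// No-op waker: the futures below are driven by polling them in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns Pending exactly once; stands in for the client's retry/backoff await.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

#[derive(PartialEq)]
enum State {
    Streaming,
    Error,
}

// Hypothetical stand-in for RowIterator; not the google-cloud-spanner type.
struct MockIterator {
    state: State,
    messages: VecDeque<Result<u32, String>>,
}

impl MockIterator {
    async fn next(&mut self) -> Result<Option<u32>, String> {
        if self.state == State::Error {
            return Ok(None); // once errored, this looks like a normal end of stream
        }
        match self.messages.pop_front() {
            None => Ok(None),
            Some(Ok(row)) => Ok(Some(row)),
            Some(Err(e)) => {
                self.state = State::Error; // the state flips first...
                YieldOnce(false).await; // ...then an await point (the "retry")
                Err(e)
            }
        }
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns the results of the next() calls before and after the cancelled one.
fn run_demo() -> (Result<Option<u32>, String>, Result<Option<u32>, String>) {
    let mut it = MockIterator {
        state: State::Streaming,
        messages: VecDeque::from(vec![Ok(1), Err("change stream deleted".to_string())]),
    };
    let before = block_on(it.next());
    {
        // As in select!: next() is polled once, the tick branch wins,
        // and the still-pending next() future is dropped.
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut fut = Box::pin(it.next());
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    } // dropped here: the Err it was about to return is gone
    let after = block_on(it.next());
    (before, after)
}

fn main() {
    let (before, after) = run_demo();
    println!("before: {:?}, after: {:?}", before, after);
}
```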

ivankelly, Jun 01 '23 12:06

Thanks for pointing this out. I am not sure I understand it.

  • How does the iterator get dropped in that code?
  • Normally, you would break out of the loop depending on the result of next(), like this:
    use google_cloud_spanner::client::{Client, ClientConfig};
    use google_cloud_spanner::key::Key;

    let config = ClientConfig::default();
    let client = Client::new("projects/local-project/instances/test-instance/databases/local-database", config).await.unwrap();
    let task = tokio::spawn(async move {
        let key = Key::new(&"user_x_x");
        let mut tx = client.read_only_transaction().await.unwrap();
        let mut iterator = tx.read("User", &["UserId"], key).await.unwrap();
        let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(10));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    tracing::info!("tick");
                }
                next_entry_res = iterator.next() => {
                    // Stop at end of stream; propagate (here: panic on) errors.
                    match next_entry_res.unwrap() {
                        Some(_row) => tracing::info!("handle row"),
                        None => break,
                    }
                }
            }
        }
    });
    // The timeout future must be awaited; without .await it is dropped
    // immediately and nothing waits for the task.
    let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), task).await;

yoshidan, Jun 02 '23 03:06

The iterator itself doesn't get dropped, but the future returned by iterator.next() can be dropped if interval.tick() completes first. I don't think this is normally a problem, because the iterator is mostly cancellation safe. But if Spanner returns an error to a future that then gets dropped, the error never bubbles up, and the next call to iterator.next() returns None, which looks like a normal partition completion (when using change streams).
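What "dropped" means can be observed directly. The sketch below is a hypothetical, simplified two-branch race (tokio::select! additionally randomizes which branch it polls first unless `biased;` is used) together with a drop guard inside a stand-in for next(). fake_next, race, CancelGuard and YieldN are illustrative names, not library APIs. When the tick branch completes first, the next() future is dropped at its await point and the code after that point never runs.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns Pending `n` times before completing.
struct YieldN(u32);
impl Future for YieldN {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            return Poll::Ready(());
        }
        self.0 -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Sets the shared flag if dropped while still armed, i.e. before completion.
struct CancelGuard {
    cancelled: Rc<Cell<bool>>,
    armed: bool,
}
impl Drop for CancelGuard {
    fn drop(&mut self) {
        if self.armed {
            self.cancelled.set(true);
        }
    }
}

// Hypothetical stand-in for iterator.next(): needs three polls to finish.
async fn fake_next(cancelled: Rc<Cell<bool>>) -> &'static str {
    let mut guard = CancelGuard { cancelled, armed: true };
    YieldN(2).await; // e.g. waiting on the server or a retry backoff
    guard.armed = false; // only reached if the future runs to completion
    "row"
}

#[derive(Debug, PartialEq)]
enum Winner {
    Next,
    Tick,
}

// Simplified select: polls both futures in turn and returns as soon as one is
// ready. Returning drops the other future wherever it was suspended.
fn race(next: impl Future<Output = &'static str>, tick: impl Future<Output = ()>) -> Winner {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut next = Box::pin(next);
    let mut tick = Box::pin(tick);
    loop {
        if next.as_mut().poll(&mut cx).is_ready() {
            return Winner::Next;
        }
        if tick.as_mut().poll(&mut cx).is_ready() {
            return Winner::Tick;
        }
    }
}

fn run_demo() -> (Winner, bool) {
    let cancelled = Rc::new(Cell::new(false));
    let winner = race(fake_next(cancelled.clone()), YieldN(1));
    (winner, cancelled.get())
}

fn main() {
    println!("{:?}", run_demo());
}
```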

I think the problem is in try_recv, which calls self.streaming.message().await, which in turn calls poll_next():

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            if let State::Error = &self.inner.state {
                return Poll::Ready(None);
            }
            // ... rest of poll_next elided
        }
    }

If you miss the error that put the inner state into State::Error, you'll get None forever. This may not even be a Spanner client issue; it could well be in tonic.

ivankelly, Jun 06 '23 14:06

Thank you for the detailed information. I was able to reproduce the issue. If the future is dropped by tokio::select! while streaming.message() in try_recv is producing an error, self.reader.read stays in the Pending state and never completes: https://github.com/yoshidan/google-cloud-rust/blob/207422caf07dad8c0e9f80ed7c47cb37b21d7716/spanner/src/reader.rs#L232

If try_recv is then called again from a new future (for example, after the tick branch has completed), streaming.message() always returns None.

yoshidan, Jun 09 '23 13:06

I added an enable_resume option to QueryOptions. If it is set to false when a query is executed, the RowIterator is cancellation safe, because the stream is not automatically resumed.

https://github.com/yoshidan/google-cloud-rust/pull/162
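A simplified model of why disabling resume helps. The model is an assumption; only the flag name enable_resume comes from the change, and MockIterator and run are hypothetical. If no await point sits between consuming the error and returning it, a next() future can only be dropped while it is still waiting for data, before anything has been consumed, so the error is delivered by a later call. With an extra resume/backoff await in that window, the same drop loses the error.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns Pending exactly once.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical model; `enable_resume` mirrors the QueryOptions flag by name only.
struct MockIterator {
    errored: bool,
    messages: VecDeque<Result<u32, String>>,
}

impl MockIterator {
    async fn next(&mut self, enable_resume: bool) -> Result<Option<u32>, String> {
        if self.errored {
            return Ok(None);
        }
        // Waiting for the server: nothing consumed yet, so dropping here is harmless.
        YieldOnce(false).await;
        match self.messages.pop_front() {
            None => Ok(None),
            Some(Ok(row)) => Ok(Some(row)),
            Some(Err(e)) => {
                self.errored = true;
                if enable_resume {
                    // Resume/backoff step: another await between consuming the
                    // error and returning it. Dropping the future here loses `e`.
                    YieldOnce(false).await;
                }
                Err(e)
            }
        }
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Polls next() at most twice (as if a tick() branch wins after that and the
// future is dropped), then calls next() again and runs it to completion.
fn run(enable_resume: bool) -> Result<Option<u32>, String> {
    let mut it = MockIterator {
        errored: false,
        messages: VecDeque::from(vec![Err("change stream deleted".to_string())]),
    };
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = Box::pin(it.next(enable_resume));
        for _ in 0..2 {
            if let Poll::Ready(res) = fut.as_mut().poll(&mut cx) {
                return res; // the error reached the caller
            }
        }
    } // still Pending: dropped, as select! drops the losing branch
    block_on(it.next(enable_resume))
}

fn main() {
    println!("enable_resume=false: {:?}", run(false));
    println!("enable_resume=true:  {:?}", run(true));
}
```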

yoshidan, Jun 13 '23 09:06

👍 good stuff. In our case we're just going to assume nothing is cancellation safe. It's too sharp an edge to be load-bearing for correctness. Our final solution used futures::future::select: https://docs.rs/futures/latest/futures/future/fn.select.html. It's ugly as hell, but it's safe.

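use futures::future::{select, Either};
use futures::pin_mut;

'mainloop: loop {
    // Create one next() future per item and keep it across ticks.
    // Box::pin makes it Unpin, which futures::future::select requires.
    let mut data_future = Box::pin(iterator.next());
    'innerloop: loop {
        let tick = timed_flush_check_interval.tick();
        pin_mut!(tick);
        match select(data_future, tick).await {
            Either::Left((data, _)) => {
                let status = process_the_data(data).await?;
                if status == Status::Finished {
                    break 'mainloop;
                }
                break 'innerloop;
            }
            Either::Right((_, future)) => {
                // The tick fired first: keep the still-pending next() future
                // instead of dropping it, so its result cannot be lost.
                data_future = future;
            }
        }
        things_that_need_to_run_periodically().await?;
    }
}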
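The key property of that pattern is that one next() future is created per item and handed back on Either::Right, rather than being recreated after every tick. The std-only sketch below reproduces it without external crates, under these assumptions: Select is a simplified, hypothetical analogue of futures::future::select (it polls the left future first), std::future::ready(()) stands in for an interval tick that is always due, and MockIterator is a hypothetical iterator that, like the one described in the report, would lose its error if next() were dropped mid-retry. Even though a tick runs while next() is suspended, the error still reaches the caller.

```rust
use std::collections::VecDeque;
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Returns Pending exactly once (stands in for a retry backoff inside next()).
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Hypothetical iterator with the same flaw as in the report.
struct MockIterator {
    errored: bool,
    messages: VecDeque<Result<u32, String>>,
}

impl MockIterator {
    async fn next(&mut self) -> Result<Option<u32>, String> {
        if self.errored {
            return Ok(None);
        }
        match self.messages.pop_front() {
            None => Ok(None),
            Some(Ok(row)) => Ok(Some(row)),
            Some(Err(e)) => {
                self.errored = true;
                YieldOnce(false).await; // dropping the future here would lose `e`
                Err(e)
            }
        }
    }
}

enum Either<L, R> {
    Left(L),
    Right(R),
}

// Resolves when either side finishes and hands back the other, unfinished one.
struct Select<A, B> {
    inner: Option<(A, B)>,
}

impl<A: Future + Unpin, B: Future + Unpin> Future for Select<A, B> {
    type Output = Either<(A::Output, B), (B::Output, A)>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (mut a, mut b) = self.inner.take().expect("Select polled after completion");
        if let Poll::Ready(out) = Pin::new(&mut a).poll(cx) {
            return Poll::Ready(Either::Left((out, b)));
        }
        if let Poll::Ready(out) = Pin::new(&mut b).poll(cx) {
            return Poll::Ready(Either::Right((out, a)));
        }
        self.inner = Some((a, b));
        Poll::Pending
    }
}

// Keeps one next() future alive across ticks; returns the item and tick count.
async fn consume(it: &mut MockIterator) -> (Result<Option<u32>, String>, u32) {
    let mut ticks = 0;
    let mut data_future = Box::pin(it.next());
    loop {
        match (Select { inner: Some((data_future, ready(()))) }).await {
            Either::Left((data, _)) => return (data, ticks),
            Either::Right(((), unfinished)) => {
                ticks += 1; // periodic work would run here
                data_future = unfinished; // resume the same next() future
            }
        }
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_demo() -> (Result<Option<u32>, String>, u32) {
    let mut it = MockIterator {
        errored: false,
        messages: VecDeque::from(vec![Err("change stream deleted".to_string())]),
    };
    block_on(consume(&mut it))
}

fn main() {
    println!("{:?}", run_demo());
}
```

With tokio::select!, the equivalent idiom is to pin the future once, outside the loop (for example with tokio::pin!), and select on `&mut fut`, so the branch borrows the future instead of consuming it.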

Thanks for your help on this. I have another bug incoming (session leakage), but that one will come as a PR.

ivankelly, Jun 13 '23 09:06