tiberius icon indicating copy to clipboard operation
tiberius copied to clipboard

tiberius unable to return RowStream

Open kzhui125 opened this issue 1 year ago • 0 comments

This is because the return QueryStream rely on the connection, means I can't write a function to return stream of rows in Repository methods:

image image
use deadpool_tiberius::{self, Pool};
use futures::stream::BoxStream;
use std::error::Error;

#[derive(Debug)]
pub struct User {
    name: String,
}

pub struct SomeRepository {
    pool: Pool,
}

impl SomeRepository {
    pub fn new(pool: Pool) -> Self {
        Self { pool }
    }

    pub async fn stream_users2(
        &self,
    ) -> Result<
        (
            deadpool_tiberius::deadpool::managed::Object<deadpool_tiberius::Manager>,
            BoxStream<'static, tiberius::Result<tiberius::Row>>,
        ),
        Box<dyn Error>,
    > {
        let mut conn: deadpool_tiberius::deadpool::managed::Object<deadpool_tiberius::Manager> =
            self.pool.get().await?;

        let stream = conn.query("query", &[]).await?.into_row_stream();
        Ok((conn, stream))
    }
}

Here is tokio-postgres, the return RowStream lifetime is bound to sql:

image
use bb8::Pool;
use bb8_postgres::PostgresConnectionManager;
use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use std::error::Error;
use tokio_postgres::NoTls;

#[derive(Debug)]
pub struct User {
    name: String,
}

pub struct SomeRepository {
    pool: Pool<PostgresConnectionManager<NoTls>>,
}

impl SomeRepository {
    pub fn new(pool: Pool<PostgresConnectionManager<NoTls>>) -> Self {
        Self { pool }
    }

    pub async fn stream_users(&self) -> BoxStream<'static, Result<User, tokio_postgres::Error>> {
        let conn = self.pool.get().await.unwrap();
        let params: Vec<String> = vec![];
        let stream = conn
            .query_raw("SELECT name FROM users", params)
            .await
            .unwrap();

        Box::pin(stream.map(|result| {
            result.map(|row| User {
                name: row.get("name"),
            })
        }))
    }
}

here is sqlx, the return RowStream lifetime is also bound to sql:

use futures_core::stream::BoxStream;
use futures_util::StreamExt;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres, Row};
use std::error::Error;

#[derive(Debug)]
pub struct User {
    name: String,
}

pub struct SomeRepository {
    pool: Pool<Postgres>,
}

impl SomeRepository {
    pub fn new(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }

    pub fn stream_users2(&self) -> BoxStream<'static, Result<User, sqlx::Error>> {
        sqlx::query("SELECT name FROM users")
            .fetch(&self.pool)
            .map(|result| {
                result.map(|row| User {
                    name: row.get("name"),
                })
            })
            .boxed()
    }
}

kzhui125 avatar Oct 23 '24 16:10 kzhui125