graphql-client icon indicating copy to clipboard operation
graphql-client copied to clipboard

Support subscriptions in graphql_client_web

Open tomhoule opened this issue 6 years ago • 3 comments

We should probably look at how this is implemented in other GraphQL clients that work in browsers, Apollo for example.

I haven't worked with subscriptions much personally, so I will have to read up on this so I can implement it or give feedback on someone else's implementation. This issue is for discussing the requirements for the feature.

tomhoule avatar Dec 30 '18 10:12 tomhoule

I happen to be in need of that, so maybe I could give it a try. Right now there are two specs, the legacy apollo one and the newer graphql-ws one. This is usually implemented using websocket, but sometimes server sent events. Sadly reqwest still doesn't support websocket (https://github.com/seanmonstar/reqwest/issues/864), so we probably need another library. Maybe https://github.com/snapview/tokio-tungstenite?

Sytten avatar Jan 05 '22 17:01 Sytten

For those interested, I have a working prototype over here https://github.com/caido/graphql-ws-client. In theory the source crate is runtime agnostic so it should be possible to run it everywhere but I didn't try it. Depending on the response from https://github.com/obmarg/graphql-ws-client/issues/30, I will either push the prototype upstream or continue working on my work (maybe we can integrate here later).

Sytten avatar Jan 26 '22 22:01 Sytten

I polished my prototype and it is now merged in v0.2.0 or https://github.com/obmarg/graphql-ws-client!

As a reference here own I implemented it for my tests:

pub struct TokioSpawner(tokio::runtime::Handle);

impl TokioSpawner {
    pub fn new(handle: tokio::runtime::Handle) -> Self {
        TokioSpawner(handle)
    }

    pub fn current() -> Self {
        TokioSpawner::new(tokio::runtime::Handle::current())
    }
}

impl futures::task::Spawn for TokioSpawner {
    fn spawn_obj(
        &self,
        obj: futures::task::FutureObj<'static, ()>,
    ) -> Result<(), futures::task::SpawnError> {
        self.0.spawn(obj);
        Ok(())
    }
}
pub type GraphQLSubscriptionsClient = GraphQLClientClient<Message>;
pub type GraphQLSubscription<Q> = SubscriptionStream<GraphQLClient, StreamingOperation<Q>>;

pub async fn build_subscriptions_client() -> GraphQLSubscriptionsClient {
    let mut request = "ws://locahost:8080/ws/graphql"
        .into_client_request()
        .unwrap();
    request.headers_mut().insert(
        "Sec-WebSocket-Protocol",
        HeaderValue::from_str("graphql-transport-ws").unwrap(),
    );

    let (connection, _) = async_tungstenite::tokio::connect_async(request)
        .await
        .unwrap();
    let (sink, stream) = connection.split();

    GraphQLClientClientBuilder::new()
        .build(stream, sink, TokioSpawner::current())
        .await
        .unwrap()
}

pub async fn subscribe<Q>(
    client: &mut GraphQLSubscriptionsClient,
    variables: Q::Variables,
) -> GraphQLSubscription<Q>
where
    Q: GraphQLQuery + Send + Unpin + 'static,
    Q::Variables: Send + Unpin,
{
    let operation = StreamingOperation::<Q>::new(variables);
    client.streaming_operation(operation).await.unwrap()
}

pub async fn next<Q>(stream: &mut GraphQLSubscription<Q>) -> Response<Q::ResponseData>
where
    Q: GraphQLQuery + Send + Unpin + 'static,
    Q::Variables: Send + Unpin,
{
    timeout(Duration::from_secs(5), stream.next())
        .await
        .unwrap()
        .unwrap()
        .unwrap()
}
#[tokio::test]
async fn test_replay_task() {
        let mut client = common::build_subscriptions_client().await;

        let variables = updated_book::Variables {};
        let mut stream = common::subscribe::<subscriptions::UpdatedBook>(&mut client, variables).await;

        let variables = update_book::Variables {
            id: "1".to_string(),
            input: UpdateBookInput {
              name: "Test".to_string(),
            }
        };
        let res = post_graphql::<mutations::UpdateBook>(variables).await;
        let book = res.data.unwrap().update_book.book;

        let res = common::next(&mut stream).await;
        let updated_book = res.data.unwrap().updated_book;

        assert_eq!(book.id, updated_book.id);
}

Sytten avatar Jan 27 '22 23:01 Sytten