graphql-spqr-spring-boot-starter

How to test WebSocket subscriptions?

Open hantsy opened this issue 4 years ago • 2 comments

In my previous experience, Vert.x GraphQL and Netflix DGS both support the Apollo WebSocket subscription specification.

I tried to test a WebSocket-based subscription using Spring WebClient and WebTestClient, but it does not work when I send the subscription request in the WebSocket message payload.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
@Disabled
class WebSocketSubscriptionTests {

    @LocalServerPort
    int port;

    WebTestClient webClient;

    @Autowired
    PostService postService;

    @Autowired
    ObjectMapper objectMapper;

    @BeforeEach
    void setUp() {
        this.webClient = WebTestClient.bindToServer().baseUrl("http://localhost:" + port).build();
    }

    @SneakyThrows
    @Test
    public void testAddComment() {
        // there are 4 posts initialized.
        String postId = this.postService.getAllPosts().get(0).getId();
        log.debug("post id: {}", postId);
        // add comment
        var addCommentQuery = """
                mutation addNewComment($postId:String!, $content:String!){
                    addComment(postId:$postId, content:$content){
                        id
                        postId
                        content
                    }
                }""".trim();
        var addCommentVariables = Map.of(
                "postId", postId,
                "content", "test comment"
        );
        Map<String, Object> addCommentBody = Map.of("query", addCommentQuery, "variables", addCommentVariables);
        webClient.post().uri("/graphql")
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.APPLICATION_JSON)
                .bodyValue(addCommentBody)
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .jsonPath("data.addComment.id").exists()
                .jsonPath("data.addComment.content").isEqualTo("test comment");


        //handle subscription to /graphql websocket endpoints
        Map<String, Object> queryPayload = Map.of(
                "query", "subscription onCommentAdded { commentAdded { id postId content } }",
                "extensions", emptyMap(),
                "variables", emptyMap());
        var body = Map.of(
                "payload", queryPayload,
                "type", "start",
                "id", "1"
        );

        var commentsReplay = new ArrayList<String>();
        var socketClient = new ReactorNettyWebSocketClient();
        WebSocketHandler socketHandler = session -> {
            Mono<Void> receiveMono = session.receive().doOnNext(
                    it -> {
                        log.debug("next item: {}", it);
//                        String text = it.getPayloadAsText();
//                        log.debug("receiving message as text: {}", text);
//                        if ("data".equals(JsonPath.read(text, "type"))) {
//                            String comment = JsonPath.read(text, "payload.data.commentAdded.content");
//                            commentsReplay.add(comment);
//                        }
                    }
            ).log().then();

            String message = null;
            try {
                message = objectMapper.writeValueAsString(body);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return session
                    .send(Mono.delay(Duration.ofMillis(100)).thenMany(Flux.just(message).map(session::textMessage)))
                    .log()
                    .then(receiveMono);
        };

        MultiValueMapAdapter<String, String> queryParams = new MultiValueMapAdapter<>(Map.of("query", List.<String>of("subscription onCommentAdded { commentAdded { id postId content } }")));

        URI uri = new DefaultUriBuilderFactory("ws://localhost:" + port + "/graphql").builder().queryParams(queryParams).build();
        socketClient.execute(uri, socketHandler).block(Duration.ofMillis(500));

        assertThat(commentsReplay.size()).isEqualTo(1);
        assertThat(commentsReplay.get(0)).isEqualTo("test comment");
    }
}

Finally, I tried adding the query to the WebSocket connection URL as a query parameter. That triggered a connection attempt, but it failed, and no data ever arrived in receiveMono.

I am not sure how to use WebSocket subscriptions here. Any help? The complete code is hosted on GitHub: https://github.com/hantsy/spring-graphql-sample/blob/master/spring-spqr/src/test/java/com/example/demo/WebSocketSubscriptionTests.java
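For reference, here is a minimal sketch of the client frames that the Apollo `subscriptions-transport-ws` protocol defines: a `connection_init` frame first, then a `start` frame carrying the query, with results coming back as `data` frames. The class `ApolloFrames` and its helper methods are hypothetical names used only for illustration. Whether the SPQR starter actually requires `connection_init` before `start` is an assumption that has not been verified in this thread.

```java
import java.util.List;

// Hypothetical helper: builds the JSON text frames a client sends
// under the Apollo subscriptions-transport-ws protocol.
final class ApolloFrames {

    // Quotes a value as a JSON string, escaping the characters JSON forbids raw.
    public static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // First frame: asks the server to initialise the protocol session.
    public static String connectionInit() {
        return "{\"type\":\"connection_init\",\"payload\":{}}";
    }

    // Starts the operation identified by id; the query travels in the payload.
    public static String start(String id, String query) {
        return "{\"id\":" + jsonString(id)
                + ",\"type\":\"start\""
                + ",\"payload\":{\"query\":" + jsonString(query) + ",\"variables\":{}}}";
    }

    // Cancels a running operation.
    public static String stop(String id) {
        return "{\"id\":" + jsonString(id) + ",\"type\":\"stop\"}";
    }

    // The frames to send, in order, to open one subscription.
    public static List<String> handshakeThenSubscribe(String id, String query) {
        return List.of(connectionInit(), start(id, query));
    }

    public static void main(String[] args) {
        String query = "subscription onCommentAdded { commentAdded { id postId content } }";
        handshakeThenSubscribe("1", query).forEach(System.out::println);
    }
}
```

In the test above, these frames could replace the single `start` message, for example by mapping each string through `session::textMessage` inside `session.send(...)`. The query then travels only in the frame payload, not in the connection URL.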

hantsy · Oct 08 '21 14:10

The console output in the background looks like this:

Connecting to ws://localhost:50123/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D
2021-10-09 12:10:47.660 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : GET "/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D", parameters={masked}
2021-10-09 12:10:47.667 DEBUG 13324 --- [o-auto-1-exec-2] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped to io.leangen.graphql.spqr.spring.web.mvc.DefaultGraphQLController#executeGet(GraphQLRequest, Object)
2021-10-09 12:10:47.683 DEBUG 13324 --- [o-auto-1-exec-2] o.s.w.c.request.async.WebAsyncManager    : Started async request
2021-10-09 12:10:47.684 DEBUG 13324 --- [o-auto-1-exec-2] o.s.w.c.request.async.WebAsyncManager    : Async result set, dispatch to /graphql
2021-10-09 12:10:47.684 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : Exiting but response remains open for further handling
2021-10-09 12:10:47.685 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : "ASYNC" dispatch for GET "/graphql?query=subscription%20onCommentAdded%20%7B%20commentAdded%20%7B%20id%20postId%20content%20%7D%20%7D", parameters={masked}
2021-10-09 12:10:47.686 DEBUG 13324 --- [o-auto-1-exec-2] s.w.s.m.m.a.RequestMappingHandlerAdapter : Resume with async result [{data=graphql.execution.reactive.CompletionStageMappingPublisher@6997f06}]
2021-10-09 12:10:47.687 DEBUG 13324 --- [o-auto-1-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Using 'application/json', given [*/*] and supported [application/json]
2021-10-09 12:10:47.687 DEBUG 13324 --- [o-auto-1-exec-2] m.m.a.RequestResponseBodyMethodProcessor : Writing [{data=graphql.execution.reactive.CompletionStageMappingPublisher@6997f06}]
2021-10-09 12:10:47.696 DEBUG 13324 --- [o-auto-1-exec-2] o.s.web.servlet.DispatcherServlet        : Exiting from "ASYNC" dispatch, status 200
2021-10-09 12:10:47.706  WARN 13324 --- [ctor-http-nio-3] r.netty.http.client.HttpClientConnect    : [id:60230589-1, L:/127.0.0.1:50125 - R:localhost/127.0.0.1:50123] The connection observed an error

io.netty.handler.codec.http.websocketx.WebSocketClientHandshakeException: Invalid handshake response getStatus: 200 
	at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker13.verify(WebSocketClientHandshaker13.java:272) ~[netty-codec-http-4.1.68.Final.jar:4.1.68.Final]
	at io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker.finishHandshake(WebSocketClientHandshaker.java:304) ~[netty-codec-http-4.1.68.Final.jar:4.1.68.Final]
	at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:116) ~[reactor-netty-http-1.0.11.jar:1.0.11]

hantsy · Oct 09 '21 04:10

I am not sure why the WebSocket handshake request is delegated to a general HTTP controller, which then causes the handshake to fail.
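Some background on the exception: in the WebSocket opening handshake (RFC 6455) the client expects `101 Switching Protocols`. A `200` therefore means the upgrade request was answered as an ordinary HTTP request, which is consistent with the `DefaultGraphQLController#executeGet` mapping in the log. The sketch below probes a URL with the JDK's built-in WebSocket client and reports the status the server answered with. `HandshakeCheck`, `describe`, and `probe` are hypothetical names, and the path on which the starter serves WebSocket traffic is not stated in this thread, so any URL passed in is an assumption.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.net.http.WebSocketHandshakeException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic helper, not part of SPQR or Spring.
final class HandshakeCheck {

    // Interprets the HTTP status returned for a WebSocket upgrade request.
    public static String describe(int status) {
        if (status == 101) {
            return "upgrade accepted";
        }
        if (status >= 200 && status < 300) {
            return "answered as a plain HTTP request, no upgrade happened";
        }
        return "upgrade rejected with status " + status;
    }

    // Attempts the opening handshake and returns the status code the server sent.
    public static int probe(URI uri) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        try {
            WebSocket ws = client.newWebSocketBuilder()
                    .buildAsync(uri, new WebSocket.Listener() { })
                    .get(5, TimeUnit.SECONDS);
            ws.abort(); // the handshake succeeded, so the connection is no longer needed
            return 101;
        } catch (ExecutionException e) {
            // Walk the cause chain in case the handshake failure is wrapped.
            for (Throwable t = e.getCause(); t != null; t = t.getCause()) {
                if (t instanceof WebSocketHandshakeException) {
                    return ((WebSocketHandshakeException) t).getResponse().statusCode();
                }
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // Pass the URL to test as the first argument, e.g. the ws:// URL from the log.
        System.out.println(describe(probe(URI.create(args[0]))));
    }
}
```

Running the probe against both the plain `/graphql` URL and whatever WebSocket path the application is configured with should show which of the two actually performs the upgrade.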

hantsy · Oct 09 '21 04:10