connect-query-es

Revisiting streaming setup for connect query

Open malinskibeniamin opened this issue 9 months ago • 4 comments

Now that streamedQuery can be used natively in tanstack query, are there any plans to improve the existing streaming functionality? https://tanstack.com/query/latest/docs/reference/streamedQuery

I am aware of useStreamingQuery and a few other proposals shared previously in older issues, but I imagine this could help align with the react query ecosystem.

malinskibeniamin Mar 26 '25 21:03

@malinskibeniamin interesting, I had somehow missed this API entirely. We'll need to investigate the ergonomics but yeah, that's a pretty good way forward for us to solidify on integrating with streaming RPCs. I'll start an exploratory branch and see what happens.

paul-sachs Mar 27 '25 14:03

Looking into the API, at least as of 5.69.0, it seems like it's still marked as experimental. I think we'll likely wait until it's been marked as officially released, in case the API changes. We could also release our own version prefixed with experimental but I'm really not digging that idea, just feels a little viral.

On the plus side, I think it'll be easier to write your own tiny little wrapper, something like:

import {
  ConnectQueryKey,
  createConnectQueryKey,
  useTransport,
} from "@connectrpc/connect-query";
import {
  DescMessage,
  DescMethodServerStreaming,
  DescMethodUnary,
  MessageInitShape,
  MessageShape,
} from "@bufbuild/protobuf";
import { Transport } from "@connectrpc/connect";
import { createAsyncIterable } from "@connectrpc/connect/protocol";
import {
  experimental_streamedQuery,
  QueryFunction,
} from "@tanstack/react-query";

function createServerStreamingQueryFn<
  I extends DescMessage,
  O extends DescMessage,
>(
  transport: Transport,
  schema: DescMethodServerStreaming<I, O>,
  input: MessageInitShape<I> | undefined,
  refetchMode: "append" | "reset" = "reset"
): QueryFunction<MessageShape<O>[], ConnectQueryKey> {
  return experimental_streamedQuery({
    queryFn: async (context) => {
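      // transport.stream takes: method, signal, timeoutMs, header, input, contextValues
      const result = await transport.stream(
        schema,
        context.signal,
        undefined, // timeoutMs
        undefined, // header
        // The single request message is wrapped as an async iterable.
        createAsyncIterable(input ? [input] : []),
        undefined // contextValues
      );

      // result.message is the async iterable of response messages.
      return result.message;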
    },
    refetchMode,
  });
}

function createServerStreamingQueryOptions<
  I extends DescMessage,
  O extends DescMessage,
>(
  schema: DescMethodServerStreaming<I, O>,
  input: MessageInitShape<I> | undefined,
  transport: Transport
): {
  queryKey: ConnectQueryKey;
  queryFn: QueryFunction<MessageShape<O>[], ConnectQueryKey>;
} {
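  // Typing hack: relabel the streaming method as unary so it is
  // accepted when generating the query key.
  const method: DescMethodUnary = {
    ...schema,
    methodKind: "unary",
  };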

  return {
    queryKey: createConnectQueryKey({
      schema: method,
      input,
      cardinality: "finite",
    }),
    queryFn: createServerStreamingQueryFn(transport, schema, input),
  };
}

Then you can just do something like:

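import { useTransport } from "@connectrpc/connect-query";
import { useQuery } from "@tanstack/react-query";

// Inside a React component (ElizaService comes from your generated code):
const transport = useTransport();
const { status, fetchStatus, error, data } = useQuery(
  createServerStreamingQueryOptions(
    ElizaService.method.introduce,
    { name: "Alice" },
    transport
  )
);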

This will effectively do what you need with only a minor hack around the typing for generating keys.

paul-sachs Mar 27 '25 22:03

Awesome

vikyw89 Jul 12 '25 17:07

If it helps anyone, I updated this example to work with the custom reducers added to streamedQuery in TanStack Query 5.86 (see the initialValue and reducer args added; the queryFn option got renamed to streamFn).

This allows you to pass an initialValue and a reducer and have the work done as the chunks come in. Without this, you end up needing some sort of useEffect that listens for changes to the data array and does its own custom reduction.

import type {
	DescMessage,
	DescMethodServerStreaming,
	DescMethodUnary,
	MessageInitShape,
	MessageShape,
} from "@bufbuild/protobuf"
import type { Transport } from "@connectrpc/connect"
import { createAsyncIterable } from "@connectrpc/connect/protocol"
import { type ConnectQueryKey, createConnectQueryKey } from "@connectrpc/connect-query"
import { experimental_streamedQuery, type QueryFunction } from "@tanstack/react-query"

export function createServerStreamingQueryFn<I extends DescMessage, O extends DescMessage, ReducedStateShape>(
	transport: Transport,
	schema: DescMethodServerStreaming<I, O>,
	input: MessageInitShape<I> | undefined,
	reducer: (state: ReducedStateShape, message: MessageShape<O>) => ReducedStateShape,
	initialValue: ReducedStateShape,
	refetchMode: "append" | "reset" = "reset",
): QueryFunction<ReducedStateShape, ConnectQueryKey> {
	return experimental_streamedQuery<MessageShape<O>, ReducedStateShape, ConnectQueryKey>({
		streamFn: async (context) => {
			const result = await transport.stream(
				schema,
				context.signal,
				undefined,
				undefined,
				createAsyncIterable(input ? [input] : []),
				undefined,
			)

			return result.message
		},
		initialValue,
		refetchMode,
		reducer,
	})
}

export function createServerStreamingQueryOptions<I extends DescMessage, O extends DescMessage, ReducedStateShape>(
	schema: DescMethodServerStreaming<I, O>,
	input: MessageInitShape<I> | undefined,
	transport: Transport,
	reducer: (state: ReducedStateShape, message: MessageShape<O>) => ReducedStateShape,
	initialValue: ReducedStateShape,
): {
	queryKey: ConnectQueryKey
	queryFn: QueryFunction<ReducedStateShape, ConnectQueryKey>
} {
	const method: DescMethodUnary = {
		...schema,
		methodKind: "unary",
	}

	return {
		queryKey: createConnectQueryKey({
			schema: method,
			input,
			cardinality: "finite",
		}),
		queryFn: createServerStreamingQueryFn(transport, schema, input, reducer, initialValue),
	}
}
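
To make the reducer idea concrete, here is a minimal, library-free sketch of folding stream chunks into a single state value as they arrive. It is only an illustration of the concept, not TanStack Query's implementation. `fakeStream`, `reduceStream`, `joinSentences`, and the `{ sentence: string }` message shape are all hypothetical names made up for this example.

```typescript
// Hypothetical stand-in for a server stream: yields one message at a time.
async function* fakeStream(): AsyncIterable<{ sentence: string }> {
  yield { sentence: "Hello" };
  yield { sentence: "Alice" };
}

// Folds each chunk into the state as soon as it arrives.
// Illustration only; this is not how TanStack Query is written internally.
async function reduceStream<Chunk, State>(
  stream: AsyncIterable<Chunk>,
  reducer: (state: State, chunk: Chunk) => State,
  initialValue: State,
  onUpdate?: (state: State) => void,
): Promise<State> {
  let state = initialValue;
  for await (const chunk of stream) {
    state = reducer(state, chunk);
    // A query cache would publish the intermediate state here.
    onUpdate?.(state);
  }
  return state;
}

// Same shape as the `reducer` argument in the wrapper above:
// (state, message) => next state.
const joinSentences = (state: string, message: { sentence: string }): string =>
  state === "" ? message.sentence : `${state} ${message.sentence}`;
```

With the wrapper above, a reducer like `joinSentences` and an initial value of `""` would be passed as the last two arguments of `createServerStreamingQueryOptions`, so `data` would be a single string rather than an array of messages.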

jtbeach Sep 12 '25 23:09