Add custom URQL exchange to handle aborted operations via AbortSignal

Open lykianovsky opened this issue 5 months ago • 8 comments

Summary

Introduce a new customAbortExchange that adds native support for AbortSignal-based cancellation of GraphQL operations in urql.

This solves the problem where operations cancelled via AbortController are not reflected in urql’s exchange pipeline, leading to incomplete teardown and inconsistent error handling in consumers.

Proposed Solution

A new exchange, customAbortExchange, will be introduced. It behaves as follows:

  • Hooks into the exchange pipeline and watches each incoming operation
  • If the operation has a fetchOptions.signal, it attaches an abort event listener
  • When AbortController.abort() is triggered:
    • Emits a teardown operation upstream
    • Emits an OperationResult with an error: new Error("Operation was aborted")
  • If signal.aborted === true at the moment of receiving, teardown and error are emitted immediately
  • All event listeners are cleaned up on teardown to prevent memory leaks

This solution ensures that urql clients can handle request aborts consistently in streams or promises.
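The listener lifecycle described in the list above can be sketched independently of urql. The helper below is only an illustration under stated assumptions: `watchAbort` is a hypothetical name and is not part of urql or of the proposed exchange. It runs the callback immediately when the signal is already aborted, otherwise attaches a one-shot listener and returns a cleanup function.

```typescript
// Hypothetical helper, not part of urql: mirrors the listener handling described above.
function watchAbort(signal: AbortSignal, onAbort: () => void): () => void {
  // Early-aborted signal: notify immediately, nothing to clean up.
  if (signal.aborted) {
    onAbort()
    return () => {}
  }

  // Otherwise listen once and hand back a cleanup function for teardown.
  signal.addEventListener('abort', onAbort, { once: true })
  return () => signal.removeEventListener('abort', onAbort)
}

// Usage: the callback fires once on abort.
const first = new AbortController()
let firstCalls = 0
watchAbort(first.signal, () => { firstCalls++ })
first.abort()

// Calling the cleanup before abort() detaches the listener.
const second = new AbortController()
let secondCalls = 0
const cleanup = watchAbort(second.signal, () => { secondCalls++ })
cleanup()
second.abort()
```

In the exchange itself, the callback would emit the teardown and the error result, and the cleanup function would be stored per operation key.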

Requirements

  • The exchange must:
    • Detect AbortSignal in both function and object-based fetchOptions
    • Support early-aborted signals
    • Emit proper teardown operations
    • Return an error result on abort for consumer-side handling
    • Automatically remove event listeners after completion or teardown
  • Must not interfere with other operation kinds (e.g., subscriptions)
  • Must be composable with other exchanges in the urql pipeline

🧠 Usage

import { createClient, dedupExchange, fetchExchange } from 'urql'
import { customAbortExchange } from './customAbortExchange'

const client = createClient({
  url: '/graphql',
  exchanges: [
    dedupExchange,
    customAbortExchange,
    fetchExchange
  ],
})

To cancel an operation, you can use an AbortController:

const controller = new AbortController()

client.query(MY_QUERY, {}, {
  fetchOptions: {
    signal: controller.signal
  }
}).toPromise()

// Cancel it when needed
controller.abort()
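For timeout-based cancellation, one option is to wrap the controller in a small helper. This is only a sketch: `abortAfter` is a hypothetical name, not something urql provides.

```typescript
// Hypothetical helper (not part of urql): aborts its signal after `ms` milliseconds.
function abortAfter(ms: number): { signal: AbortSignal; cancel: () => void } {
  const controller = new AbortController()
  const timer = setTimeout(() => controller.abort(), ms)

  return {
    signal: controller.signal,
    // Call cancel() once the request has settled so the timer does not fire later.
    cancel: () => clearTimeout(timer),
  }
}

const timeout = abortAfter(5000)
// Pass timeout.signal as fetchOptions.signal, then clear the timer when done:
timeout.cancel()
```

The returned signal would go into fetchOptions.signal in the same way as controller.signal in the snippet above.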

🧩 How It Works

This exchange listens for an AbortSignal on each Operation's fetchOptions.
If the signal is triggered:

  • It emits a teardown operation for upstream exchanges
  • It also emits a manual OperationResult with an Error so the client receives a failure result

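If the signal is already aborted when the operation is received, the exchange emits the teardown and the error immediately. Because the sample implementation uses tap rather than filter, the operation itself is still passed on to forward.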


📦 Example

const controller = new AbortController()

client.query(SOME_QUERY, {}, {
  fetchOptions: {
    signal: controller.signal,
  },
}).toPromise()

If controller.abort() is called, the exchange will:

  • Tear down the operation
  • Emit an error: new Error("Operation was aborted")

📁 Exchange source code

import { type Exchange, makeErrorResult, type Operation, type OperationResult } from 'urql'
import { makeSubject, merge, pipe, tap } from 'wonka'
// NOTE: isFunction is a helper from my own repo; use a typeof check or your own guard instead
import { isFunction } from '@utils/guards/types'

// Custom Exchange to handle operations that were aborted via AbortSignal
export const customAbortExchange: Exchange = ({ forward }) => {
  // Subject for teardown operations triggered by abort
  const abortOperation = makeSubject<Operation>()

  // Subject for manually returned operation results (e.g., errors from aborts)
  const resultOperation = makeSubject<OperationResult>()

  // Stores cleanup handlers for abort event listeners by operation key
  const abortHandlerMap = new Map<number, () => void>()

  return sourceOperation$ => {
    // Handle incoming operations
    const filteredOperation$ = pipe(
      sourceOperation$,
      tap(operation => {
        const { kind, key } = operation

        // If this is a teardown operation (manual cancel or completion), remove its abort handler
        if (kind === 'teardown') {
          const handler = abortHandlerMap.get(key)
          if (handler) {
            handler() // remove abort event listener
            abortHandlerMap.delete(key)
          }
          return
        }

        // Try to extract AbortSignal from operation context
        const signal = extractAbortSignal(operation)

        if (!signal) {
          return
        }

        // Define abort event handler: emit teardown and return an error result
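        const abortHandler = () => {
          // The { once: true } listener removes itself, so only the map entry needs dropping
          abortHandlerMap.delete(key)
          abortOperation.next({ ...operation, kind: 'teardown' })
          resultOperation.next(makeErrorResult(operation, new Error('Operation was aborted')))
        }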

        // If already aborted — immediately emit teardown and error
        if (signal.aborted) {
          abortHandler()
          return
        }

        // Save cleanup function to remove the event listener later
        abortHandlerMap.set(key, () => {
          signal.removeEventListener('abort', abortHandler)
        })

        // Attach abort event listener (once)
        signal.addEventListener('abort', abortHandler, { once: true })
      })
    )

    // Merge original stream with manually triggered teardowns
    const forwarded$ = forward(merge([filteredOperation$, abortOperation.source]))

    // Merge the result stream with manually emitted error results
    return merge([forwarded$, resultOperation.source])
  }
}

// Extract AbortSignal from fetchOptions (can be a function or an object)
function extractAbortSignal(operation: Operation): AbortSignal | null | undefined {
  const fetchOptions = operation.context.fetchOptions

  if (!fetchOptions) {
    return
  }

  if (isFunction(fetchOptions)) {
    return fetchOptions()?.signal
  }

  return fetchOptions.signal
}
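As the note on the isFunction import suggests, the helper can be swapped for a plain typeof check. The sketch below is a dependency-free variant; `FetchOptionsLike`, `OperationLike`, and `extractSignal` are hypothetical stand-ins used only so the snippet runs on its own (in the exchange, the real Operation type from urql applies).

```typescript
// Minimal local types for illustration only; the real ones come from urql's Operation.
type FetchOptionsLike = { signal?: AbortSignal | null }
type OperationLike = {
  context: { fetchOptions?: FetchOptionsLike | (() => FetchOptionsLike) }
}

// Same logic as extractAbortSignal above, with a typeof check instead of isFunction.
function extractSignal(operation: OperationLike): AbortSignal | null | undefined {
  const fetchOptions = operation.context.fetchOptions
  if (!fetchOptions) return undefined

  // fetchOptions may be a plain object or a function returning one.
  const resolved = typeof fetchOptions === 'function' ? fetchOptions() : fetchOptions
  return resolved.signal
}

const ctrl = new AbortController()
const fromObject = extractSignal({ context: { fetchOptions: { signal: ctrl.signal } } })
const fromFunction = extractSignal({ context: { fetchOptions: () => ({ signal: ctrl.signal }) } })
const missing = extractSignal({ context: {} })
```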

lykianovsky avatar Jul 17 '25 11:07 lykianovsky

I really like this approach 👍

We can simplify the sample implementation a bit:

  • Teardowns should be issued via client.reexecuteOperation so there's no leak on the client and the entire exchange pipeline gets notified of the teardown
  • When issuing the response we can use a subject or we could duplicate the teardown if we use the following bullet point
  • We could annotate the error with an aborted property. I think that's warranted for clarity

Other than that, this seems like a pretty neat and tidy solution to me that makes sure that all behaviour lines up

Curious what @JoviDeCroock thinks

kitten avatar Jul 17 '25 14:07 kitten

I’m not very experienced with RxJS or similar technologies. I just needed to implement request aborts by timeout, so I came up with a simple solution after skimming through the Wonka docs for about 30 minutes. I’m really glad you liked it!

If you’d like, feel free to explain what the ideal implementation should look like in more detail — I’d be happy to open a PR with the improved version.


If I understand your idea correctly, the changes should be roughly the following:

1. Custom AbortError class with an aborted property:

class AbortError extends Error {
  public aborted: boolean

  constructor(message: string) {
    super(message)
    this.name = 'AbortError'
    this.aborted = true
  }
}

2. Updated exchange signature to receive client (needed to call reexecuteOperation):

export const customAbortExchange: Exchange = ({ forward, client }) => {
  // ...
}

3. In the abort handler, send teardown via client.reexecuteOperation and emit error with AbortError:

const abortHandler = () => {
  resultOperation.next(makeErrorResult(operation, new AbortError('Operation was aborted')))
  client.reexecuteOperation({ ...operation, kind: 'teardown' })
}

4. Remove old merge with abort operation

const forwarded$ = forward(filteredOperation$)

// Combine forward and manually emitted error results
return merge([forwarded$, resultOperation.source])
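On the consumer side, the aborted flag from step 1 would make aborted results easy to tell apart from other failures. A minimal sketch, assuming the error arrives as result.error.networkError (which, to my understanding, is where makeErrorResult puts the error it is given); `isAbortError` and `ResultLike` are hypothetical names used only for illustration.

```typescript
// AbortError as proposed in step 1.
class AbortError extends Error {
  public aborted: boolean

  constructor(message: string) {
    super(message)
    this.name = 'AbortError'
    this.aborted = true
  }
}

// Hypothetical minimal result shape; with urql this would be an OperationResult.
type ResultLike = { error?: { networkError?: unknown } }

// Hypothetical type guard: true only for errors flagged with `aborted: true`.
function isAbortError(error: unknown): error is AbortError {
  return error instanceof Error && (error as Error & { aborted?: boolean }).aborted === true
}

const abortedResult: ResultLike = {
  error: { networkError: new AbortError('Operation was aborted') },
}
const failedResult: ResultLike = {
  error: { networkError: new Error('Network down') },
}

// Distinguish a deliberate abort from a genuine failure.
const wasAborted = isAbortError(abortedResult.error?.networkError)
const wasFailure = isAbortError(failedResult.error?.networkError)
```

Checking the flag rather than the error message keeps the consumer code independent of the exact wording of the error.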

lykianovsky avatar Jul 21 '25 13:07 lykianovsky

yep! that's exactly what I meant 👍 You've got it spot on. Happy to take over implementation but also happy to accept a PR for this

kitten avatar Jul 21 '25 14:07 kitten

packages/core/src/exchanges

Should the abort exchange live at this path? And are there any requirements for the implementation, such as tests, comments, etc.?

lykianovsky avatar Jul 21 '25 14:07 lykianovsky

I'm a bit ambivalent whether this goes into @urql/core or into a separate exchanges/* package first. The latter could help us test this in isolation first, and we can always pull it into core later if it makes sense.

Maybe @JoviDeCroock has a stronger opinion here

kitten avatar Jul 21 '25 14:07 kitten

Okay, I won't go into the specifics. I hope this implementation helps someone; I saw that people were asking for this in the discussions. I'll also wait for the official implementation so that I can then adopt it in my own project.

I was glad to help. Thanks for the quick response; that is often lacking in libraries on GitHub 👍

lykianovsky avatar Jul 21 '25 14:07 lykianovsky

I really needed this today and it worked perfectly! Thanks so much @lykianovsky

focux avatar Aug 15 '25 23:08 focux

Any update on whether this will become part of the library?

tyler-lotz avatar Dec 04 '25 22:12 tyler-lotz