
Hangs on `await response.text()` using AbortController on undici 6.21.1

Open ferrisxie opened this issue 10 months ago • 10 comments

Bug Description

I am upgrading my project from undici 5.x to 6.21.1. After the upgrade, I found that under high concurrency, when using undici.fetch to download files (file sizes range from 10KB to 10MB, concurrency is typically between 2 and 10 requests, and there are over 200 files in total across 4 different hosts), there is a chance that it blocks on await response.text(). I can't consistently reproduce the issue, but I have added a lot of logs and gathered some information below. I would like to ask whether, logically, there are cases where await response.text() does not return under some race condition?

Reproducible By

Our project is a Node.js web service that downloads many files during startup, and this issue occurs during those downloads. The project code is quite complex, so I wrote a minimal demo of the downloading part; the only code related to undici and the APIs we use is shown below. (We're not sure whether any of the third-party packages we depend on use undici, since Node.js has undici built in, but the issue happens within the logic below.)

By the way, it cannot be reliably reproduced, and each time the issue occurs on a different file, never the same one.

const undici = require('undici') // the demo uses the standalone package; Node's global fetch is undici too
const logger = console // any logger will do for the demo

const downloadSingleFile = async (urlToDownload) => {
    const options = {
        url: urlToDownload,
        method: 'GET',
        headers: {}
    }
    const controller = new AbortController();
    const timeout = 500
    const reqTimer = setTimeout(() => {
      logger.info(`fetch abort by timeout ${timeout}`);
      controller.abort();
    }, timeout);
    options.signal = controller.signal;
    let result;
    try{
        const response = await undici.fetch(options.url, options)
        const { status, statusText, headers } = response
        if (status >= 200 && status < 300) {
            const bodyTxt = await response.text();
            result = { statusCode: status, headers, body: bodyTxt };
        } else {
            result = { statusCode: status, headers, body: statusText };
        }
    }catch(e){
        result = {statusCode: -100, body: e.message}
    }
    clearTimeout(reqTimer);
    return result;
}
const demoEntry = async()=>{
    const res = [
        'https://xxxxxx/xxx.json'
        //...
        //about 200-300 urls
    ]
    for (let index = 0; index < res.length; index++) {
        const element = res[index];
        //...
        //concurrency is 1-10, not all urls
        //file size is between 10KB-10MB
        downloadSingleFile(element,index)
    }
}
demoEntry()

Expected Behavior

All promises settle normally: each download either resolves with the body or rejects with an AbortError when the 500 ms timer fires (see the sketch below).
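
To spell out what "normally" means here (a small sketch of the contract the demo relies on; urls stands for the res array above, and -100 is just the sentinel from my catch block):

// inside an async function:
const results = await Promise.all(urls.map((url) => downloadSingleFile(url)))
// every entry should be either { statusCode: 2xx, headers, body } on success
// or { statusCode: -100, body: <AbortError message> } when the 500 ms timer fired;
// no promise should stay pending forever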

Logs & Screenshots

As shown in the code above, I've added a lot of logging around response.text(), since that is where it hangs. The logging I added is in readAllBytes in /node_modules/undici/lib/web/fetch/util.js:

//in my code
const controller = new AbortController();
const reqTimer = setTimeout(() => {
  logger.info(`fetch abort by timeout ${timeout} tempIndex=${tempIndex}`);
  controller.abort();
}, timeout);

//in /node_modules/undici/lib/web/fetch/util.js
const sleep = (ms)=>{
  return new Promise((resolve) => setTimeout(() => resolve({value: false}), ms));
}

async function readAllBytes (reader) {
  const bytes = []
  let byteLength = 0
  const logger = global.__asyncLogger;
  const tempIndexIdx = reader["__tempIndexIdx"] //this is injected from the download index
  logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
  let times = 0
  let timesInfo = []
  while (true) {
    times++;
    // logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 times=${times}`)
    const { done, value: chunk } = await Promise.race([reader.read(), sleep(15000)])
    timesInfo.push({
      time: new Date().toISOString(),
      times,
      length: byteLength
    })
    if(chunk === false){
      try{
        logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, {reader})
        const infoOfReader = {}
        const symbolKey = Object.getOwnPropertySymbols(reader)
        const kStateSymbol = symbolKey.find(sym => sym.toString().includes('kType'))
        const kStreamSymbol = symbolKey.find(sym => sym.toString().includes('kState'))
        const _innerState = reader[kStateSymbol]
        const _innerReader = reader[kStreamSymbol]
        infoOfReader.state = _innerState
        infoOfReader.state = _innerState
        const waitingLength = _innerReader.readRequests?.length || -1
        const waitingReq = _innerReader.readRequests[0]
        const waitingReqPromimse = waitingReq?.promise
        let waitingReqStatus = 'none';
        if(waitingReqPromimse){
          waitingReqStatus = await Promise.race([
            waitingReqPromimse.then(() => "fulfilled").catch(() => "rejected"),
            new Promise(resolve => setTimeout(() => resolve("pending"), 0))
          ]);
        }
        infoOfReader.waitingLength = waitingLength
        const _stream = _innerReader.stream
        const streamKeys = Object.getOwnPropertySymbols(_stream)
        const kStreamStatus = streamKeys.find(sym => sym.toString().includes('kState'))
        const stateOfStream = _stream[kStreamStatus]
        infoOfReader.stateOfStream = stateOfStream
        const controller = stateOfStream.controller
        const stateOfController = controller[kStreamStatus]
        infoOfReader.controllerState = stateOfController
        infoOfReader.waitingReqStatus = waitingReqStatus
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
      }catch(e){
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
      }
     // when the hang happens, execution awaits here, so I can catch it
      await sleep(600000)
    }
    if (done) {
      logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
      // 1. Call successSteps with bytes.
      return Buffer.concat(bytes, byteLength)
    }

    // 1. If chunk is not a Uint8Array object, call failureSteps
    //    with a TypeError and abort these steps.
    if (!isUint8Array(chunk)) {
      throw new TypeError('Received non-Uint8Array chunk')
    }

    // 2. Append the bytes represented by chunk to bytes.
    bytes.push(chunk)
    byteLength += chunk.length

    // 3. Read-loop given reader, bytes, successSteps, and failureSteps.
  }
}

Here is a useful log I captured when the hang occurred.

2025-03-12 12:51:15,604 downloadIndex=340 startFetch
2025-03-12 12:51:15,613 downloadIndex=340 get fetch response, start await reponse.text()
2025-03-12 12:51:15,613 fetch enter readAllBytes with idx 340 step0 true
2025-03-12 12:51:16,127 fetch abort by timeout 500 tempIndex=340
2025-03-12 12:51:30,913 fetch enter controller readAllBytes with idx 340 step3 timeout true times=452 length=2899968 [...(ignores),{"time":"2025-03-12T12:51:15.912Z","times":451,"length":2890356},{"time":"2025-03-12T12:51:30.913Z","times":452,"length":2899968}]
2025-03-12 12:51:30,915 fetch enter readAllBytes with idx 340 step3 info 1 data: {
    reader: ReadableStreamDefaultReader {
      stream: ReadableStream { locked: true, state: 'readable', supportsBYOB: true },
      readRequests: 1,
      close: Promise {
        <pending>,
        [Symbol(async_id_symbol)]: 1145919,
        [Symbol(trigger_async_id_symbol)]: 1144674,
        [Symbol(kResourceStore)]: [Object]
      }
    }
  }
}
2025-03-12 12:51:30,916 INFO 128 [agent][[email protected]] [ssr:1.202.0]  fetch enter readAllBytes with idx 340 status=pending step3 info {"state":"ReadableStreamDefaultReader","waitingLength":1,"stateOfStream":{"disturbed":true,"reader":{"__tempIndexIdx":340},"state":"readable","transfer":{},"controller":{}},"controllerState":{"byobRequest":null,"closeRequested":false,"pullAgain":false,"pulling":false,"started":true,"stream":{},"queue":[],"queueTotalSize":0,"highWaterMark":0,"pendingPullIntos":[]},"waitingReqStatus":"pending"}

As we can see, the promise in the reader is still pending, even though nearly all of the body has already been pulled and the AbortController signal has fired.

Environment

Debian GNU/Linux 12, tested on Node v20.11.1 and v22.12.0

ferrisxie avatar Mar 12 '25 06:03 ferrisxie

In my code it hangs on await response.text(). Inside the undici code, it hangs on await reader.read() in /lib/web/fetch/util.js.

From the log and code above, we can reconstruct the timeline:

2025-03-12 12:51:15,604: start fetch
2025-03-12 12:51:15,613: got the fetch response, start calling await response.text()
2025-03-12 12:51:15,912: await reader.read() has returned 451 times, and the accumulated length is 2899968 (total size is 3032884)
2025-03-12 12:51:16,127: the abort signal fires after 500 ms as expected, and then it hangs
2025-03-12 12:51:30,913: the Promise.race returns after 15 seconds
2025-03-12 12:51:30,916: the reader info is printed; the reader's promise status is still pending
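
While waiting for a fix, one caller-side guard I am considering is to race response.text() against the abort signal itself, so the await always settles even if the internal read never does. This is only a sketch of a workaround, not a fix for the underlying hang, and abortable is just a helper name I made up:

// Settle as soon as either the body finishes reading or the signal fires.
function abortable (promise, signal) {
  if (signal.aborted) return Promise.reject(new Error('aborted'))
  return Promise.race([
    promise,
    new Promise((_, reject) => {
      signal.addEventListener('abort', () => reject(new Error('aborted')), { once: true })
    })
  ])
}

// usage inside downloadSingleFile:
// const bodyTxt = await abortable(response.text(), controller.signal)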

Image

ferrisxie avatar Mar 12 '25 07:03 ferrisxie

It looks like:

  1. Request state was not properly reset
  2. Queue processing was interrupted but not cleaned up
  3. The reader's promise remains in a pending state (see the standalone sketch below)
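
On point 3: per the WHATWG Streams spec, a pending reader.read() promise only settles when the stream's controller enqueues a chunk, closes, or errors, so if the abort never reaches the body stream's controller the read stays pending forever. A standalone sketch of what should happen on abort (plain web streams, no undici involved):

// A pending read() settles only when the controller acts.
let ctrl
const stream = new ReadableStream({ start (c) { ctrl = c } })
const reader = stream.getReader()

const pending = reader.read() // stays pending until the controller enqueues, closes, or errors

setTimeout(() => ctrl.error(new Error('aborted')), 100) // what an abort should ultimately trigger
pending.catch((err) => console.log('read rejected:', err.message)) // the expected path; in the bug this never fires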

fwx5618177 avatar Mar 13 '25 14:03 fwx5618177

I reproduced this issue using Express.

Here is the code:

server.js

const express = require('express')
const crypto = require('crypto')

// Function to create server instance
function createServer(port) {
  const app = express()
  
  // Track request counts to selectively delay certain requests
  let requestCount = 0;
  
  // Global control variable to simulate server hanging
  let shouldStall = false;
  
  // Toggle the stall status every 30 seconds
  setInterval(() => {
    shouldStall = !shouldStall;
    console.log(`Server hang status changed to: ${shouldStall ? 'hanging' : 'normal'}`);
  }, 30000);

  app.get('/files/:size', (req, res) => {
    const sizeInKB = parseInt(req.params.size, 10)
    const sizeInBytes = sizeInKB * 1024
    
    requestCount++;
    
    // Set response headers
    res.setHeader('Content-Type', 'application/octet-stream')
    res.setHeader('Content-Length', sizeInBytes)
    
    // Set response timeout for each request to prevent complete connection interruption
    req.socket.setTimeout(300000); // 5 minute timeout
    
    // Prevent errors caused by client-side connection interruption
    req.on('close', () => {
      console.log(`Request #${requestCount}: Connection closed by client`);
    });

    // Generate random data
    const buffer = crypto.randomBytes(sizeInBytes)
    
    // Apply slow transfer strategy for all large file requests
    if (sizeInBytes > 100 * 1024) {  // Files larger than 100KB
      console.log(`Request #${requestCount}: Applying slow transfer (${sizeInKB}KB)`);
      
      // Use smaller chunks and more precise control
      const CHUNK_SIZE = 16 * 1024; // 16KB
      let offset = 0;
      let chunkCount = 0;
      
      // Progress counter
      const totalChunks = Math.ceil(sizeInBytes / CHUNK_SIZE);
      
      function sendNextChunk() {
        // Check if need to enter hanging state
        if (shouldStall && chunkCount > 1) {
          console.log(`Request #${requestCount}: Global hang signal detected, pausing data transfer [${chunkCount}/${totalChunks}]`);
          // Pause for a very long time, but don't end the response, keeping the client hanging
          setTimeout(sendNextChunk, 60000);  // Check again after 1 minute
          return;
        }
        
        if (offset >= sizeInBytes) {
          console.log(`Request #${requestCount}: Transfer completed`);
          res.end();
          return;
        }
        
        try {
          // Determine current chunk size
          const end = Math.min(offset + CHUNK_SIZE, sizeInBytes);
          const chunk = buffer.slice(offset, end);
          
          // Use callback to ensure data is written
          res.write(chunk, (err) => {
            if (err) {
              console.error(`Request #${requestCount}: Write error`, err);
              try { res.end(); } catch (e) {}
              return;
            }
            
            offset += chunk.length;
            chunkCount++;
            
            // Determine delay based on chunk number
            let delay;
            
            // First two chunks sent quickly, subsequent chunks intentionally slowed
            if (chunkCount <= 2) {
              delay = 50; // Send first two chunks quickly
              console.log(`Request #${requestCount}: Sending fast chunk ${chunkCount}/${totalChunks}, size: ${chunk.length/1024}KB`);
            } else if (chunkCount === 3) {
              // After the third chunk, enter extra long delay to ensure client reader.read() will hang
              delay = 120000; // 2 minute delay
              console.log(`Request #${requestCount}: Entering extra long delay ${delay}ms`);
            } else {
              delay = 10000; // Subsequent chunks also slow
              console.log(`Request #${requestCount}: Sending slow chunk ${chunkCount}/${totalChunks}, delay: ${delay}ms`);
            }
            
            // Set time to send the next chunk
            setTimeout(sendNextChunk, delay);
          });
        } catch (err) {
          console.error(`Request #${requestCount}: Send error`, err);
          try { res.end(); } catch (e) {}
        }
      }
      
      // Start sending
      sendNextChunk();
    } else {
      // Small files still have some delay to prevent all responses from completing too quickly
      setTimeout(() => {
        res.end(buffer);
      }, Math.random() * 2000); // 0-2000ms random delay
    }
  })

  return app.listen(port, () => {
    console.log(`Server running on port ${port} - http://localhost:${port}/files/:size`)
  })
}

// Start 4 server instances on different ports
const ports = [3000, 3001, 3002, 3003]
const servers = ports.map(port => createServer(port))

// Add overall startup success log
console.log('=================================================')
console.log('✅ All servers started successfully!')
console.log('=================================================')
console.log(`🚀 Started ${ports.length} test server instances:`)
ports.forEach(port => {
  console.log(`   - http://localhost:${port}`)
})
console.log('\n📄 Available endpoints:')
console.log('   - GET /files/:size - Returns random data of specified size (KB)')
console.log('     Example: http://localhost:3000/files/1024 will return 1MB data')
console.log('\n⚙️  Press Ctrl+C to stop all servers')
console.log('=================================================')

// Add graceful shutdown handling
process.on('SIGINT', () => {
  console.log('Shutting down servers...')
  servers.forEach(server => server.close())
  process.exit()
})

test.js

// First load the async logger
require('./asyncLogger');

const undici = require('./index.js')

// Use global async logger, fallback to console if it doesn't exist
const logger = global.__asyncLogger || console;
logger.info('Test started - Using logger:', global.__asyncLogger ? 'global.__asyncLogger' : 'console');

async function downloadSingleFile(url, index) {
  const startTime = Date.now()
  const timeout = 5000; // Increased to 5 seconds
  logger.info(`[${index}] Starting request to ${url} at ${new Date().toISOString()}`)

  const controller = new AbortController()
  const reqTimer = setTimeout(() => {
    logger.info(`[${index}] Aborting request after 5000ms`)
    controller.abort()
  }, timeout)

  try {
    const response = await undici.fetch(url, {
      signal: controller.signal,
      headers: {},
      bodyTimeout: 500000, // Increased to 500 seconds to ensure no reading timeout interruption
      headersTimeout: 60000 // Increased header timeout
    })

    logger.info(`[${index}] Got response in ${Date.now() - startTime}ms, status: ${response.status}`)

    if (response.status >= 200 && response.status < 300) {
      logger.info(`[${index}] Starting to read body at ${new Date().toISOString()}`)
      const textStartTime = Date.now()
      
      // Add extra monitoring to detect long-running text() calls
      const textTimeout = setTimeout(() => {
        logger.warn(`[${index}] ⚠️ WARNING: response.text() has been running for 5 seconds without completing!`)
      }, 5000)
      
      // Additional: Pass request index to response object for use in readAllBytes
      // This is a special handling for undici's internal implementation
      if (response.body && response.body.getReader) {
        const originalGetReader = response.body.getReader.bind(response.body);
        response.body.getReader = function() {
          const reader = originalGetReader();
          // Inject index into reader object for tracking
          reader.__tempIndexIdx = index;
          
          // Additionally record read start time
          reader.__readStartTime = Date.now();
          
          // Add diagnostic log
          logger.info(`[${index}] Created reader instance, start time: ${new Date(reader.__readStartTime).toISOString()}`);
          
          return reader;
        };
      }
      
      // Increase timeout monitoring frequency
      const textTimeoutInterval = setInterval(() => {
        const elapsedTime = Date.now() - textStartTime;
        logger.warn(`[${index}] ⚠️ WARNING: response.text() has been running for ${elapsedTime/1000} seconds without completing!`);
      }, 3000);
      
      // This may get stuck, which is the issue we're trying to reproduce
      try {
        const text = await response.text();
        clearInterval(textTimeoutInterval);
        clearTimeout(textTimeout);
        
        const textDuration = Date.now() - textStartTime
        logger.info(`[${index}] Completed reading body in ${textDuration}ms, length: ${text.length}`)
        
        // Record longer response times
        if (textDuration > 2000) {
          logger.warn(`[${index}] ⚠️ Slow text() operation detected: ${textDuration}ms for ${url}`)
        }
        
        return { success: true, length: text.length, duration: textDuration }
      } catch (textError) {
        clearInterval(textTimeoutInterval);
        clearTimeout(textTimeout);
        logger.error(`[${index}] Text reading error:`, textError);
        throw textError;
      }
    } else {
      return { success: false, status: response.status }
    }
  } catch (err) {
    logger.error(`[${index}] Error:`, {
      name: err.name,
      message: err.message,
      cause: err.cause,
      timestamp: new Date().toISOString()
    })
    return { success: false, error: err.message }
  } finally {
    clearTimeout(reqTimer)
  }
}

async function runTest() {
  // Force requests for large files to increase probability of triggering the issue
  const totalRequests = 100 // Reduce number of requests, but each is a large file
  
  // Simulate 4 different hosts
  const hosts = [
    'http://localhost:3000',
    'http://localhost:3001',
    'http://localhost:3002',
    'http://localhost:3003'
  ]
  
  // Generate requests, all for large files
  const requests = Array.from({ length: totalRequests }, (_, i) => {
    // All requests for large files
    const size = Math.floor(Math.random() * 4000) + 2000; // 2MB-6MB
    
    const host = hosts[i % hosts.length]
    return `${host}/files/${size}`
  })

  // Use lower concurrency, focusing on fewer large files
  const concurrency = 10 // Reduce concurrency to allow server to process
  const results = []
  let completedRequests = 0

  // Implement more aggressive concurrency model
  const batches = []
  for (let i = 0; i < requests.length; i += concurrency) {
    batches.push(requests.slice(i, i + concurrency))
  }

  // Use concurrency pattern closer to the issue description
  logger.info(`Starting test with ${requests.length} total requests, concurrency: ${concurrency}`)
  logger.info(`Using ${hosts.length} different hosts`)

  // Add functionality to periodically record current status
  const statusInterval = setInterval(() => {
    logger.info(`Current progress: ${completedRequests}/${totalRequests} completed`);
    logMemoryUsage();
  }, 5000);

  for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
    const batch = batches[batchIndex]
    const batchStartTime = Date.now()

    logger.info(`Starting batch ${batchIndex + 1}/${batches.length}, size: ${batch.length}`)

    // Start all requests simultaneously
    const batchPromises = batch.map((url, idx) => {
      const requestIndex = batchIndex * concurrency + idx
      return downloadSingleFile(url, requestIndex)
        .then(result => {
          completedRequests++
          if (completedRequests % 10 === 0) {
            logger.info(`Progress: ${completedRequests}/${totalRequests} completed`)
          }
          return result
        })
    })

    // Wait for all requests in this batch to complete
    const batchResults = await Promise.all(batchPromises)
    results.push(...batchResults)

    logger.info(`Completed batch ${batchIndex + 1}/${batches.length} in ${Date.now() - batchStartTime}ms`)
  }

  clearInterval(statusInterval);
  return results
}

// Add memory usage monitoring
function logMemoryUsage() {
  const used = process.memoryUsage()
  logger.info('Memory usage:', {
    rss: `${Math.round(used.rss / 1024 / 1024)} MB`,
    heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`,
    heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`,
    external: `${Math.round(used.external / 1024 / 1024)} MB`
  })
}

// Record memory usage every 10 seconds
const memoryInterval = setInterval(logMemoryUsage, 10000)

runTest().then((results) => {
  clearInterval(memoryInterval)
  
  const successes = results.filter(r => r.success).length
  const failures = results.filter(r => !r.success).length
  
  // Calculate statistics for text() operations
  const textDurations = results
    .filter(r => r.success && r.duration)
    .map(r => r.duration)
  
  const avgDuration = textDurations.reduce((sum, d) => sum + d, 0) / (textDurations.length || 1)
  const maxDuration = Math.max(...(textDurations.length ? textDurations : [0]))
  const slowResponses = results.filter(r => r.success && r.duration > 2000).length

  console.log('\nTest Summary:')
  console.log(`Total requests: ${results.length}`)
  console.log(`Successful: ${successes}`)
  console.log(`Failed: ${failures}`)
  console.log(`Average text() duration: ${avgDuration.toFixed(2)}ms`)
  console.log(`Maximum text() duration: ${maxDuration}ms`)
  console.log(`Slow responses (>2000ms): ${slowResponses}`)
  
  logMemoryUsage()
}).catch(console.error)

asyncLogger.js

const fs = require('fs');
const path = require('path');
const util = require('util');

// Create log directory
const LOG_DIR = path.join(__dirname, 'logs');
if (!fs.existsSync(LOG_DIR)) {
  fs.mkdirSync(LOG_DIR);
}

// Log file path
const LOG_FILE = path.join(LOG_DIR, `test-${new Date().toISOString().replace(/:/g, '-')}.log`);

// Create write stream
const logStream = fs.createWriteStream(LOG_FILE, { flags: 'a' });

// Async logger
class AsyncLogger {
  constructor() {
    this.queue = [];
    this.isProcessing = false;
    
    // Write logs from queue to file every second
    setInterval(() => this.processQueue(), 1000);
    
    // Ensure all logs are written when process exits
    process.on('exit', () => {
      this.processQueueSync();
    });
  }

  // Format log message
  formatMessage(level, message, ...args) {
    const timestamp = new Date().toISOString();
    const formattedArgs = args.map(arg => {
      if (typeof arg === 'object') {
        return util.inspect(arg, { depth: null });
      }
      return arg;
    }).join(' ');
    
    return `[${timestamp}] [${level}] ${message} ${formattedArgs}`.trim();
  }

  // Add log to queue
  log(level, message, ...args) {
    const formattedMessage = this.formatMessage(level, message, ...args);
    
    // Output to console
    if (level === 'ERROR') {
      console.error(formattedMessage);
    } else if (level === 'WARN') {
      console.warn(formattedMessage);
    } else {
      console.log(formattedMessage);
    }
    
    // Add to queue
    this.queue.push(formattedMessage);
    
    // Start processing if queue is not already being processed
    if (!this.isProcessing) {
      this.processQueue();
    }
  }

  // Process queue asynchronously
  async processQueue() {
    if (this.isProcessing || this.queue.length === 0) return;
    
    this.isProcessing = true;
    
    try {
      const messagesToProcess = [...this.queue];
      this.queue = [];
      
      // Write to file
      for (const message of messagesToProcess) {
        logStream.write(message + '\n');
      }
    } finally {
      this.isProcessing = false;
      
      // Continue processing if queue still has messages
      if (this.queue.length > 0) {
        setImmediate(() => this.processQueue());
      }
    }
  }

  // Process queue synchronously (used when process exits)
  processQueueSync() {
    if (this.queue.length === 0) return;
    
    const messagesToProcess = [...this.queue];
    this.queue = [];
    
    for (const message of messagesToProcess) {
      fs.appendFileSync(LOG_FILE, message + '\n');
    }
  }

  // Public methods
  info(message, ...args) {
    this.log('INFO', message, ...args);
  }

  warn(message, ...args) {
    this.log('WARN', message, ...args);
  }

  error(message, ...args) {
    this.log('ERROR', message, ...args);
  }

  debug(message, ...args) {
    this.log('DEBUG', message, ...args);
  }
}

// Export instance and set as global variable
const logger = new AsyncLogger();
global.__asyncLogger = logger;

module.exports = logger;

We also need to update lib/web/fetch/util.js with the following instrumentation:


const sleep = (ms)=>{
  return new Promise((resolve) => setTimeout(() => resolve({value: false}), ms));
}

/**
 * @see https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes
 * @see https://streams.spec.whatwg.org/#read-loop
 * @param {ReadableStreamDefaultReader} reader
 */
async function readAllBytes (reader) {
  const bytes = []
  let byteLength = 0
  const logger = global.__asyncLogger;
  const tempIndexIdx = reader["__tempIndexIdx"] //this is injected from the download index
  logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
  let times = 0
  let timesInfo = []
  while (true) {
    times++;
    logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 times=${times}`)
    // Significantly reduce timeout time to ensure it triggers before reader.read()
    const { done, value: chunk } = await Promise.race([reader.read(), sleep(100)])
    timesInfo.push({
      time: new Date().toISOString(),
      times,
      length: byteLength
    })
    logger.info(`fetch enter readAllBytes with idx chunk: ${chunk === false}`)
    if(chunk === false){
      try{
        logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, {reader})
        const infoOfReader = {}
        const symbolKey = Object.getOwnPropertySymbols(reader)
        const kStateSymbol = symbolKey.find(sym => sym.toString().includes('kType'))
        const kStreamSymbol = symbolKey.find(sym => sym.toString().includes('kState'))
        const _innerState = reader[kStateSymbol]
        const _innerReader = reader[kStreamSymbol]
        infoOfReader.state = _innerState
        infoOfReader.state = _innerState
        const waitingLength = _innerReader.readRequests?.length || -1
        const waitingReq = _innerReader.readRequests[0]
        const waitingReqPromimse = waitingReq?.promise
        let waitingReqStatus = 'none';
        if(waitingReqPromimse){
          // Increase chance to detect pending status by increasing timeout to 500ms
          waitingReqStatus = await Promise.race([
            waitingReqPromimse.then(() => "fulfilled").catch(() => "rejected"),
            new Promise(resolve => setTimeout(() => resolve("pending"), 500))
          ]);
        }
        infoOfReader.waitingLength = waitingLength
        const _stream = _innerReader.stream
        const streamKeys = Object.getOwnPropertySymbols(_stream)
        const kStreamStatus = streamKeys.find(sym => sym.toString().includes('kState'))
        const stateOfStream = _stream[kStreamStatus]
        infoOfReader.stateOfStream = stateOfStream
        const controller = stateOfStream.controller
        const stateOfController = controller[kStreamStatus]
        infoOfReader.controllerState = stateOfController
        infoOfReader.waitingReqStatus = waitingReqStatus
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
      }catch(e){
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
      }
     // when the hang happens, execution awaits here, so I can catch it
      await sleep(600000)
    }
    if (done) {
      logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
      // 1. Call successSteps with bytes.
      return Buffer.concat(bytes, byteLength)
    }

    // 1. If chunk is not a Uint8Array object, call failureSteps
    //    with a TypeError and abort these steps.
    if (!isUint8Array(chunk)) {
      throw new TypeError('Received non-Uint8Array chunk')
    }

    // 2. Append the bytes represented by chunk to bytes.
    bytes.push(chunk)
    byteLength += chunk.length

    // 3. Read-loop given reader, bytes, successSteps, and failureSteps.
  }
}

The root cause of the problem is:

  1. Server stalls after sending partial data
  • In our server simulation code, it intentionally adds a 120-second (2-minute) delay after sending the first few data chunks
  • This simulates a "half-open connection" problem seen in real-world scenarios, where the TCP connection remains open but data transfer is interrupted
  2. Read operation blocks indefinitely
  • reader.read() returns a promise that only settles when new data arrives or the stream ends (or errors)
  • If the server neither sends new data nor closes the connection, that read hangs indefinitely (a client-side mitigation sketch follows below)
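
A client-side mitigation, independent of any undici fix, is to read the body manually and give every chunk its own deadline, cancelling the reader when it fires so nothing stays pending. This is only a sketch; readWithChunkTimeout and the 10-second budget are illustrative:

// Read the body chunk by chunk, racing each read() against a per-chunk deadline.
async function readWithChunkTimeout (body, chunkTimeoutMs = 10000) {
  const reader = body.getReader()
  const chunks = []
  let byteLength = 0
  try {
    while (true) {
      const result = await Promise.race([
        reader.read(),
        new Promise((_, reject) =>
          setTimeout(() => reject(new Error('chunk timeout')), chunkTimeoutMs))
      ])
      if (result.done) break
      chunks.push(result.value)
      byteLength += result.value.length
    }
    return Buffer.concat(chunks, byteLength)
  } catch (err) {
    await reader.cancel(err).catch(() => {}) // give up on the stalled read instead of hanging
    throw err
  }
}

// usage: const text = (await readWithChunkTimeout(response.body)).toString('utf8')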

fwx5618177 avatar Mar 24 '25 13:03 fwx5618177

Image

fwx5618177 avatar Mar 24 '25 13:03 fwx5618177

Can you put all the above in a repository and make it easy for us to reproduce?

mcollina avatar Mar 27 '25 15:03 mcollina

Can you put all the above in a repository and make it easy for us to reproduce?

I have pushed these files to the fix-4089 branch; you can find them under reproduce-script.

Directly run:

  1. node ./reproduce-script/server.js
  2. node ./reproduce-script/test.js

fwx5618177 avatar May 27 '25 17:05 fwx5618177

What repository?

mcollina avatar Jun 01 '25 21:06 mcollina

We also ran into an issue that closely resembles the one reported here. We saw hangs on await response.text(), but only when the server was responding with 'transfer-encoding': 'chunked'.

What may be relevant for this issue is: bumping to undici 7.10.0 solved it for us

rene84 avatar Jun 18 '25 09:06 rene84

We also ran into an issue that closely resembles the one reported here. We saw hangs on await response.text(), but only when the server was responding with 'transfer-encoding': 'chunked'.

What may be relevant for this issue is: bumping to undici 7.10.0 solved it for us

Yes, please see this repo, where I reproduced it, but I still have no idea how to fix it.

@mcollina here: https://github.com/fwx5618177/undici/tree/fix-4089/reproduce-script

fwx5618177 avatar Jun 18 '25 14:06 fwx5618177

Hi @rene84, as you said:

only when the server was responding with 'transfer-encoding': 'chunked'.

I have noticed the same chunked response header in my case.
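
For reference, this is how I checked it (a trivial snippet; url and options stand for the request in my demo above):

const response = await undici.fetch(url, options)
logger.info(`transfer-encoding: ${response.headers.get('transfer-encoding')}`) // prints "chunked" for the affected downloads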

ferrisxie avatar Jun 19 '25 06:06 ferrisxie