Hangs on `await response.text()` using AbortController on unidci 6.21.1
Bug Description
I am upgrading my project from undici 5.x to 6.21.1. After the upgrade, I found that under high concurrency, when using undici.fetch to download files(The file sizes range from 10KB to 10MB, with concurrency typically between 2 and 10 requests. The total number of files is over 200, involving 4 different hosts.), there is a chance that it will block on await response.text(). I can’t consistently reproduce the issue, but I have added a lot of logs and gathered some information bellow. I would like to ask if, logically, there are cases where [await reponse.text()] does not return in some race condition?
Reproducible By
Our project is a Node.js web service that downloads many files during startup. This issue occurs when downloading the files at startup. The project code is quite complex, so I wrote a minimal demo for the downloading part. The only parts related to Undici and the APIs used are as follows. (We’re not sure if any of the third-party packages we depend on use Undici, as Node.js has Undici built-in, but the issue happens within the logic below.)
By the way, it cannot be reliably reproduced, and every time the issue occurs on different file, not the same file.
const downloadSingleFile = async (urlToDownload)=>{
const options = {
url: urlToDownload,
method: 'GET',
headers: {}
}
const controller = new AbortController();
const timeout = 500
const reqTimer = setTimeout(() => {
logger.info(`fetch abort by timeout ${timeout}`);
controller.abort();
}, timeout);
options.signal = controller.signal;
let result;
try{
const response = await undici.fetch(options.url, options)
const { status, statusText, headers } = response
if (status >= 200 && status < 300) {
const bodyTxt = await response.text();
result = { statusCode: status, headers, body: bodyTxt };
} else {
result = { statusCode: status, headers, body: statusText };
}
}catch(e){
result = {statusCode: -100, body: e.message}
}
clearTimeout(reqTimer);
return result;
}
const demoEntry = async()=>{
const res = [
'https://xxxxxx/xxx.json'
//...
//about 200-300 urls
]
for (let index = 0; index < res.length; index++) {
const element = res[index];
//...
//concurrency is 1-10, not all urls
//file size is between 10KB-10MB
downloadSingleFile(element,index)
}
}
demoEntry()
Expected Behavior
All promise return normally.
Logs & Screenshots
As the code shown above, I've add a lot of logs on response.text(), as it's hanged here.
The log i added is readAllBytes in /node_modules/undici/lib/web/fetch/util.js
//in my code
const controller = new AbortController();
const reqTimer = setTimeout(() => {
logger.info(`fetch abort by timeout ${timeout} tempIndex=${tempIndex}`);
controller.abort();
}, timeout);
//in /node_modules/undici/lib/web/fetch/util.js
const sleep = (ms)=>{
return new Promise((resolve) => setTimeout(() => resolve({value: false}), ms));
}
async function readAllBytes (reader) {
const bytes = []
let byteLength = 0
const logger = global.__asyncLogger;
const tempIndexIdx = reader["__tempIndexIdx"] //this is injected from the download index
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
let times = 0
let timesInfo = []
while (true) {
times++;
// logger.info(`fetch enter readAllBytes with dix ${tempIndexIdx} step0 times=${times}`)
const { done, value: chunk } = await Promise.race([reader.read(), sleep(15000)])
timesInfo.push({
time: new Date().toISOString(),
times,
length: byteLength
})
if(chunk === false){
try{
logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, {reader})
const infoOfReader = {}
const symbolKey = Object.getOwnPropertySymbols(reader)
const kStateSymbol = symbolKey.find(sym => sym.toString().includes('kType'))
const kStreamSymbol = symbolKey.find(sym => sym.toString().includes('kState'))
const _innerState = reader[kStateSymbol]
const _innerReader = reader[kStreamSymbol]
infoOfReader.state = _innerState
infoOfReader.state = _innerState
const waitingLength = _innerReader.readRequests?.length || -1
const waitingReq = _innerReader.readRequests[0]
const waitingReqPromimse = waitingReq?.promise
let waitingReqStatus = 'none';
if(waitingReqPromimse){
waitingReqStatus = await Promise.race([
waitingReqPromimse.then(() => "fulfilled").catch(() => "rejected"),
new Promise(resolve => setTimeout(() => resolve("pending"), 0))
]);
}
infoOfReader.waitingLength = waitingLength
const _stream = _innerReader.stream
const streamKeys = Object.getOwnPropertySymbols(_stream)
const kStreamStatus = streamKeys.find(sym => sym.toString().includes('kState'))
const stateOfStream = _stream[kStreamStatus]
infoOfReader.stateOfStream = stateOfStream
const controller = stateOfStream.controller
const stateOfController = controller[kStreamStatus]
infoOfReader.controllerState = stateOfController
infoOfReader.waitingReqStatus = waitingReqStatus
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
}catch(e){
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
}
//when hangs happened, it will await here, so I can catch the hang.
await sleep(600000)
}
if (done) {
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
// 1. Call successSteps with bytes.
return Buffer.concat(bytes, byteLength)
}
// 1. If chunk is not a Uint8Array object, call failureSteps
// with a TypeError and abort these steps.
if (!isUint8Array(chunk)) {
throw new TypeError('Received non-Uint8Array chunk')
}
// 2. Append the bytes represented by chunk to bytes.
bytes.push(chunk)
byteLength += chunk.length
// 3. Read-loop given reader, bytes, successSteps, and failureSteps.
}
}
Here is useful log I've got when hangs.
2025-03-12 12:51:15,604 downloadIndex=340 startFetch
2025-03-12 12:51:15,613 downloadIndex=340 get fetch response, start await reponse.text()
2025-03-12 12:51:15,613 fetch enter readAllBytes with idx 340 step0 true
2025-03-12 12:51:16,127 fetch abort by timeout 500 tempIndex=340
2025-03-12 12:51:30,913 fetch enter controller readAllBytes with idx 340 step3 timeout true times=452 length=2899968 [...(ignores),{"time":"2025-03-12T12:51:15.912Z","times":451,"length":2890356},{"time":"2025-03-12T12:51:30.913Z","times":452,"length":2899968}]
2025-03-12 12:51:30,915 fetch enter readAllBytes with idx 340 step3 info 1 data: {
reader: ReadableStreamDefaultReader {
stream: ReadableStream { locked: true, state: 'readable', supportsBYOB: true },
readRequests: 1,
close: Promise {
<pending>,
[Symbol(async_id_symbol)]: 1145919,
[Symbol(trigger_async_id_symbol)]: 1144674,
[Symbol(kResourceStore)]: [Object]
}
}
}
}
2025-03-12 12:51:30,916 INFO 128 [agent][[email protected]] [ssr:1.202.0] fetch enter readAllBytes with idx 340 status=pending step3 info {"state":"ReadableStreamDefaultReader","waitingLength":1,"stateOfStream":{"disturbed":true,"reader":{"__tempIndexIdx":340},"state":"readable","transfer":{},"controller":{}},"controllerState":{"byobRequest":null,"closeRequested":false,"pullAgain":false,"pulling":false,"started":true,"stream":{},"queue":[],"queueTotalSize":0,"highWaterMark":0,"pendingPullIntos":[]},"waitingReqStatus":"pending"}
As we can see the promise in the reader is still pennding, but the chunk is fully pulled and the abortController.sigal has emitted.
Environment
Debian GNU/Linux 12 tested on Node v20.11.1 and v22.12.0
From my code it hangs on await reponse.text()
Inside the undici code, it hangs on await reader.read() in '/lib/web/fetch/util.js'
From the log and code above, we can get the timeline: 2025-03-12 12:51:15,604: start fetch 2025-03-12 12:51:15,613: get fetch reponse, start call await response.text() 2025-03-12 12:51:15.912: await reader.read() returned 451 times, and current chunkSize is 2899968 (total size is 3032884) 2025-03-12 12:51:16,127: abort signal called after 500ms, as expected hangs
2025-03-12 12:51:30.913: returned by Promise.race in 15 seconds 2025-03-12 12:51:30,916: print info todo with the reader, the reader promise status is still pendding.
It looks like:
- Request state was not properly reset
- Queue processing was interrupted but not cleaned up
- The reader’s promise remains in a pending state
I reproduce this issue by using express.
Here are code:
server.js
const express = require('express')
const crypto = require('crypto')
// Function to create server instance
function createServer(port) {
const app = express()
// Track request counts to selectively delay certain requests
let requestCount = 0;
// Global control variable to simulate server hanging
let shouldStall = false;
// Toggle the stall status every 30 seconds
setInterval(() => {
shouldStall = !shouldStall;
console.log(`Server hang status changed to: ${shouldStall ? 'hanging' : 'normal'}`);
}, 30000);
app.get('/files/:size', (req, res) => {
const sizeInKB = parseInt(req.params.size, 10)
const sizeInBytes = sizeInKB * 1024
requestCount++;
// Set response headers
res.setHeader('Content-Type', 'application/octet-stream')
res.setHeader('Content-Length', sizeInBytes)
// Set response timeout for each request to prevent complete connection interruption
req.socket.setTimeout(300000); // 5 minute timeout
// Prevent errors caused by client-side connection interruption
req.on('close', () => {
console.log(`Request #${requestCount}: Connection closed by client`);
});
// Generate random data
const buffer = crypto.randomBytes(sizeInBytes)
// Apply slow transfer strategy for all large file requests
if (sizeInBytes > 100 * 1024) { // Files larger than 100KB
console.log(`Request #${requestCount}: Applying slow transfer (${sizeInKB}KB)`);
// Use smaller chunks and more precise control
const CHUNK_SIZE = 16 * 1024; // 16KB
let offset = 0;
let chunkCount = 0;
// Progress counter
const totalChunks = Math.ceil(sizeInBytes / CHUNK_SIZE);
function sendNextChunk() {
// Check if need to enter hanging state
if (shouldStall && chunkCount > 1) {
console.log(`Request #${requestCount}: Global hang signal detected, pausing data transfer [${chunkCount}/${totalChunks}]`);
// Pause for a very long time, but don't end the response, keeping the client hanging
setTimeout(sendNextChunk, 60000); // Check again after 1 minute
return;
}
if (offset >= sizeInBytes) {
console.log(`Request #${requestCount}: Transfer completed`);
res.end();
return;
}
try {
// Determine current chunk size
const end = Math.min(offset + CHUNK_SIZE, sizeInBytes);
const chunk = buffer.slice(offset, end);
// Use callback to ensure data is written
res.write(chunk, (err) => {
if (err) {
console.error(`Request #${requestCount}: Write error`, err);
try { res.end(); } catch (e) {}
return;
}
offset += chunk.length;
chunkCount++;
// Determine delay based on chunk number
let delay;
// First two chunks sent quickly, subsequent chunks intentionally slowed
if (chunkCount <= 2) {
delay = 50; // Send first two chunks quickly
console.log(`Request #${requestCount}: Sending fast chunk ${chunkCount}/${totalChunks}, size: ${chunk.length/1024}KB`);
} else if (chunkCount === 3) {
// After the third chunk, enter extra long delay to ensure client reader.read() will hang
delay = 120000; // 2 minute delay
console.log(`Request #${requestCount}: Entering extra long delay ${delay}ms`);
} else {
delay = 10000; // Subsequent chunks also slow
console.log(`Request #${requestCount}: Sending slow chunk ${chunkCount}/${totalChunks}, delay: ${delay}ms`);
}
// Set time to send the next chunk
setTimeout(sendNextChunk, delay);
});
} catch (err) {
console.error(`Request #${requestCount}: Send error`, err);
try { res.end(); } catch (e) {}
}
}
// Start sending
sendNextChunk();
} else {
// Small files still have some delay to prevent all responses from completing too quickly
setTimeout(() => {
res.end(buffer);
}, Math.random() * 2000); // 0-2000ms random delay
}
})
return app.listen(port, () => {
console.log(`Server running on port ${port} - http://localhost:${port}/files/:size`)
})
}
// Start 4 server instances on different ports
const ports = [3000, 3001, 3002, 3003]
const servers = ports.map(port => createServer(port))
// Add overall startup success log
console.log('=================================================')
console.log('✅ All servers started successfully!')
console.log('=================================================')
console.log(`🚀 Started ${ports.length} test server instances:`)
ports.forEach(port => {
console.log(` - http://localhost:${port}`)
})
console.log('\n📄 Available endpoints:')
console.log(' - GET /files/:size - Returns random data of specified size (KB)')
console.log(' Example: http://localhost:3000/files/1024 will return 1MB data')
console.log('\n⚙️ Press Ctrl+C to stop all servers')
console.log('=================================================')
// Add graceful shutdown handling
process.on('SIGINT', () => {
console.log('Shutting down servers...')
servers.forEach(server => server.close())
process.exit()
})
test.js
// First load the async logger
require('./asyncLogger');
const undici = require('./index.js')
// Use global async logger, fallback to console if it doesn't exist
const logger = global.__asyncLogger || console;
logger.info('Test started - Using logger:', global.__asyncLogger ? 'global.__asyncLogger' : 'console');
async function downloadSingleFile(url, index) {
const startTime = Date.now()
const timeout = 5000; // Increased to 5 seconds
logger.info(`[${index}] Starting request to ${url} at ${new Date().toISOString()}`)
const controller = new AbortController()
const reqTimer = setTimeout(() => {
logger.info(`[${index}] Aborting request after 5000ms`)
controller.abort()
}, timeout)
try {
const response = await undici.fetch(url, {
signal: controller.signal,
headers: {},
bodyTimeout: 500000, // Increased to 500 seconds to ensure no reading timeout interruption
headersTimeout: 60000 // Increased header timeout
})
logger.info(`[${index}] Got response in ${Date.now() - startTime}ms, status: ${response.status}`)
if (response.status >= 200 && response.status < 300) {
logger.info(`[${index}] Starting to read body at ${new Date().toISOString()}`)
const textStartTime = Date.now()
// Add extra monitoring to detect long-running text() calls
const textTimeout = setTimeout(() => {
logger.warn(`[${index}] ⚠️ WARNING: response.text() has been running for 5 seconds without completing!`)
}, 5000)
// Additional: Pass request index to response object for use in readAllBytes
// This is a special handling for undici's internal implementation
if (response.body && response.body.getReader) {
const originalGetReader = response.body.getReader.bind(response.body);
response.body.getReader = function() {
const reader = originalGetReader();
// Inject index into reader object for tracking
reader.__tempIndexIdx = index;
// Additionally record read start time
reader.__readStartTime = Date.now();
// Add diagnostic log
logger.info(`[${index}] Created reader instance, start time: ${new Date(reader.__readStartTime).toISOString()}`);
return reader;
};
}
// Increase timeout monitoring frequency
const textTimeoutInterval = setInterval(() => {
const elapsedTime = Date.now() - textStartTime;
logger.warn(`[${index}] ⚠️ WARNING: response.text() has been running for ${elapsedTime/1000} seconds without completing!`);
}, 3000);
// This may get stuck, which is the issue we're trying to reproduce
try {
const text = await response.text();
clearInterval(textTimeoutInterval);
clearTimeout(textTimeout);
const textDuration = Date.now() - textStartTime
logger.info(`[${index}] Completed reading body in ${textDuration}ms, length: ${text.length}`)
// Record longer response times
if (textDuration > 2000) {
logger.warn(`[${index}] ⚠️ Slow text() operation detected: ${textDuration}ms for ${url}`)
}
return { success: true, length: text.length, duration: textDuration }
} catch (textError) {
clearInterval(textTimeoutInterval);
clearTimeout(textTimeout);
logger.error(`[${index}] Text reading error:`, textError);
throw textError;
}
} else {
return { success: false, status: response.status }
}
} catch (err) {
logger.error(`[${index}] Error:`, {
name: err.name,
message: err.message,
cause: err.cause,
timestamp: new Date().toISOString()
})
return { success: false, error: err.message }
} finally {
clearTimeout(reqTimer)
}
}
async function runTest() {
// Force requests for large files to increase probability of triggering the issue
const totalRequests = 100 // Reduce number of requests, but each is a large file
// Simulate 4 different hosts
const hosts = [
'http://localhost:3000',
'http://localhost:3001',
'http://localhost:3002',
'http://localhost:3003'
]
// Generate requests, all for large files
const requests = Array.from({ length: totalRequests }, (_, i) => {
// All requests for large files
const size = Math.floor(Math.random() * 4000) + 2000; // 2MB-6MB
const host = hosts[i % hosts.length]
return `${host}/files/${size}`
})
// Use lower concurrency, focusing on fewer large files
const concurrency = 10 // Reduce concurrency to allow server to process
const results = []
let completedRequests = 0
// Implement more aggressive concurrency model
const batches = []
for (let i = 0; i < requests.length; i += concurrency) {
batches.push(requests.slice(i, i + concurrency))
}
// Use concurrency pattern closer to the issue description
logger.info(`Starting test with ${requests.length} total requests, concurrency: ${concurrency}`)
logger.info(`Using ${hosts.length} different hosts`)
// Add functionality to periodically record current status
const statusInterval = setInterval(() => {
logger.info(`Current progress: ${completedRequests}/${totalRequests} completed`);
logMemoryUsage();
}, 5000);
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex]
const batchStartTime = Date.now()
logger.info(`Starting batch ${batchIndex + 1}/${batches.length}, size: ${batch.length}`)
// Start all requests simultaneously
const batchPromises = batch.map((url, idx) => {
const requestIndex = batchIndex * concurrency + idx
return downloadSingleFile(url, requestIndex)
.then(result => {
completedRequests++
if (completedRequests % 10 === 0) {
logger.info(`Progress: ${completedRequests}/${totalRequests} completed`)
}
return result
})
})
// Wait for all requests in this batch to complete
const batchResults = await Promise.all(batchPromises)
results.push(...batchResults)
logger.info(`Completed batch ${batchIndex + 1}/${batches.length} in ${Date.now() - batchStartTime}ms`)
}
clearInterval(statusInterval);
return results
}
// Add memory usage monitoring
function logMemoryUsage() {
const used = process.memoryUsage()
logger.info('Memory usage:', {
rss: `${Math.round(used.rss / 1024 / 1024)} MB`,
heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`,
heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`,
external: `${Math.round(used.external / 1024 / 1024)} MB`
})
}
// Record memory usage every 10 seconds
const memoryInterval = setInterval(logMemoryUsage, 10000)
runTest().then((results) => {
clearInterval(memoryInterval)
const successes = results.filter(r => r.success).length
const failures = results.filter(r => !r.success).length
// Calculate statistics for text() operations
const textDurations = results
.filter(r => r.success && r.duration)
.map(r => r.duration)
const avgDuration = textDurations.reduce((sum, d) => sum + d, 0) / (textDurations.length || 1)
const maxDuration = Math.max(...(textDurations.length ? textDurations : [0]))
const slowResponses = results.filter(r => r.success && r.duration > 2000).length
console.log('\nTest Summary:')
console.log(`Total requests: ${results.length}`)
console.log(`Successful: ${successes}`)
console.log(`Failed: ${failures}`)
console.log(`Average text() duration: ${avgDuration.toFixed(2)}ms`)
console.log(`Maximum text() duration: ${maxDuration}ms`)
console.log(`Slow responses (>2000ms): ${slowResponses}`)
logMemoryUsage()
}).catch(console.error)
asyncLogger.js
const fs = require('fs');
const path = require('path');
const util = require('util');
// Create log directory
const LOG_DIR = path.join(__dirname, 'logs');
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR);
}
// Log file path
const LOG_FILE = path.join(LOG_DIR, `test-${new Date().toISOString().replace(/:/g, '-')}.log`);
// Create write stream
const logStream = fs.createWriteStream(LOG_FILE, { flags: 'a' });
// Async logger
class AsyncLogger {
constructor() {
this.queue = [];
this.isProcessing = false;
// Write logs from queue to file every second
setInterval(() => this.processQueue(), 1000);
// Ensure all logs are written when process exits
process.on('exit', () => {
this.processQueueSync();
});
}
// Format log message
formatMessage(level, message, ...args) {
const timestamp = new Date().toISOString();
const formattedArgs = args.map(arg => {
if (typeof arg === 'object') {
return util.inspect(arg, { depth: null });
}
return arg;
}).join(' ');
return `[${timestamp}] [${level}] ${message} ${formattedArgs}`.trim();
}
// Add log to queue
log(level, message, ...args) {
const formattedMessage = this.formatMessage(level, message, ...args);
// Output to console
if (level === 'ERROR') {
console.error(formattedMessage);
} else if (level === 'WARN') {
console.warn(formattedMessage);
} else {
console.log(formattedMessage);
}
// Add to queue
this.queue.push(formattedMessage);
// Start processing if queue is not already being processed
if (!this.isProcessing) {
this.processQueue();
}
}
// Process queue asynchronously
async processQueue() {
if (this.isProcessing || this.queue.length === 0) return;
this.isProcessing = true;
try {
const messagesToProcess = [...this.queue];
this.queue = [];
// Write to file
for (const message of messagesToProcess) {
logStream.write(message + '\n');
}
} finally {
this.isProcessing = false;
// Continue processing if queue still has messages
if (this.queue.length > 0) {
setImmediate(() => this.processQueue());
}
}
}
// Process queue synchronously (used when process exits)
processQueueSync() {
if (this.queue.length === 0) return;
const messagesToProcess = [...this.queue];
this.queue = [];
for (const message of messagesToProcess) {
fs.appendFileSync(LOG_FILE, message + '\n');
}
}
// Public methods
info(message, ...args) {
this.log('INFO', message, ...args);
}
warn(message, ...args) {
this.log('WARN', message, ...args);
}
error(message, ...args) {
this.log('ERROR', message, ...args);
}
debug(message, ...args) {
this.log('DEBUG', message, ...args);
}
}
// Export instance and set as global variable
const logger = new AsyncLogger();
global.__asyncLogger = logger;
module.exports = logger;
we need to update lib/web/fetch/util.js
const sleep = (ms)=>{
return new Promise((resolve) => setTimeout(() => resolve({value: false}), ms));
}
/**
* @see https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes
* @see https://streams.spec.whatwg.org/#read-loop
* @param {ReadableStreamDefaultReader} reader
*/
async function readAllBytes (reader) {
const bytes = []
let byteLength = 0
const logger = global.__asyncLogger;
const tempIndexIdx = reader["__tempIndexIdx"] //this is injected from the download index
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
let times = 0
let timesInfo = []
while (true) {
times++;
logger.info(`fetch enter readAllBytes with dix ${tempIndexIdx} step0 times=${times}`)
// Significantly reduce timeout time to ensure it triggers before reader.read()
const { done, value: chunk } = await Promise.race([reader.read(), sleep(100)])
timesInfo.push({
time: new Date().toISOString(),
times,
length: byteLength
})
logger.info(`fetch enter readAllBytes with idx chunk: ${chunk === false}`)
if(chunk === false){
try{
logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, {reader})
const infoOfReader = {}
const symbolKey = Object.getOwnPropertySymbols(reader)
const kStateSymbol = symbolKey.find(sym toString().includes('kType'))
const kStreamSymbol = symbolKey.find(sym toString().includes('kState'))
const _innerState = reader[kStateSymbol]
const _innerReader = reader[kStreamSymbol]
infoOfReader.state = _innerState
infoOfReader.state = _innerState
const waitingLength = _innerReader.readRequests?.length || -1
const waitingReq = _innerReader.readRequests[0]
const waitingReqPromimse = waitingReq?.promise
let waitingReqStatus = 'none';
if(waitingReqPromimse){
// Increase chance to detect pending status by increasing timeout to 500ms
waitingReqStatus = await Promise.race([
waitingReqPromimse.then(() => "fulfilled").catch(() => "rejected"),
new Promise(resolve => setTimeout(() => resolve("pending"), 500))
]);
}
infoOfReader.waitingLength = waitingLength
const _stream = _innerReader.stream
const streamKeys = Object.getOwnPropertySymbols(_stream)
const kStreamStatus = streamKeys.find(sym toString().includes('kState'))
const stateOfStream = _stream[kStreamStatus]
infoOfReader.stateOfStream = stateOfStream
const controller = stateOfStream.controller
const stateOfController = controller[kStreamStatus]
infoOfReader.controllerState = stateOfController
infoOfReader.waitingReqStatus = waitingReqStatus
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
}catch(e){
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
}
//when hangs happened, it will await here, so I can catch the hang.
await sleep(600000)
}
if (done) {
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
// 1. Call successSteps with bytes.
return Buffer.concat(bytes, byteLength)
}
// 1. If chunk is not a Uint8Array object, call failureSteps
// with a TypeError and abort these steps.
if (!isUint8Array(chunk)) {
throw new TypeError('Received non-Uint8Array chunk')
}
// 2. Append the bytes represented by chunk to bytes.
bytes.push(chunk)
byteLength += chunk.length
// 3. Read-loop given reader, bytes, successSteps, and failureSteps.
}
}
The root cause of the problem is:
- Server stalls after sending partial data
- In our server simulation code, it intentionally adds a 120-second (2-minute) delay after sending the first few data chunks
- This situation simulates a "half-open connection" problem in real-world scenarios, where the TCP connection remains open but data transfer is interrupted
- Read operation blocks indefinitely
- reader.read() is a blocking operation that waits until new data arrives or the stream ends
- But if the server neither sends new data nor closes the connection, this read operation will hang indefinitely
Can you put all the above in a repository and make it easy for use to reproduce?
Can you put all the above in a repository and make it easy for use to reproduce?
I have pushed these files on branch fix-4089, we could find files on reproduce-script.
Directly run:
- node ./reproduce-script/server.js
- node ./reproduce-script/test.js
What repository?
We also ran into an issue that very accurately resembles the issue reported here. We saw hangs on await response.text(), but only when the server was responding with 'transfer-encoding': 'chunked'.
What may be relevant for this issue is: bumping to undici 7.10.0 solved it for us
We also ran into an issue that very accurately resembles the issue reported here. We saw hangs on
await response.text(), but only when the server was responding with 'transfer-encoding': 'chunked'.What may be relevant for this issue is: bumping to undici 7.10.0 solved it for us
Yes, please read this repo, and I reproduced it. But I still have no idea to fix it.
@mcollina here: https://github.com/fwx5618177/undici/tree/fix-4089/reproduce-script
hi, @rene84 ,as you said :
only when the server was responding with 'transfer-encoding': 'chunked'.
I have noticed that there is the same response header chunked in my case.