fix: Fix connection and timeout handling issues in high concurrency scenarios for undici
This relates to...
#4089
Rationale
Current issues with undici in high concurrency scenarios:
- Request states not properly reset, leading to stuck requests
- Queue processing interruption without cleanup, causing resource leaks
- Reader promises remaining in pending state indefinitely
Expected Behavior
After fixes, the behavior should be:
- Request states properly reset and cleaned up
- Queued requests properly processed or cleaned up
- Timeout mechanisms working correctly to prevent stuck requests
Changes
- Added state reset logic in the `_resume()` method
- Enhanced queue cleanup in the `connect()` timeout handler
- Improved error handling in `onError()` for connection failures
- Added timeout protection in `connect()` for stuck requests
- Fixed resource cleanup in the `kDestroy` symbol method
Features
- State Management Optimization
function _resume (client, sync) {
while (true) {
if (client[kConnecting] && !client[kHTTPContext]) {
client[kConnecting] = false
client[kResuming] = 0
}
// ...existing code...
}
}
- Request Queue Management
async function connect (client) {
const timeout = setTimeout(() => {
if (client[kResuming] === 2) {
// Timeout handling and queue cleanup
const err = new Error('Request timeout')
const requests = client[kQueue].splice(client[kPendingIdx])
for (const req of requests) {
util.errorRequest(client, req, err)
}
}
}, 30000)
// ...existing code...
}
Bug Fixes
- State Reset Issues
  - Added connection state checks
  - Proper timeout handling
  - Legacy state cleanup
- Queue Processing Issues
  - Handle interrupted request queues
  - Clean up timed-out requests
  - Proper resource release
- Promise Deadlock Issues
  - Added timeout protection (see the sketch after this list)
  - Handle abnormal states
  - Improved error handling
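To illustrate the "timeout protection" item in the last group, here is a minimal sketch of the general pattern: race a pending operation against a timer and always release the timer afterwards. This is illustration only, built around a hypothetical `withTimeout` helper; it is not undici's internal implementation.

```js
// Hypothetical helper sketching the timeout-protection pattern described above.
// The name, default message, and 30s value are assumptions for illustration only.
function withTimeout (promise, ms, message = 'Operation timed out') {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms)
  })
  // Clear the timer in all cases so it cannot leak or keep the event loop alive.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

// Usage sketch: surface a stalled body read as an error instead of a promise
// that stays pending forever.
// const { done, value } = await withTimeout(reader.read(), 30_000, 'Body read timed out')
```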
Breaking Changes and Deprecations
None.
Status
- [x] I have read and agreed to the Developer's Certificate of Origin
- [x] Tested
- [ ] Benchmarked (optional)
- [ ] Documented
- [x] Review ready
- [ ] In review
- [ ] Merge ready
I don't believe this is the way to go; the changes essentially change the way the queue handles itself and its different states. The error from the issue seems to be rooted in a different path (Fetch ready - Stream reader).
Got it. I will try to simulate and reproduce the specific issue, and then follow this reasoning.
@metcoder95
I reproduced this issue using Express.
Here is the code:
server.js
const express = require('express')
const crypto = require('crypto')
// Function to create server instance
function createServer(port) {
const app = express()
// Track request counts to selectively delay certain requests
let requestCount = 0;
// Global control variable to simulate server hanging
let shouldStall = false;
// Toggle the stall status every 30 seconds
setInterval(() => {
shouldStall = !shouldStall;
console.log(`Server hang status changed to: ${shouldStall ? 'hanging' : 'normal'}`);
}, 30000);
app.get('/files/:size', (req, res) => {
const sizeInKB = parseInt(req.params.size, 10)
const sizeInBytes = sizeInKB * 1024
requestCount++;
// Set response headers
res.setHeader('Content-Type', 'application/octet-stream')
res.setHeader('Content-Length', sizeInBytes)
// Set response timeout for each request to prevent complete connection interruption
req.socket.setTimeout(300000); // 5 minute timeout
// Prevent errors caused by client-side connection interruption
req.on('close', () => {
console.log(`Request #${requestCount}: Connection closed by client`);
});
// Generate random data
const buffer = crypto.randomBytes(sizeInBytes)
// Apply slow transfer strategy for all large file requests
if (sizeInBytes > 100 * 1024) { // Files larger than 100KB
console.log(`Request #${requestCount}: Applying slow transfer (${sizeInKB}KB)`);
// Use smaller chunks and more precise control
const CHUNK_SIZE = 16 * 1024; // 16KB
let offset = 0;
let chunkCount = 0;
// Progress counter
const totalChunks = Math.ceil(sizeInBytes / CHUNK_SIZE);
function sendNextChunk() {
// Check if need to enter hanging state
if (shouldStall && chunkCount > 1) {
console.log(`Request #${requestCount}: Global hang signal detected, pausing data transfer [${chunkCount}/${totalChunks}]`);
// Pause for a very long time, but don't end the response, keeping the client hanging
setTimeout(sendNextChunk, 60000); // Check again after 1 minute
return;
}
if (offset >= sizeInBytes) {
console.log(`Request #${requestCount}: Transfer completed`);
res.end();
return;
}
try {
// Determine current chunk size
const end = Math.min(offset + CHUNK_SIZE, sizeInBytes);
const chunk = buffer.slice(offset, end);
// Use callback to ensure data is written
res.write(chunk, (err) => {
if (err) {
console.error(`Request #${requestCount}: Write error`, err);
try { res.end(); } catch (e) {}
return;
}
offset += chunk.length;
chunkCount++;
// Determine delay based on chunk number
let delay;
// First two chunks sent quickly, subsequent chunks intentionally slowed
if (chunkCount <= 2) {
delay = 50; // Send first two chunks quickly
console.log(`Request #${requestCount}: Sending fast chunk ${chunkCount}/${totalChunks}, size: ${chunk.length/1024}KB`);
} else if (chunkCount === 3) {
// After the third chunk, enter extra long delay to ensure client reader.read() will hang
delay = 120000; // 2 minute delay
console.log(`Request #${requestCount}: Entering extra long delay ${delay}ms`);
} else {
delay = 10000; // Subsequent chunks also slow
console.log(`Request #${requestCount}: Sending slow chunk ${chunkCount}/${totalChunks}, delay: ${delay}ms`);
}
// Set time to send the next chunk
setTimeout(sendNextChunk, delay);
});
} catch (err) {
console.error(`Request #${requestCount}: Send error`, err);
try { res.end(); } catch (e) {}
}
}
// Start sending
sendNextChunk();
} else {
// Small files still have some delay to prevent all responses from completing too quickly
setTimeout(() => {
res.end(buffer);
}, Math.random() * 2000); // 0-2000ms random delay
}
})
return app.listen(port, () => {
console.log(`Server running on port ${port} - http://localhost:${port}/files/:size`)
})
}
// Start 4 server instances on different ports
const ports = [3000, 3001, 3002, 3003]
const servers = ports.map(port => createServer(port))
// Add overall startup success log
console.log('=================================================')
console.log('All servers started successfully!')
console.log('=================================================')
console.log(`Started ${ports.length} test server instances:`)
ports.forEach(port => {
console.log(` - http://localhost:${port}`)
})
console.log('\nAvailable endpoints:')
console.log(' - GET /files/:size - Returns random data of specified size (KB)')
console.log(' Example: http://localhost:3000/files/1024 will return 1MB data')
console.log('\nPress Ctrl+C to stop all servers')
console.log('=================================================')
// Add graceful shutdown handling
process.on('SIGINT', () => {
console.log('Shutting down servers...')
servers.forEach(server => server.close())
process.exit()
})
test.js
// First load the async logger
require('./asyncLogger');
const undici = require('./index.js')
// Use global async logger, fallback to console if it doesn't exist
const logger = global.__asyncLogger || console;
logger.info('Test started - Using logger:', global.__asyncLogger ? 'global.__asyncLogger' : 'console');
async function downloadSingleFile(url, index) {
const startTime = Date.now()
const timeout = 5000; // Increased to 5 seconds
logger.info(`[${index}] Starting request to ${url} at ${new Date().toISOString()}`)
const controller = new AbortController()
const reqTimer = setTimeout(() => {
logger.info(`[${index}] Aborting request after 5000ms`)
controller.abort()
}, timeout)
try {
const response = await undici.fetch(url, {
signal: controller.signal,
headers: {},
bodyTimeout: 500000, // Increased to 500 seconds to ensure no reading timeout interruption
headersTimeout: 60000 // Increased header timeout
})
logger.info(`[${index}] Got response in ${Date.now() - startTime}ms, status: ${response.status}`)
if (response.status >= 200 && response.status < 300) {
logger.info(`[${index}] Starting to read body at ${new Date().toISOString()}`)
const textStartTime = Date.now()
// Add extra monitoring to detect long-running text() calls
const textTimeout = setTimeout(() => {
logger.warn(`[${index}] WARNING: response.text() has been running for 5 seconds without completing!`)
}, 5000)
// Additional: Pass request index to response object for use in readAllBytes
// This is a special handling for undici's internal implementation
if (response.body && response.body.getReader) {
const originalGetReader = response.body.getReader.bind(response.body);
response.body.getReader = function() {
const reader = originalGetReader();
// Inject index into reader object for tracking
reader.__tempIndexIdx = index;
// Additionally record read start time
reader.__readStartTime = Date.now();
// Add diagnostic log
logger.info(`[${index}] Created reader instance, start time: ${new Date(reader.__readStartTime).toISOString()}`);
return reader;
};
}
// Increase timeout monitoring frequency
const textTimeoutInterval = setInterval(() => {
const elapsedTime = Date.now() - textStartTime;
logger.warn(`[${index}] WARNING: response.text() has been running for ${elapsedTime/1000} seconds without completing!`);
}, 3000);
// This may get stuck, which is the issue we're trying to reproduce
try {
const text = await response.text();
clearInterval(textTimeoutInterval);
clearTimeout(textTimeout);
const textDuration = Date.now() - textStartTime
logger.info(`[${index}] Completed reading body in ${textDuration}ms, length: ${text.length}`)
// Record longer response times
if (textDuration > 2000) {
logger.warn(`[${index}] Slow text() operation detected: ${textDuration}ms for ${url}`)
}
return { success: true, length: text.length, duration: textDuration }
} catch (textError) {
clearInterval(textTimeoutInterval);
clearTimeout(textTimeout);
logger.error(`[${index}] Text reading error:`, textError);
throw textError;
}
} else {
return { success: false, status: response.status }
}
} catch (err) {
logger.error(`[${index}] Error:`, {
name: err.name,
message: err.message,
cause: err.cause,
timestamp: new Date().toISOString()
})
return { success: false, error: err.message }
} finally {
clearTimeout(reqTimer)
}
}
async function runTest() {
// Force requests for large files to increase probability of triggering the issue
const totalRequests = 100 // Reduce number of requests, but each is a large file
// Simulate 4 different hosts
const hosts = [
'http://localhost:3000',
'http://localhost:3001',
'http://localhost:3002',
'http://localhost:3003'
]
// Generate requests, all for large files
const requests = Array.from({ length: totalRequests }, (_, i) => {
// All requests for large files
const size = Math.floor(Math.random() * 4000) + 2000; // 2MB-6MB
const host = hosts[i % hosts.length]
return `${host}/files/${size}`
})
// Use lower concurrency, focusing on fewer large files
const concurrency = 10 // Reduce concurrency to allow server to process
const results = []
let completedRequests = 0
// Implement more aggressive concurrency model
const batches = []
for (let i = 0; i < requests.length; i += concurrency) {
batches.push(requests.slice(i, i + concurrency))
}
// Use concurrency pattern closer to the issue description
logger.info(`Starting test with ${requests.length} total requests, concurrency: ${concurrency}`)
logger.info(`Using ${hosts.length} different hosts`)
// Add functionality to periodically record current status
const statusInterval = setInterval(() => {
logger.info(`Current progress: ${completedRequests}/${totalRequests} completed`);
logMemoryUsage();
}, 5000);
for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
const batch = batches[batchIndex]
const batchStartTime = Date.now()
logger.info(`Starting batch ${batchIndex + 1}/${batches.length}, size: ${batch.length}`)
// Start all requests simultaneously
const batchPromises = batch.map((url, idx) => {
const requestIndex = batchIndex * concurrency + idx
return downloadSingleFile(url, requestIndex)
.then(result => {
completedRequests++
if (completedRequests % 10 === 0) {
logger.info(`Progress: ${completedRequests}/${totalRequests} completed`)
}
return result
})
})
// Wait for all requests in this batch to complete
const batchResults = await Promise.all(batchPromises)
results.push(...batchResults)
logger.info(`Completed batch ${batchIndex + 1}/${batches.length} in ${Date.now() - batchStartTime}ms`)
}
clearInterval(statusInterval);
return results
}
// Add memory usage monitoring
function logMemoryUsage() {
const used = process.memoryUsage()
logger.info('Memory usage:', {
rss: `${Math.round(used.rss / 1024 / 1024)} MB`,
heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`,
heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`,
external: `${Math.round(used.external / 1024 / 1024)} MB`
})
}
// Record memory usage every 10 seconds
const memoryInterval = setInterval(logMemoryUsage, 10000)
runTest().then((results) => {
clearInterval(memoryInterval)
const successes = results.filter(r => r.success).length
const failures = results.filter(r => !r.success).length
// Calculate statistics for text() operations
const textDurations = results
.filter(r => r.success && r.duration)
.map(r => r.duration)
const avgDuration = textDurations.reduce((sum, d) => sum + d, 0) / (textDurations.length || 1)
const maxDuration = Math.max(...(textDurations.length ? textDurations : [0]))
const slowResponses = results.filter(r => r.success && r.duration > 2000).length
console.log('\nTest Summary:')
console.log(`Total requests: ${results.length}`)
console.log(`Successful: ${successes}`)
console.log(`Failed: ${failures}`)
console.log(`Average text() duration: ${avgDuration.toFixed(2)}ms`)
console.log(`Maximum text() duration: ${maxDuration}ms`)
console.log(`Slow responses (>2000ms): ${slowResponses}`)
logMemoryUsage()
}).catch(console.error)
asyncLogger.js
const fs = require('fs');
const path = require('path');
const util = require('util');
// Create log directory
const LOG_DIR = path.join(__dirname, 'logs');
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR);
}
// Log file path
const LOG_FILE = path.join(LOG_DIR, `test-${new Date().toISOString().replace(/:/g, '-')}.log`);
// Create write stream
const logStream = fs.createWriteStream(LOG_FILE, { flags: 'a' });
// Async logger
class AsyncLogger {
constructor() {
this.queue = [];
this.isProcessing = false;
// Write logs from queue to file every second
setInterval(() => this.processQueue(), 1000);
// Ensure all logs are written when process exits
process.on('exit', () => {
this.processQueueSync();
});
}
// Format log message
formatMessage(level, message, ...args) {
const timestamp = new Date().toISOString();
const formattedArgs = args.map(arg => {
if (typeof arg === 'object') {
return util.inspect(arg, { depth: null });
}
return arg;
}).join(' ');
return `[${timestamp}] [${level}] ${message} ${formattedArgs}`.trim();
}
// Add log to queue
log(level, message, ...args) {
const formattedMessage = this.formatMessage(level, message, ...args);
// Output to console
if (level === 'ERROR') {
console.error(formattedMessage);
} else if (level === 'WARN') {
console.warn(formattedMessage);
} else {
console.log(formattedMessage);
}
// Add to queue
this.queue.push(formattedMessage);
// Start processing if queue is not already being processed
if (!this.isProcessing) {
this.processQueue();
}
}
// Process queue asynchronously
async processQueue() {
if (this.isProcessing || this.queue.length === 0) return;
this.isProcessing = true;
try {
const messagesToProcess = [...this.queue];
this.queue = [];
// Write to file
for (const message of messagesToProcess) {
logStream.write(message + '\n');
}
} finally {
this.isProcessing = false;
// Continue processing if queue still has messages
if (this.queue.length > 0) {
setImmediate(() => this.processQueue());
}
}
}
// Process queue synchronously (used when process exits)
processQueueSync() {
if (this.queue.length === 0) return;
const messagesToProcess = [...this.queue];
this.queue = [];
for (const message of messagesToProcess) {
fs.appendFileSync(LOG_FILE, message + '\n');
}
}
// Public methods
info(message, ...args) {
this.log('INFO', message, ...args);
}
warn(message, ...args) {
this.log('WARN', message, ...args);
}
error(message, ...args) {
this.log('ERROR', message, ...args);
}
debug(message, ...args) {
this.log('DEBUG', message, ...args);
}
}
// Export instance and set as global variable
const logger = new AsyncLogger();
global.__asyncLogger = logger;
module.exports = logger;
We need to update lib/web/fetch/util.js:
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(() => resolve({ value: false }), ms))
}
/**
* @see https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes
* @see https://streams.spec.whatwg.org/#read-loop
* @param {ReadableStreamDefaultReader} reader
*/
async function readAllBytes (reader) {
const bytes = []
let byteLength = 0
const logger = global.__asyncLogger;
const tempIndexIdx = reader["__tempIndexIdx"] //this is injected from the download index
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
let times = 0
let timesInfo = []
while (true) {
times++;
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 times=${times}`)
// Significantly reduce timeout time to ensure it triggers before reader.read()
const { done, value: chunk } = await Promise.race([reader.read(), sleep(100)])
timesInfo.push({
time: new Date().toISOString(),
times,
length: byteLength
})
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} chunk === false: ${chunk === false}`)
if(chunk === false){
try{
logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, {reader})
const infoOfReader = {}
const symbolKey = Object.getOwnPropertySymbols(reader)
const kStateSymbol = symbolKey.find(sym => sym.toString().includes('kType'))
const kStreamSymbol = symbolKey.find(sym => sym.toString().includes('kState'))
const _innerState = reader[kStateSymbol]
const _innerReader = reader[kStreamSymbol]
infoOfReader.state = _innerState
const waitingLength = _innerReader.readRequests?.length || -1
const waitingReq = _innerReader.readRequests?.[0]
const waitingReqPromise = waitingReq?.promise
let waitingReqStatus = 'none';
if (waitingReqPromise) {
// Increase the chance of detecting a pending state by raising the race timeout to 500ms
waitingReqStatus = await Promise.race([
waitingReqPromise.then(() => "fulfilled").catch(() => "rejected"),
new Promise(resolve => setTimeout(() => resolve("pending"), 500))
]);
}
infoOfReader.waitingLength = waitingLength
const _stream = _innerReader.stream
const streamKeys = Object.getOwnPropertySymbols(_stream)
const kStreamStatus = streamKeys.find(sym => sym.toString().includes('kState'))
const stateOfStream = _stream[kStreamStatus]
infoOfReader.stateOfStream = stateOfStream
const controller = stateOfStream.controller
const stateOfController = controller[kStreamStatus]
infoOfReader.controllerState = stateOfController
infoOfReader.waitingReqStatus = waitingReqStatus
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
}catch(e){
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
}
// When a hang happens, execution awaits here, so the hang can be caught and inspected.
await sleep(600000)
}
if (done) {
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
// 1. Call successSteps with bytes.
return Buffer.concat(bytes, byteLength)
}
// 1. If chunk is not a Uint8Array object, call failureSteps
// with a TypeError and abort these steps.
if (!isUint8Array(chunk)) {
throw new TypeError('Received non-Uint8Array chunk')
}
// 2. Append the bytes represented by chunk to bytes.
bytes.push(chunk)
byteLength += chunk.length
// 3. Read-loop given reader, bytes, successSteps, and failureSteps.
}
}
The root cause of the problem is:
- The server stalls after sending partial data
  - The server simulation code intentionally adds a 120-second (2-minute) delay after sending the first few data chunks
  - This simulates a "half-open connection" problem seen in real-world scenarios, where the TCP connection remains open but data transfer is interrupted
- The read operation blocks indefinitely
  - reader.read() waits until new data arrives or the stream ends
  - If the server neither sends new data nor closes the connection, this read hangs indefinitely (a minimal observation sketch follows below)
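For reference, the hang can be observed directly with the short script below. This is a sketch under assumptions: the stalled server.js above is running on port 3000, and undici is loaded as a package (inside the repo it would be require('./index.js'), as in test.js). The timer only observes the pending read; it does not abort or fix anything.

```js
// Observation-only sketch of the stuck read; assumes server.js above is running on port 3000.
const { fetch } = require('undici')

async function observeHang () {
  const res = await fetch('http://localhost:3000/files/4096')
  const reader = res.body.getReader()

  while (true) {
    const read = reader.read()
    // A sentinel timer; it does not cancel the read, it only detects that it is still pending.
    const timer = new Promise(resolve => setTimeout(resolve, 10_000, 'timed-out'))
    const result = await Promise.race([read, timer])

    if (result === 'timed-out') {
      // This is the stuck state: read() stays pending because the server neither
      // sends more data nor closes the connection.
      console.log('reader.read() still pending after 10s - connection appears stalled')
      return
    }
    if (result.done) {
      console.log('stream finished normally')
      return
    }
    console.log(`received chunk of ${result.value.length} bytes`)
  }
}

observeHang().catch(console.error)
```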
We have a header & body read timeout in undici. Why doesn't that trigger?
I don't think this `sleep` solution is the correct one. Can you create a reproducible (failing) test for this problem?
I have pushed these files to the fix-4089 branch; you can find them under reproduce-script.
Run directly:
node ./reproduce-script/server.js
node ./reproduce-script/test.js