Sometimes when downloading files via streaming, the data does not reach the on('data') event.
Problem description
Sometimes data sent by the server never reaches the client's 'data' event handler.
Reproduction steps
This is a streaming file-download function. The server sends data to the client in chunks until a chunk with IsLastBlock set to true is sent, indicating that transmission is complete. However, on('data') is sometimes not triggered for some chunks, so those chunks are lost and the resulting file is incomplete.
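For reference, the expected client-side flow looks roughly like this (a minimal sketch; client, writeStream, and payload stand for the objects set up in the full code under Additional context, and FileBytes / IsLastBlock are fields of the response message shown in the demo proto further down):
// Minimal sketch of the expected flow: every chunk arrives via 'data',
// and the chunk whose IsLastBlock flag is true marks the end of the transfer.
const call = client.DownloadLog({ InputFileJson: JSON.stringify(payload) });
call.on("data", (res) => {
  if (res.FileBytes?.length) writeStream.write(res.FileBytes); // append each chunk
  if (res.IsLastBlock) console.log("final block received");    // transfer is complete
});
call.on("end", () => writeStream.end());              // server closed the stream
call.on("error", (err) => writeStream.destroy(err));  // propagate stream errors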
Environment
- Windows 11
- Node v22.21.1
- Node install method: Windows MSI installer
- [email protected]
Additional context
code segment:
import log from "electron-log";
import fs from "fs";
import fsPromises from "fs/promises";
import path from "path";
import { PassThrough, pipeline } from "stream";
import { promisify } from "util";
import { createGrpcClient } from "../grpc/client.js";
import { DownloadChildMessage } from "../types/grpcChild.js";
const pipe = promisify(pipeline);
log.info("[Worker] Log Downloader Worker Started");
process.on("message", async (msg: DownloadChildMessage & { userDataPath: string; protoPath: string }) => {
const { type, ip, port, payload, taskId, instance, userDataPath, protoPath } = msg;
log.info(`[Worker] gRPC target: ${ip}:${port}`);
// Set up a download timeout
const DOWNLOAD_TIMEOUT = 5 * 60 * 1000; // 5-minute timeout
let timeoutId = setTimeout(() => {
log.error(`[Worker] Download timeout after ${DOWNLOAD_TIMEOUT}ms`);
process.send?.({ success: false, error: "Download timeout" });
process.exit(1);
}, DOWNLOAD_TIMEOUT);
try {
const client = createGrpcClient(ip, port, protoPath);
const input = { InputFileJson: JSON.stringify(payload) };
const tempDir = path.join(userDataPath, `downloads/logs/${taskId}`);
await fsPromises.mkdir(tempDir, { recursive: true });
const zipPath = path.join(
tempDir,
`${generateZipId(instance)}${type === "app" ? "_app" : ""}.zip`
);
const writeStream = fs.createWriteStream(zipPath);
// Listen for errors on the write stream
writeStream.on('error', (err) => {
log.error(`[Worker] Write stream error: ${err.message}`);
pass.destroy(err); // Propagate the error to the pipeline
});
const pass = new PassThrough({ highWaterMark: 1024 * 1024 }); // Use a suitably sized buffer
let receivedChunks = 0;
let receivedBytes = 0;
let lastBlockReceived = false;
// pipeline handles the streams and backpressure automatically
const pipelinePromise = pipe(pass, writeStream)
.then(() => {
clearTimeout(timeoutId);
if (!lastBlockReceived) {
log.warn(`[Worker] Pipeline completed but IsLastBlock was never received. Received ${receivedChunks} chunks, ${receivedBytes} bytes.`);
}
log.info(`[Worker] Download complete -> ${zipPath}`);
process.send?.({ success: true, path: zipPath });
})
.catch(err => {
clearTimeout(timeoutId);
log.error(`[Worker] Pipeline failed: ${err.message}`);
process.send?.({ success: false, error: err.message });
})
.finally(() => {
// Make sure the process exits
setTimeout(() => process.exit(0), 1000);
});
const call = type === "app" ? client.DownloadFile(input) : client.DownloadLog(input);
// Improved backpressure handling
const handleBackpressure = (chunk: Buffer) => {
if (!pass.write(chunk)) {
// Buffer is full, pause the gRPC stream
call.pause();
log.debug(`[Worker] Backpressure: pausing gRPC stream. Buffer full.`);
// Wait for the drain event before resuming
pass.once("drain", () => {
log.debug(`[Worker] Backpressure relieved: resuming gRPC stream.`);
call.resume();
});
}
};
call.on("data", (res: any) => {
receivedChunks++;
const chunkSize = res.FileBytes?.length || 0;
receivedBytes += chunkSize;
log.info(`[Worker] Received chunk ${receivedChunks}: ${chunkSize} bytes, IsLastBlock: ${res.IsLastBlock}`);
if (res.IsLastBlock) {
lastBlockReceived = true;
log.info(`[Worker] Final block received. Total: ${receivedChunks} chunks, ${receivedBytes} bytes`);
}
if (res.FileBytes?.length) {
handleBackpressure(res.FileBytes);
} else {
log.warn(`[Worker] Received chunk with empty FileBytes`);
}
});
call.on("end", () => {
log.info("[Worker] gRPC stream ended, closing PassThrough");
if (!lastBlockReceived) {
log.warn(`[Worker] Stream ended but IsLastBlock=true was not received. Received ${receivedChunks} chunks.`);
}
pass.end(); // End the stream normally
});
call.on("error", (err: any) => {
clearTimeout(timeoutId);
log.error(`[Worker] gRPC error: ${err.message}`);
pass.destroy(err); // Propagate the error to the pipeline
});
call.on("status", (status: any) => {
log.info(`[Worker] gRPC status: ${JSON.stringify(status)}`);
});
// Wait for the pipeline to finish
await pipelinePromise;
} catch (err: any) {
clearTimeout(timeoutId);
log.error(`[Worker] Fatal error: ${err.message}`);
process.send?.({ success: false, error: err.message });
setTimeout(() => process.exit(1), 1000);
}
});
function generateZipId(instance: any) {
const { HostName, ServerName, Version, ServiceInstanceName, Id } = instance;
return `${HostName}_${ServerName}_${Version}_${ServiceInstanceName}_${Id}`;
}
import grpc from "@grpc/grpc-js";
import protoLoader from "@grpc/proto-loader";
export function createGrpcClient(ip: string, port: number, protoPath: string) {
const packageDefinition = protoLoader.loadSync(protoPath, {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
});
const proto = grpc.loadPackageDefinition(packageDefinition) as any;
const Service = proto.ServerMangerGrpc;
return new Service(
`${ip}:${port}`,
grpc.credentials.createInsecure()
);
}
import { fork } from "child_process";
import { app } from "electron";
import log from "electron-log";
import path, { dirname } from "path";
import { fileURLToPath } from "url";
import { getProtoPath } from "../grpc/protoPath.js";
import { DownloadChildMessage } from "../types/grpcChild.js";
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
export const downloadLogInChild = (
ip: string,
port: number,
payload: any,
taskId: string,
instance: any,
type: "log" | "app" = "log",
protoPath?: string
): Promise<string> => {
return new Promise((resolve, reject) => {
const child = fork(path.join(__dirname, "logDownloaderWorker.js"));
let resolved = false; // Prevent resolving more than once
// Set a communication timeout
const timeout = setTimeout(() => {
if (!resolved) {
log.error(`Download child process timeout for ${instance.HostName}`);
child.kill('SIGKILL');
reject(new Error("Child process timeout"));
resolved = true;
}
}, 10 * 60 * 1000); // 10-minute timeout
child.on("message", (msg: { success: boolean; path?: string; error?: string }) => {
if (resolved) return;
clearTimeout(timeout);
resolved = true;
if (msg.success) {
log.info(`Download successful for ${instance.HostName}: ${msg.path}`);
resolve(msg.path!);
} else {
log.error(`Download failed for ${instance.HostName}: ${msg.error}`);
reject(new Error(msg.error || "Download failed"));
}
// Shut down the child process gracefully
setTimeout(() => {
if (child.connected) {
child.disconnect();
}
if (!child.killed) {
child.kill();
}
}, 1000);
});
child.on("error", (err) => {
if (resolved) return;
clearTimeout(timeout);
resolved = true;
log.error(`Child process error for ${instance.HostName}: ${err.message}`);
reject(err);
});
child.on("exit", (code, signal) => {
if (code !== 0 && !resolved) {
clearTimeout(timeout);
resolved = true;
log.error(`Child process exited abnormally for ${instance.HostName}: code ${code}, signal ${signal}`);
reject(new Error(`Child process exited with code ${code}`));
}
});
// Pass in the Electron userData path
const userDataPath = app.getPath("userData");
const message: DownloadChildMessage & { userDataPath: string; protoPath: string } = {
ip, port, payload, taskId, instance, type,
userDataPath,
protoPath: protoPath || getProtoPath()
};
// Add a retry mechanism for sending the message
const sendMessage = (attempt = 0) => {
if (attempt >= 3) {
log.error(`Failed to send message to child process after ${attempt} attempts`);
reject(new Error("Cannot communicate with child process"));
return;
}
if (child.connected) {
child.send(message, (err) => {
if (err) {
log.warn(`Failed to send message to child (attempt ${attempt + 1}): ${err.message}`);
setTimeout(() => sendMessage(attempt + 1), 1000);
}
});
} else {
setTimeout(() => sendMessage(attempt + 1), 1000);
}
};
sendMessage();
});
};
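For completeness, a hypothetical call site for downloadLogInChild (every value below is a made-up placeholder, not taken from the real application):
// Hypothetical usage from the Electron main process; all values are placeholders.
const zipPath = await downloadLogInChild(
  "10.0.0.5",                                         // ip (placeholder)
  50051,                                              // port (placeholder)
  { StartTime: "2024-01-01", EndTime: "2024-01-02" }, // payload, serialized into InputFileJson
  "task-001",                                         // taskId, used for the temp directory name
  { HostName: "host-a", ServerName: "srv", Version: "1.0.0", ServiceInstanceName: "inst", Id: 1 },
  "log"                                               // type: "log" or "app"
);
log.info(`Saved to ${zipPath}`);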
log info:
Thank you very much. Could you please help me figure out where the problem is?
Here is a demo:
code:
rpc DownloadLog(DownloadLogRequest) returns (stream DownloadLogResponse);
message DownloadLogRequest {
string InputFileJson = 1;
}
message DownloadLogResponse {
string FileName = 1;
int32 BlockNumber = 2;
bool IsLastBlock = 3;
bytes FileBytes = 4;
string ServerName = 5;
int32 IsSuccess = 7;
string ServiceName = 6;
string Message = 8;
string StartTime = 9;
string EndTime = 10;
int64 FileNumber = 11;
int64 CurrentNumber = 12;
int64 TotalSize = 13;
int64 CurrentSie = 14;
string Type = 15;
}
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");
const path = require("path");
const mkdirp = require("mkdirp");
let instances = [];
let startTime = "";
let endTime = "";
fs.readFile("task.json", "utf8", async (err, data) => {
if (err) {
console.error("Error reading task.json:", err);
return;
}
const taskInfo = JSON.parse(data);
instances = taskInfo.instances;
endTime = taskInfo.duration[1];
startTime = taskInfo.duration[0];
console.log(endTime, startTime, instances.length);
const saveDir = "downloads";
if (fs.existsSync(saveDir)) {
fs.rmdirSync(saveDir, { recursive: true });
}
// Process each instance sequentially
for (const instance of instances) {
try {
console.log(`>>>>>>>>>>>>>>>>>>>>>>${instance.HostName}>>>>>>>>>>>>>>>>>>>>>>>>>>>>`);
const body = {
StartTime: startTime,
EndTime: endTime,
ServiceInstanceName: instance.ServiceInstanceName,
Id: instance.Id,
Version: instance.Version,
ServerName: instance.ServerName,
};
await generateData(instance, body); // Wait until data generation is complete before continuing
} catch (e) {
console.log(`Error processing instance ${instance.HostName}: ${e.message}`);
}
}
});
async function generateData(instance, params) {
try {
const ip = instance.ip;
const port = instance.port;
const saveDir = "downloads";
mkdirp.sync(saveDir); // Create download directory
const packageDefinition = protoLoader.loadSync(path.join(__dirname, "ServerManager.proto"), {
keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true,
});
const proto = grpc.loadPackageDefinition(packageDefinition);
const grpcClient = proto.ServerMangerGrpc;
const client = new grpcClient(`${ip}:${port}`, grpc.credentials.createInsecure());
const jsonString = JSON.stringify(params);
const grpcRequest = {
InputFileJson: jsonString,
};
const hostFilePath = path.join(saveDir, `${instance.HostName}.zip`);
let isLast = false;
// Return a Promise that resolves when the stream is finished
return new Promise((resolve, reject) => {
const call = client.DownloadLog(grpcRequest);
let fileWriteStream = fs.createWriteStream(hostFilePath, { flags: 'a' }); // Open file stream for appending
call.on("data", (data) => {
// Assuming 'data.FileBytes' contains the file data
if (data.FileBytes && data.FileBytes.length > 0) {
// Append FileBytes to the file
fileWriteStream.write(Buffer.from(data.FileBytes));
console.log(`Appended data for ${instance.HostName}, size: ${data.FileBytes.length} bytes`);
}
if (data.IsLastBlock) {
isLast = true;
}
});
call.on("end", () => {
fileWriteStream.end(); // Close the file stream
console.log(`[Worker] Download complete for instance ${instance.HostName}`);
if (!isLast) {
console.log(`XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX`);
} else {
console.log("VVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVVV");
}
resolve(); // Resolve when the stream ends
});
call.on("error", (err) => {
console.error(`Error downloading for instance ${instance.HostName}: ${err.message}`);
reject(err); // Reject the promise if there’s an error
});
call.on("status", (status) => {
// console.log(`[Worker] gRPC status: ${JSON.stringify(status)}`);
});
});
} catch (err) {
console.error(`Error generating data for instance ${instance.HostName}: ${err.message}`);
throw err; // Ensure the error is propagated if needed
}
}
result:
Tips: the gRPC server side is OK; a gRPC client written in Python works.
We have tests that verify this general functionality of receiving a stream of messages on the client, and verifying that all messages were received. It would be very surprising to me if this was a problem in the library.
I think the problem is on the server. You say that the gRPC server side is OK, but the most likely explanation for this is that the server does not consistently send the last message before ending the stream.
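One way to narrow this down from the client side is to record the BlockNumber of every chunk you receive, so that when a stream ends without IsLastBlock you can see whether intermediate blocks were skipped or only the final block is missing. A diagnostic sketch (it assumes BlockNumber increases by one per chunk, which the proto does not strictly guarantee):
// Diagnostic sketch: track every BlockNumber so a stream that ends without
// IsLastBlock can be classified as "blocks skipped" vs. "final block missing".
// Assumes BlockNumber increments by one per chunk (not guaranteed by the proto).
const seenBlocks = [];
call.on("data", (res) => {
  seenBlocks.push(res.BlockNumber);
});
call.on("end", () => {
  const sorted = [...seenBlocks].sort((a, b) => a - b);
  const gaps = sorted.filter((n, i) => i > 0 && n !== sorted[i - 1] + 1);
  console.log(`blocks: ${seenBlocks.length}, highest: ${sorted[sorted.length - 1]}, missing before: ${gaps.join(", ") || "none"}`);
});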
In order to gather more information about this error, please run the demo with the environment variables GRPC_TRACE=all GRPC_VERBOSITY=DEBUG and share the output if at least one failure of this type occurred in that run.
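For the worker-based setup above, those variables can be forwarded to the forked process through the standard env option of child_process.fork (a sketch against the downloadLogInChild code from the issue):
// Sketch: pass the gRPC tracing variables to the worker process so that the
// GRPC_TRACE / GRPC_VERBOSITY output shows up in the worker's logs.
const child = fork(path.join(__dirname, "logDownloaderWorker.js"), [], {
  env: {
    ...process.env,
    GRPC_TRACE: "all",
    GRPC_VERBOSITY: "DEBUG",
  },
});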