nodejs-bigtable
createReadStream doesn't return all data when the stream is paused and resumed
Hello,
I have an issue with the createReadStream method in the Node.js Bigtable client. In my situation I need a way to get only one item from the stream, process it, then get the next one, and so on; I don't want to store all the data in memory. The Node.js Bigtable client doesn't provide anything for this, so I use createReadStream and pause the stream once I get an item/row from the table.
This worked fine for small data sets, but when I tried it on a larger data set of about 500k rows, I didn't retrieve all the rows the table holds for my query (the 'end' event is emitted before I get all the results from the table).
My code uses a class similar to the one below; the class is supposed to return a promise that resolves to the next item from the stream, or undefined if the stream has finished:
export default class Iterable {
  stream: any;
  buffer: any[] = [];
  public cnt: number = 0;
  id = 0;

  constructor(id: number, stream: any) {
    this.id = id;
    this.stream = stream;
    this.stream.on('end', () => {
      console.log('end', this.cnt, this.buffer.length);
      // undefined marks the end of the stream for callers of next()
      this.buffer.push(undefined);
    });
    // Buffer each row and pause until next() asks for more.
    // (The snippet as posted had once('data'), which only ever captures the
    // first row; on('data') matches the behavior described above.)
    this.stream.on('data', (d: any) => {
      this.buffer.push(d);
      this.stream.pause();
    });
  }

  async next(): Promise<any> {
    this.stream.resume();
    return new Promise(resolve => {
      if (this.cnt < this.buffer.length) {
        resolve(this.buffer[this.cnt++]);
      } else {
        // Poll until a new row (or the end marker) has been buffered.
        const id = setInterval(() => {
          if (this.cnt < this.buffer.length) {
            clearInterval(id);
            resolve(this.buffer[this.cnt++]);
          }
        }, 1);
      }
    });
  }
}
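For reference, I consume the class roughly like this (just a sketch; processRow stands in for my per-row work):

// Rough usage of the class above (inside an async function).
const iterable = new Iterable(1, table.createReadStream());
let row = await iterable.next();
while (row !== undefined) {
  await processRow(row); // placeholder for the actual per-row processing
  row = await iterable.next();
}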
I also tried using once so that the data event is listened to only one time for each call to next(), but I still didn't get all the data I should.
I really hope you can solve this issue or provide another way to get just one item from Bigtable at each step.
Environment details
- OS: Ubuntu 19.10
- Node.js version: 10.15.2
- npm version: 6.12.0
- @google-cloud/bigtable version: 2.3.1
I also use the Bigtable emulator.
Steps to reproduce
- Have a table with a large number of rows
- Try to use the above class to get all the data
Thanks!
Hello! Here's what I've tried that works as expected:
const Transform = require('stream').Transform

const processStream = new Transform({
  objectMode: true,
  transform(row, enc, next) {
    setTimeout(() => {
      // Something async
      next(null, row)
    }, 1000)
  },
})

let numRowsReceived = 0

table.createReadStream()
  .pipe(processStream)
  .on('data', () => numRowsReceived++)
  .on('end', () => {
    // Verify numRowsReceived is expected number
    console.log('All results processed successfully!')
  })
The built-in Transform stream seems perfectly suited to what you're after. Let me know if this will work for you.
Hello! This might be helpful; I will try to use it and let you know if it works.
What I want is the same behavior as the iterator in the Java client; that's why I wrote the class above.
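For reference, the kind of consumption I'm after would look roughly like this if the row stream were async-iterable (just a sketch; handleRow is a placeholder, and whether Symbol.asyncIterator is available depends on the Node and client versions):

// Sketch only: iterate the read stream one row at a time, with backpressure.
async function processOneByOne(table) {
  for await (const row of table.createReadStream()) {
    await handleRow(row); // placeholder for the per-row work
  }
}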
Hello @stephenplusplus, I tried what you suggested, plus multiple other approaches (pipe/unpipe, cork/uncork, and pause/resume), and I still wasn't able to pause and resume the stream and get the correct data. What I am trying to implement is an iterator: when I want a row I call next() (this behavior is already implemented in the Java API for Bigtable), so I need the item to be returned by calling next() in the class above. Thanks
I am facing the same issue with the results being inconsistent. Most of the time the rows retrieved match the expected rows, but critically not always, and not for all the streams running at the time. I have tried pull/push variations of the stream, as well as a simple pull-only (readable) stream implementation, all with the same results. Not sure if this is related, but it happens when retrieving multiple streams of more than 50K rows each, with the total processing running for more than 5 minutes. Also, the emulator is used instead of an actual Bigtable instance. Any ideas on whether this is related to the client library or some other dependency (grpc?)?
I also tried using stream[Symbol.asyncIterator](), but it failed with:
(node:9665) UnhandledPromiseRejectionWarning: TypeError: this.stream[Symbol.asyncIterator] is not a function
Using new Readable().wrap(stream)[Symbol.asyncIterator]() instead returns an async iterator that provides no results (after some delay).
Thanks
@stephenplusplus Here is a log based on the snippet you gave above, with a timeout of 1 ms. Execution time is in mm:ss format.
With emulator, 50k records:
- Attempt 1: 49378 rows received, execution time 01:5.792
- Attempt 2: 49378 rows received, execution time 01:7.204
- Attempt 3: 49378 rows received, execution time 01:7.769
With emulator, 100k records:
- Attempt 1: 99426 rows received, execution time 02:20.165
- Attempt 2: 99425 rows received, execution time 02:50.432
- Attempt 3: 99426 rows received, execution time 02:36.44
Without emulator (live data), 50k records:
- Attempt 1: 50000 rows received, execution time 01:38.239
- Attempt 2: 50000 rows received, execution time 01:17.978
- Attempt 3: 50000 rows received, execution time 01:30.748
Without emulator (live data), 100k records:
- Attempt 1: 100000 rows received, execution time 03:7.178
- Attempt 2: 100000 rows received, execution time 02:51.23
- Attempt 3: 100000 rows received, execution time 02:23.271
It seems like an issue with the emulator; it works fine with live data. The results above vary with the size of the data, especially on the emulator. Currently I have data formatted as below:
const rowsToInsert = [];
for (let index = 0; index < 50000; index++) {
  rowsToInsert.push({
    key: `tablet#a0b81f74#${index}`,
    data: {
      stats_summary: {
        name: `name-${index}`,
        value: index
      }
    }
  });
}
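(These rows are then presumably written with the client's insert API, something along these lines:)

// Hedged: writing the generated rows, where `table` is the test table.
await table.insert(rowsToInsert);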
We found this behavior today while developing a new system on Bigtable. Using the most recent client, on a live Bigtable instance (our staging instance), when I try to stream a key range with 66,003 rows, the stream stops at 66,000 rows if the consumer is async, but returns all 66,003 rows if it is immediate.
edit: I should add our version information here:
- Node version: v14.19.1 (I can test on a newer one if you can't replicate on this version, though I doubt this is a Node.js issue)
- @google-cloud/bigtable version: 4.5.1
I have been able to replicate with this basic setup:
import { Transform, Writable, pipeline } from 'stream';

const startTime = Date.now();
let rowCount = 0;

const transformer = new Transform({
  objectMode: true,
  transform(chunk: any, _encoding, callback) {
    rowCount++;
    if (process.env.DELAY_TRANSFORM === 'true') {
      // Defer the callback to a later event-loop tick to simulate async work.
      setTimeout(() => {
        callback(null, chunk);
      }, 0);
    } else {
      callback(null, chunk);
    }
  },
});

const output = new Writable({
  objectMode: true,
  write(_chunk, _encoding, callback) {
    callback();
  },
});

const readStream = await eventsTable.createReadStream({
  decode: false,
  filter: [{ family: 'metadata', column: { cellLimit: 1 } }],
  start: `${sessionKey}|`,
  end: `${sessionKey}||`,
});

await new Promise<void>((resolve, reject) => {
  pipeline(
    readStream,
    transformer,
    output,
    err => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    },
  );
});

const elapsed = Date.now() - startTime;
console.info(`Finished reading ${rowCount} rows in ${elapsed}ms`);
When executing with DELAY_TRANSFORM=false I get this:
Finished reading 66003 rows in 7900ms
But with DELAY_TRANSFORM=true I get this:
Finished reading 66000 rows in 89192ms
First, note the three missing rows. Second, the execution takes more than 10 times longer! I was expecting somewhat more time, since the stream pauses for (presumably) backpressure in the transformer, but 10x seems excessive for what is nearly an instant pass-through of each row.
But we can ignore the excessive slowdown for now (fortunately this system is not in a performance-critical path). What we are having trouble understanding is how to resolve the three missing rows (in this example).
Could there be a pause happening after the 66,000th row is received that never gets resumed, so the stream never finishes completely?
edit: I ran this same script, but with end set up to fetch 20,002 rows (instead of the whole set). DELAY_TRANSFORM mode stops at exactly 20,000 rows, dropping two on the floor. So a failure to resume and complete the source stream seems likely here.
I also ran it with end set to accept up to 20,052 rows, and the stream ends at 20,048.
Did a bit more testing this morning: when I use a key range with a nice round number of rows (e.g. 60,000 or 20,000), all rows are returned. More irregular expected row counts (like 60,003 or 20,050) result in missing data.
https://github.com/nodejs/node/issues/34274
I was just forwarded this, and it appears to be the issue we're observing. It sounds like it was fixed in newer Node versions (which we are working towards), and there may be a workaround coming from that. I will test this and report back.
Over the next two days we're doing final validation that the Node upgrade resolves this problem, but at this point it seems safe to say it is that Node issue, and that the only resolution is to upgrade to at least 15.x to pick up the streaming fixes.
I will follow back up once we are confident in the resolution. We appreciate the attention this received (on the support side, presumably what led to the label shifts) while we continued investigating and found the true cause.
We are working on a fix in the client library. We have been able to reproduce this issue.
@jlogsdon Thank you for the workaround update! As we will likely drop Node 12 (but continue to support Node 14+) we'll still have to address this in the library (as per @igorbernstein2's comment) but it's great to know there's an interim solution for users who are able to upgrade.
That's good to hear, as it seems like upgrading to Node 15 did not completely resolve this issue. We were able to get our validator shipped and run some testing today and found we're still frequently missing rows from the end of streams (upwards of 50% of the time!).
I noticed something while testing this out against one of the specific stream ranges that failed. First, a bit of background about our data: we have an events table with two families, metadata and event. The former is a fairly light set of string or numeric values that briefly describe the event, and the event family stores a single binary blob which can range from 10 bytes to (in extreme cases) upwards of 10MB; for the stream I tested, the largest is ~150KB, with the vast majority closer to 100 bytes.
Now, when I create a read stream that filters down to just the metadata family, I receive every row, seemingly 100% of the time (over the 10 times I ran it). If I either remove that family filter or change it to event, we start to miss rows at the end of the stream (how many varies).
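For concreteness, the two variants I'm comparing look roughly like this (same filter syntax as in my earlier repro; eventsTable and the family names come from our schema):

// Returns every row reliably in my tests:
const metadataOnly = eventsTable.createReadStream({
  filter: [{ family: 'metadata', column: { cellLimit: 1 } }],
});

// Drops a varying number of rows at the end of the stream:
const eventOnly = eventsTable.createReadStream({
  filter: [{ family: 'event', column: { cellLimit: 1 } }],
});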
Hopefully this context is useful for the team. Looking forward to seeing a potential fix here, but will be looking at alternate avenues on our side in the interim.
edit: For anyone unable to upgrade who might be looking for a solution before the client can be fixed, we successfully worked around the streaming issue by building an async generator that fetches a chunk of rows at a time, pushes those out of the generator, and then repeats until the fetch returns a partial or empty chunk.
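A rough sketch of that workaround (not our exact code; it assumes getRows accepts start, end, and limit options, that row keys are exposed as row.id, and that appending a zero byte to the last key is an acceptable way to form the next inclusive start key):

// Hypothetical chunked reader built on table.getRows instead of createReadStream.
async function* readRowsInChunks(table, startKey, endKey, chunkSize = 1000) {
  let start = startKey;
  for (;;) {
    const [rows] = await table.getRows({ start, end: endKey, limit: chunkSize });
    for (const row of rows) {
      yield row;
    }
    // A partial (or empty) chunk means we've reached the end of the range.
    if (rows.length < chunkSize) return;
    // Resume just past the last key we saw ('\0' is the smallest possible suffix).
    start = rows[rows.length - 1].id + '\0';
  }
}

// Usage: for await (const row of readRowsInChunks(eventsTable, startKey, endKey)) { ... }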
We've narrowed down the issue to a race condition in a stream Transform:
https://github.com/googleapis/nodejs-bigtable/blob/436e77807e87e13f80ac2bc2c43813b09090000f/src/table.ts#L885
The transform (toRowStream) references the state of the previous node in the pipeline (chunktransformer) as a signal to stop itself from emitting elements. Under most circumstances this works fine. However, when the gRPC stream is producing elements faster than the consumer is consuming them (i.e. a Transform that invokes the callback in a setTimeout), the intermediate transforms start buffering elements. Thus the chunktransformer can be done while there are still elements in the pipeline, and any elements left in the pipeline end up getting dropped.
We are working on a fix, but the buffering angle makes this a bit difficult:
- the current API dictates that the caller can immediately end a readStream early by calling end()
- we rely on the emitted row keys to compute the request message for a retry attempt
In one case we need to ignore the buffered data, and in the other we need to drain it. In the meantime, the only workaround is to avoid deferring the processing of the rows to the next event loop and to process the rows inline.
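To illustrate the failure mode generically (a standalone sketch with plain Node streams, not the library's actual code): a downstream Transform that consults an upstream stage's "done" flag will drop whatever is still buffered between the two stages once the consumer falls behind.

const { Readable, Transform, Writable, pipeline } = require('stream');

// Stage 1: marks itself done in flush(), analogous to the chunktransformer.
const upstream = new Transform({
  objectMode: true,
  transform(chunk, _enc, cb) { cb(null, chunk); },
  flush(cb) { this.done = true; cb(); },
});

// Stage 2: stops emitting once stage 1 reports done, analogous to toRowStream.
// Chunks still buffered between the stages are silently dropped here.
const toRowLike = new Transform({
  objectMode: true,
  transform(chunk, _enc, cb) {
    if (upstream.done) return cb();
    cb(null, chunk);
  },
});

// A consumer that defers each row to a later event-loop tick, like the
// setTimeout-based Transform in the repro above. With a synchronous cb()
// here instead, nothing accumulates and all 1000 items arrive.
let received = 0;
const slowSink = new Writable({
  objectMode: true,
  write(_chunk, _enc, cb) {
    received++;
    setTimeout(cb, 1);
  },
});

pipeline(
  Readable.from(Array.from({ length: 1000 }, (_, i) => i)),
  upstream,
  toRowLike,
  slowSink,
  () => console.log(`received ${received} of 1000`) // typically comes up short
);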
I apologize for the inconvenience and will update this ticket when we have an update
We have a potential fix in this pr: https://github.com/googleapis/nodejs-bigtable/pull/1284/files
Would it be possible for you to confirm that it fixes your use case?
Yup! Here's a before test to show the issue still exists on release 4.5.1 (expected 66,003, which were returned with a sync stream):
Finished reading 65999 rows in 89919ms
and then after pulling that PR in:
Finished reading 65990 rows in 86268ms
Doesn't look like it helped :(
I pulled that PR in by cloning the repo, running npm run compile, and replacing the build/ folder in my node_modules for the repo I'm working in. Compiled using the latest Node 18.
Thanks for the quick response. We are having trouble reproducing the issue with the fix applied. Just to double check that we are running the same code, may I ask you to re-run your test, but this time using a published version of the client?
Specifically:
- rm -rf node_modules package-lock.json
- install bigtable: npm install @google-cloud/bigtable@4.5.2-pre.0
- confirm that the only version of bigtable in package-lock.json is 4.5.2-pre.0, and please share the output of npm ls @google-cloud/bigtable
- re-run your test
Thanks for helping us work through this
@jlogsdon friendly ping
Ah, missed that last reply! I should be able to try again today or tomorrow.
I was able to do some testing and it looks fixed! I wasn't able to use the same data that I had previously, unfortunately, as it has fallen out of its retention period, but I tested a few other prefixes with anywhere from 2k to 30k rows and always got the correct number of results!
Great to hear! We will cut a release; please let us know if you encounter any other issues.