Avoid generating empty `TableChunks` in streaming scan nodes
Description
We are currently pushing an empty TableChunk into the scan_node output channel when ranks are not assigned to any data. This should not be necessary and is technically a "bug" in the current implementation. Nodes should be able to drain immediately when they have no data to process.
This PR makes changes to several node implementations to handle the "empty-channel" case.
Checklist
- [ ] I am familiar with the Contributing Guidelines.
- [ ] New or existing tests cover these changes.
- [ ] The documentation is up to date with these changes.
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.
Contributors can view more details about this message here.
/ok to test
Thanks for reviewing @bdice and @TomAugspurger !
I'm trying to think of a way to get test coverage on this. The problem is that we only produce empty channels when we are running in distributed mode and the number of ranks is larger than the number of chunks. In CI, we only have one rank anyway. I may need to come up with something bespoke that produces empty channels for testing only.
The problem is that we only produce empty channels when we are running in distributed mode and the number of ranks is larger than the number of chunks
If we do read_parquet on a parquet file with zero rows, does that that produce 0 or 1 chunks? Based on a quick local test, an empty parquet file still has one row group, and so I guess that would be at least 1 chunk.
If we do read_parquet on a parquet file with zero rows, does that that produce 0 or 1 chunks? Based on a quick local test, an empty parquet file still has one row group, and so I guess that would be at least 1 chunk.
My understanding is that we would get a single chunk with empty data.
OK, I think that leaves us with multiple ranks on a single GPU (doable, but doesn't nicely integrate with the rest of our CI setup) and somehow injecting a empty chunk / channel (like we'd get with a node not assigned any chunks) and seeing how it flows through... I don't have a good sense of which of those would work better here.
I don't have a good sense of which of those would work better here.
I tweaked the dataframescan_node to avoid sending empty chunks for a slice with 0 rows. This seems to be enough to test the empty-channel handling added in this PR (without needing distributed testing yet).
/merge