[Epic] Pipeline breaking cancellation support and improvement
Is your feature request related to a problem or challenge?
We have done the first step in https://github.com/apache/datafusion/pull/16196 for pipeline breaking cancellation support. This epic tracks the remaining sub-tasks and improvements. cc @ozankabak @alamb @pepijnve
- [x] Adding a few tests (maybe SLT?) that show YieldStreamExec being inserted. Also add logs related to the built-in YieldStream.
- [ ] Improving the documentation to make it clear that any leaf (source) that already yields just has to implement
  fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
      Some(self)
  }
  to signal the planner that no YieldStream combinator is necessary.
- [ ] Improving the InsertYieldExec rule by means of an API that exposes input and output pipelining behaviors of operators effectively
- [ ] Investigating whether any already-existing manual yielding (for example, like the one in RepartitionExec) can now be removed
- [ ] We will think about supporting cases involving non-volcano (e.g. spill) data flow.
- [ ] Fix the corner case provided in this link: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
Feel free to add more tasks, thanks!
Thank you @zhuqi-lucas -- I also added this as a wishlist item for https://github.com/apache/datafusion/issues/16235
Thank you @alamb !
At the risk of making myself unpopular, I feel it's relevant to share my findings with you guys.
Working on #16322 led me into the tokio implementation, in particular it led me to this line in the Chan implementation. This is the code that handles RecordBatch passing in RecordBatchReceiverStream.
I was immediately reminded of the cancellation discussions. Without realizing it, DataFusion is actually already using Tokio's coop mechanism. This strengthens my belief that the PR that was merged is going about things the wrong way. It introduces an API which overlaps 100% with something that already exists and is already being used. I don't think it's a good idea to have multiple mechanisms for the same thing. Pipeline-blocking operators exactly match the pattern described in the Tokio cooperative scheduling documentation, so why would you not use the solution the runtime provides, which you're already using in quite a few places (everywhere RecordBatchReceiverStream is used)?
A colleague of mine advised me to refer to PR #16301 to show what usage of tokio::coop might look like one more time for people who were not involved with the cancellation work that was already done. I was reluctant to do so myself.
That PR shows the operator changes that would be required. The custom PollBudget thing would be removed and the tokio coop budget would be used instead.
Thank you @pepijnve, do you mean we can replace YieldStream with Tokio's coop? Or also change the rule for adding yields?
I have not looked into Tokio's coop yet; maybe we can add a sub-task for it and list its benefits, such as:
- Will performance be better with Tokio's coop, backed by benchmark results?
- Can we handle more corner cases, and automatically handle user-defined execs?
- Will we have a clearer and easier API?
- Etc.
We are open to all improvements, thanks!
Tokio's cooperative budget is essentially a counter per task that can be decremented at any point in the task. When the counter hits zero you'll return Pending from the function trying to consume the budget. That's basically what YieldStream is doing but with a local counter rather than a task wide one.
DataFusion's ReceiverStreamBuilder makes use of tokio::sync::mpsc::Receiver. Whenever Receiver::recv is being called, that counter is being decremented, and you'll get a Pending result when the budget is depleted.
This is the same thing as what YieldStream is trying to do.
The benefits I see of trying to leverage the same mechanism elsewhere in DataFusion are:
- There is only one cooperative yielding mechanism at play. This is easier to reason about than multiple interacting ones.
- There is no need for additional API. DataFusion is already using this in the current released version.
- There are fewer corner cases. Once the budget is depleted, any point in the code checking the budget will yield since all those points are checking the same shared counter.
The downsides remain:
- Code that loops may still need to have yield points added to it in order to not yield unnecessarily.
- It's not yet 100% clear to me how you can use this in manually written Futures and Streams. The required bits for that seem to only be crate visible in the current Tokio release. I've raised the question here https://github.com/tokio-rs/tokio/issues/7403
- I have not made a performance analysis of this yet, but since it's used quite extensively already it's likely to be ok. Needs to be evaluated.
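To make the task-wide counter idea concrete, here is a minimal, dependency-free sketch. It is a model of the concept only, not Tokio's implementation: `TaskBudget`, `poll_proceed`, `reset`, `recv` and `drain` are hypothetical names chosen for illustration, and the "runtime" is reduced to a loop that resets the budget whenever the task reports `Pending`.

```rust
use std::cell::Cell;
use std::task::Poll;

/// Hypothetical model of a task-wide cooperative budget.
/// This is NOT Tokio's implementation, only an illustration of the idea.
struct TaskBudget {
    remaining: Cell<u32>,
    initial: u32,
}

impl TaskBudget {
    fn new(initial: u32) -> Self {
        Self { remaining: Cell::new(initial), initial }
    }

    /// Called by a resource before it makes progress.
    /// Returns Pending once the budget is depleted.
    fn poll_proceed(&self) -> Poll<()> {
        match self.remaining.get() {
            0 => Poll::Pending,
            n => {
                self.remaining.set(n - 1);
                Poll::Ready(())
            }
        }
    }

    /// Called by the stand-in runtime after the task has yielded to it.
    fn reset(&self) {
        self.remaining.set(self.initial);
    }
}

/// Channel-like resource whose data is always available.
/// It charges the shared budget on every call, like the receiver described above.
fn recv(budget: &TaskBudget, queue: &mut Vec<u64>) -> Poll<Option<u64>> {
    if budget.poll_proceed().is_pending() {
        return Poll::Pending;
    }
    Poll::Ready(queue.pop())
}

/// Drains the queue the way a pipeline-breaking consumer would.
/// Returns (sum of all messages, number of times the task had to yield).
fn drain(budget: &TaskBudget, mut queue: Vec<u64>) -> (u64, u32) {
    let (mut sum, mut yields) = (0, 0);
    loop {
        match recv(budget, &mut queue) {
            Poll::Ready(Some(v)) => sum += v,
            Poll::Ready(None) => return (sum, yields),
            Poll::Pending => {
                // In a real runtime the task would return Pending here and be
                // rescheduled; the budget is only reset at that point.
                yields += 1;
                budget.reset();
            }
        }
    }
}
```

With a budget of 128 per scheduling tick (an assumption mirroring Tokio's default as far as I know), `drain(&TaskBudget::new(128), vec![1; 300])` is forced to yield twice even though the data is always ready, which is exactly the behavior a cancellable pipeline-breaking loop needs.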
You’re right. In DataFusion, only operators that fan out work into multiple spawned tasks and then re-aggregate via a Tokio MPSC channel actually consume the cooperative budget automatically (because each Receiver::recv().await call decrements it). Examples include:
- CoalescePartitionsExec
- SortPreservingMergeExec
All of those use RecordBatchReceiverStreamBuilder::run_input, whose .next().await is really rx.recv().await under the hood—and that is what charges the Tokio coop budget.
But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel. They execute pull-based within a single Stream implementation, and never call recv(), so they don’t automatically consume any cooperative budget.
That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.
I believe there is no major difference? Please correct me if I am wrong.
But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel. That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.
You're indeed 100% dependent on your child streams which is what makes the current solution somewhat brittle. If that happens to use a Receiver (or some other implementation that consumes budget) it will work. If it's some other stream that does not you may have issues again. Because the sources are user definable, I think it's wise to take a defensive stance in the implementation of operators and assume you don't know what they will or will not do.
The current implementation attempts to fix this by ensuring the sources have yield points. That breaks when streams are swapped dynamically because you no longer have a way to ensure they contain the necessary yield points. This is a point the DataFusion library cannot currently intercept.
The current implementation with the non-task-wide budget also breaks when an intermediate operator uses select! (or something similar where you read from whatever stream happens to be ready) since this can obscure the Pending result from a stream. There's no way to guarantee that Pending bubbles all the way up.
I believe there is no major difference? Please correct me if I am wrong.
The point of contention was where you put these yield points. Do you instrument all leaf nodes, or do you instrument consumers that may refuse to yield? To make the system robust I really think you need to do this in the consumers. It's also beneficial for locality of reasoning. You can look at the implementation of an operator and assess that it's correct from a cooperative scheduling point of view without having to look at any other code. The objection was that there are many, many operators out there in the wild downstream of DataFusion. That's one that I do not have an answer for. How many people are building custom pipeline blocking operators?
It's important to note that you would only need to take action in operators where you can see from the implementation that it may not return any value, either Ready or Pending, relatively quickly. That's basically anything that loops over input streams an unbounded number of times.
- Project (or any other simple transformation operator) doesn't need to do anything since it takes one record batch in and immediately emits another one.
- Table scans shouldn't either. They'll yield naturally if their input is not ready, and otherwise they'll return a RecordBatch.
- Filter in theory should not do anything, the exception being dropping lots of batches entirely.
- Joins depend. A build/probe style implementation probably should consume during build, not during probe. But it depends on the implementation.
- Aggregation and sorting do need to consume since those can block for an extended period of time.
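As an illustration of putting the yield point in the consumer, here is a small std-only sketch of an aggregation-like operator that loops over an always-ready input. It uses a per-operator counter rather than the Tokio budget, and every name in it (`SumOperator`, `poll_sum`, `yield_every`, `WakeCounter`, `drive`) is hypothetical; `drive` is only a stand-in for a runtime.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts wake-ups so the driver below can observe them.
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical aggregation-like operator: it has to see all of its input
/// before it can emit a result, so its poll function contains a loop.
struct SumOperator {
    input: std::vec::IntoIter<u64>,
    sum: u64,
    /// Maximum number of input items consumed per poll (per-operator counter).
    yield_every: usize,
}

impl SumOperator {
    fn poll_sum(&mut self, cx: &mut Context<'_>) -> Poll<u64> {
        let mut remaining = self.yield_every;
        loop {
            if remaining == 0 {
                // Ask to be polled again, then hand control back so the
                // runtime can run other tasks or drop this one.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            match self.input.next() {
                Some(v) => {
                    self.sum += v;
                    remaining -= 1;
                }
                None => return Poll::Ready(self.sum),
            }
        }
    }
}

/// Stand-in for a runtime: polls until the operator is done.
/// Returns (sum, number of Pending results, number of wake-ups).
fn drive(mut op: SumOperator) -> (u64, usize, usize) {
    let counter = Arc::new(WakeCounter(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);
    let mut yields = 0;
    loop {
        match op.poll_sum(&mut cx) {
            Poll::Ready(sum) => return (sum, yields, counter.0.load(Ordering::SeqCst)),
            Poll::Pending => yields += 1,
        }
    }
}
```

The check sits inside the loop of the operator itself, so its correctness can be judged locally, without knowing anything about the input stream. A simple transformation such as a projection has no such loop and would need no check.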
Thank you, I think I get your point. I was thinking of optimizing the rule; is that a similar idea?
// traverse all nodes, not just leaves
plan.transform_down(|plan| {
    // wrap if leaf OR long-running
    if plan.children().is_empty() || is_long_running(plan.as_ref()) {
        // use existing cooperative variant if available
        let wrapped = plan
            .clone()
            .with_cooperative_yields()
            .unwrap_or_else(|| Arc::new(YieldStreamExec::new(Arc::clone(&plan), yield_period)));
        Ok(Transformed::new(wrapped, true, TreeNodeRecursion::Jump))
    } else {
        Ok(Transformed::no(plan))
    }
})
.map(|t| t.data)
- Leaf-only wrapping can be bypassed if someone plugs in a custom Stream or uses select!‑style combinators.
- By also wrapping every consumer that does heavy looping—aggregations, sorts, joins, window funcs—you guarantee that no matter how the streams are composed, there’s always an explicit YieldStreamExec (or the built‑in cooperative variant) in the path. (This can be optimized to PollBudget if possible)
- We still avoid unnecessary overhead on “simple” operators like Projection or basic TableScan, because they’re neither leaves with no loops nor in your “long‑running” list.
Thanks!
At the risk of making myself unpopular, I feel it's relevant to share my findings with you guys.
Not at all --- this is great stuff -- thank you @pepijnve for continuing to push to get better.
In my mind, given the tests that @zhuqi-lucas added in https://github.com/apache/datafusion/pull/16196, we are now in a great position to revisit the design. If we can come up with something simpler that still achieves the same aim that is a great outcome
The benefits I see of trying to leverage the same mechanism elsewhere in DataFusion are:
- There is only one cooperative yielding mechanism at play. This is easier to reason about than multiple interacting ones.
- There is no need for additional API. DataFusion is already using this in the current released version.
- There are fewer corner cases. Once the budget is depleted, any point in the code checking the budget will yield since all those points are checking the same shared counter.
A single framework that is general purpose and uses a mechanism in tokio certainly sounds compelling to me as well.
I have somewhat lost track of what exactly you are proposing. Is it the approach in one of these PRs:
- https://github.com/apache/datafusion/pull/16301
- https://github.com/apache/datafusion/pull/16319
Or is it something new based on some other research?
What I think is important is that the solution
- is well documented and clear
- works with both built in operators as well as user defined ones
- is minimally invasive (e.g. people implementing operators don't have to know too much about streams / polling, etc)
@zhuqi-lucas it would be useful to have some additional voices in this discussion. I can share my opinion, but it's only one opinion. I feel like I'm just going to keep repeating the same critique over and over again otherwise.
In short, I'm not convinced this is a physical plan optimization problem. It was a good idea, but I don't think it can be refined into something sufficiently precise. By trying to make a generic/universal solution you end up with something unnecessarily complicated.
I see this as a Stream implementation problem. The place to fix this is in the Rust code of the stream implementations because the decision to consume budget or not is tightly coupled to the implementation logic. You're trying to count how many times a Future or Stream was able to make progress towards its goal. A YieldStream is always going to be a crude approximation of that. Implementations may opt to use YieldStream as a convenience of course.
You could try to describe the need for a YieldStream around a child declaratively, but the ability to switch streams dynamically is at odds with a one-shot approach like an optimizer rule. Additionally, but this is arguably just aesthetic, I don't think you ever want to see YieldExec showing up in explain plans.
I see this as a Stream implementation problem.
I see the wisdom of this view.
In my mind I think the DataFusion philosophy is "keep the barrier to entry as low as possible, and provide APIs that people can use to customize / optimize when needed / necessary"
Do you think the current implementation will work "most of the time" and can be customized when needed for advanced use cases? Or is there something we can't do with the current APIs?
From my perspective, as long as the overhead required of people implementing custom operators is low but they can customize when necessary it is good
@alamb our messages crossed paths in the ether.
I have somewhat lost track of what exactly you are proposing.
Sorry, I usually do this around a white board. First and foremost I'm trying to communicate what I see as intrinsic problems with the external optimizer rule approach. This was countered with a belief that it can be solved, but I don't see it yet myself. Happy to be proven wrong of course. I've explained the problem to 3 or 4 colleagues in the meantime as unbiased as I possibly could and they basically all tell me the same.
Is it the approach in one of these PRs: https://github.com/apache/datafusion/pull/16301 https://github.com/apache/datafusion/pull/16319
A bit of both :)
The rationale behind #16319 is twofold.
- By using a JoinHandle you naturally solve the cancellation problem. A JoinHandle is pending once, and wakes when the result is ready. The caller is unblocked immediately and can select! on a timeout. The subtle effect of it is that you shift from a cancellation problem for the caller to an abort problem for the spawned task. Aborting an AbortHandle still requires cooperation from the task in order for it to actually stop.
- My hunch was that by splitting deep call stacks into multiple chained shallow call stacks the cost of yielding to the runtime could be reduced. That was just a first principles reasoning thing that I wanted to try out. Still working on measuring if it has a performance benefit.
#16301 is the variant of yielding where we would punt on the idea of implementing yielding via an optimizer rule or some other automatic mechanism. Instead you state that every Stream is responsible for its own behavior (which the guidelines kind of already do). And then we fix the base library implementations.
I think this fixes cancellation for everyone except downstream users with custom pipeline breaking operators. For those people I would propose providing utilities in the library to make doing the right thing as low-effort as possible and provide best practice examples. Something akin to "If your code has looping patterns call 'consume_budget' every so often" or "wrap your input stream in a YieldStream".
#16301 does not make use of Tokio's coop budget yet however. Instead it's the per-operator counter variant. The diff per operator does illustrate the scope and nature of the change Stream implementers would need to make.
Sorry, I usually do this around a white board.
Yeah I agree doing this kind of thing with a whiteboard is easier. I am happy to set up a video call with the relevant parties if that might be more efficient (and it would be nice to meet you all "face to face")
First and foremost I'm trying to communicate what I see as intrinsic problems with the external optimizer rule approach. This was countered with a belief that it can be solved, but I don't see it yet myself.
Indeed -- it is my perspective that
- https://github.com/apache/datafusion/pull/16196 handles most, but not all, cases where a Stream implementation may not yield directly.
- For some of the more advanced cases you describe (like switching a stream mid-execution) the general optimizer approach will not permit cancelling and thus the individual stream implementations need to be made yield aware.
"wrap your input stream in a YieldStream". #16301 does not make use of Tokio's coop budget yet however. Instead it's the per-operator counter variant. The diff per operator does illustrate the scope and nature of the change Stream implementers would need to make.
This makes sense (and I am sure we could encapsulate the changes from 16301 more too if we go with that approach)
I wonder if your main concern is that the optimizer approach we merged does not handle all of the cases, so it potentially leaves a trap for future Stream implementors (who write more advanced streams)?
Some more information to share and time to eat some humble pie for me. Google led me to a withoutboats post which led me to a Tokio blog post which led to an aha moment. It’s worth reading the entire post but the key quote was
As long as the task has budget remaining, the resource operates as it did previously. Each asynchronous operation (actions that users must .await on) decrements the task's budget. Once the task is out of budget, all Tokio resources will perpetually return "not ready" until the task yields back to the scheduler. At that point, the budget is reset, and future .awaits on Tokio resources will again function normally.
So this is the “consume at the source” idea we have now, but with a task-wide latch per tick rather than per resource. And the latch only resets when you actually yield to the runtime. This removes all the edge cases as long as we ensure all sources are using the same task budget.
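The difference between per-resource counters and a task-wide latch can be shown with a small std-only model. This is a sketch under assumptions: `LocalSource` stands in for a wrapper with its own counter that resets as soon as it has reported `Pending` once, `SharedSource` stands in for a resource charging one shared budget, and `items_before_pending` imitates a select!-style consumer that falls back to the other input when one is not ready. All of these names are hypothetical.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

/// Always-ready source with its own counter. The counter resets as soon as
/// the source has reported Pending once (assumed per-stream behavior).
struct LocalSource {
    period: u32,
    since_yield: Cell<u32>,
}

impl LocalSource {
    fn new(period: u32) -> Self {
        Self { period, since_yield: Cell::new(0) }
    }

    fn poll_next(&self) -> Poll<u64> {
        if self.since_yield.get() >= self.period {
            self.since_yield.set(0);
            return Poll::Pending;
        }
        self.since_yield.set(self.since_yield.get() + 1);
        Poll::Ready(1)
    }
}

/// Always-ready source that charges a budget shared by the whole task.
/// The budget is only reset by the runtime, never by the source itself.
struct SharedSource {
    budget: Rc<Cell<u32>>,
}

impl SharedSource {
    fn poll_next(&self) -> Poll<u64> {
        match self.budget.get() {
            0 => Poll::Pending,
            left => {
                self.budget.set(left - 1);
                Poll::Ready(1)
            }
        }
    }
}

/// select!-style consumer: takes from `a`, falls back to `b` when `a` is not
/// ready, and only stops once both are Pending. Returns the number of items
/// processed before the consumer itself would return Pending.
fn items_before_pending(
    a: impl Fn() -> Poll<u64>,
    b: impl Fn() -> Poll<u64>,
    limit: usize,
) -> usize {
    let mut items = 0;
    while items < limit {
        match a() {
            Poll::Ready(_) => items += 1,
            Poll::Pending => match b() {
                Poll::Ready(_) => items += 1,
                Poll::Pending => break,
            },
        }
    }
    items
}

/// Returns (items with local counters, items with a shared budget).
fn compare(period: u32) -> (usize, usize) {
    let (a, b) = (LocalSource::new(period), LocalSource::new(period));
    let local = items_before_pending(|| a.poll_next(), || b.poll_next(), 1_000);

    let budget = Rc::new(Cell::new(period));
    let (c, d) = (SharedSource { budget: budget.clone() }, SharedSource { budget });
    let shared = items_before_pending(|| c.poll_next(), || d.poll_next(), 1_000);
    (local, shared)
}
```

In this model `compare(2)` returns `(8, 2)`: with local counters the consumer keeps going well past the intended limit because one source's `Pending` is masked by the other, while with the shared budget it stops after exactly two items because nothing can make progress until the task yields.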
As we’ve discussed above the channel receiver is already doing that for us. For some reason file IO was not. I’m not sure I understand why that’s the case and will try to figure out why tomorrow. Perhaps we can have the “it works automagically” cake and eat it after all.
Figured it out. File access is done via object store and object store uses std::fs::File, not tokio::fs::File. Even if it would, from browsing the code it doesn't look to me like Tokio's file stuff consumes budget.
I tried reworking YieldStream to use the Tokio budget, but I don't see a way to. I rephrased the issue I opened at https://github.com/tokio-rs/tokio/issues/7403.
Interesting finding. I checked the spill file code, and it is true that we are using std::fs::File. So until there is a solution based on the Tokio budget, we should wrap SpillReaderStream in a YieldStream.
I will hold off on this sub-task in case a Tokio budget based solution can replace YieldStream.
We will think about supporting cases involving non-volcano (e.g. spill) data flow.
use std::fs::File;

impl SpillReaderStream {
    fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
        Self {
            schema,
            state: SpillReaderStreamState::Uninitialized(spill_file),
        }
    }

    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        match &mut self.state {
            SpillReaderStreamState::Uninitialized(_) => {
                // Temporarily replace with `Done` to be able to pass the file to the task.
                let SpillReaderStreamState::Uninitialized(spill_file) =
                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                else {
                    unreachable!()
                };
                let task = SpawnedTask::spawn_blocking(move || {
                    let file = BufReader::new(File::open(spill_file.path())?);
                    // SAFETY: DataFusion's spill writer strictly follows Arrow IPC specifications
                    // with validated schemas and buffers. Skip redundant validation during read
                    // to speedup read operation. This is safe for DataFusion as input guaranteed to be correct when written.
                    let mut reader = unsafe {
                        StreamReader::try_new(file, None)?.with_skip_validation(true)
                    };
                    let next_batch = reader.next().transpose()?;
                    Ok((reader, next_batch))
                });
                self.state = SpillReaderStreamState::ReadInProgress(task);
                // Poll again immediately so the inner task is polled and the waker is
                // registered.
                self.poll_next_inner(cx)
            }
            SpillReaderStreamState::ReadInProgress(task) => {
                let result = futures::ready!(task.poll_unpin(cx))
                    .unwrap_or_else(|err| Err(DataFusionError::External(Box::new(err))));
                match result {
                    Ok((reader, batch)) => {
                        match batch {
                            Some(batch) => {
                                self.state = SpillReaderStreamState::Waiting(reader);
                                Poll::Ready(Some(Ok(batch)))
                            }
                            None => {
                                // Stream is done
                                self.state = SpillReaderStreamState::Done;
                                Poll::Ready(None)
                            }
                        }
                    }
                    Err(err) => {
                        self.state = SpillReaderStreamState::Done;
                        Poll::Ready(Some(Err(err)))
                    }
                }
            }
            SpillReaderStreamState::Waiting(_) => {
                // Temporarily replace with `Done` to be able to pass the file to the task.
                let SpillReaderStreamState::Waiting(mut reader) =
                    std::mem::replace(&mut self.state, SpillReaderStreamState::Done)
                else {
                    unreachable!()
                };
                let task = SpawnedTask::spawn_blocking(move || {
                    let next_batch = reader.next().transpose()?;
                    Ok((reader, next_batch))
                });
                self.state = SpillReaderStreamState::ReadInProgress(task);
                // Poll again immediately so the inner task is polled and the waker is
                // registered.
                self.poll_next_inner(cx)
            }
            SpillReaderStreamState::Done => Poll::Ready(None),
        }
    }
}
@zhuqi-lucas with all the various sprawling discussion threads I think we may have gotten to a point where it's no longer easy to have an overview of what the problems are and what the final goal may look like. To make matters worse there are multiple options. I was thinking it might be useful to try and put together some kind of design document / overview that clearly describes the cancellation/abort problem, its root cause(s), the existing mitigations and possible future mitigations. Having a bit of a mea culpa moment, so I would like to take a shot at making a first draft. Is that ok for you? Any preference wrt tooling? I usually write my technical docs in asciidoc+plantuml for ease of version control and diffing, but willing to use whatever people prefer.
Thank you @pepijnve, I am OK with it; a design doc will make things clear.
Either asciidoc+plantuml or plain Markdown is fine.
Made some progress on the problem statement already. I gave the AI the facts, it turned it into something I would actually enjoy reading. I'm going to work on the way things work today next. Feedback already welcome. https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md
This is a good start, thanks. Maybe we can also add a description of pipeline mode and pipeline breaking.
pipeline mode and pipeline breaking
I'm starting to realize we might have been placing too much emphasis on this aspect. I've been doing my homework by reading the Volcano paper. I had never read that paper in depth (never had a need to), I just knew that people used the term to kind of refer to 'the iterator approach'. The more I read the more I can see DataFusion is basically a modern day Volcano.
One thing DataFusion does not have explicitly, as far as I know, is an exchange operator. I say not explicitly, because the essential demand/data-driven switch part of exchange is present in a couple of operators like Repartition and Coalesce. Perhaps these dataflow change points are a better way of looking at the problem.
I really miss having a white board, but here's an approximation :smiling: This is from the Volcano paper (with annotations by me of course).
Each of the colored blocks is an independently executing sub portion of the query. Translated to Tokio each of these colored blocks is a separate concurrent task. Each of those tasks needs to be cooperatively scheduled to guarantee all of them get a fair share of time to run.
As we've concluded earlier the output side of the exchange-like operators is already handling this for us implicitly because they consume tokio task budget. The table sources (Scan in the image) do not.
Perhaps this reframing of the problem is the path to a general purpose solution. To verify correct scheduling behavior, you can first subdivide the plan into subplans using the exchange-like operators as cut points. Per subplan you can then look at all the leaf nodes. Each leaf node that 'inserts' work into the task needs to consume from the same task-wide Tokio budget, not a per operator budget as we're doing today.
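The verification procedure described here can be sketched on a toy plan tree. This is not DataFusion's `ExecutionPlan` API: `Node`, `Subplan`, `split`, `subplans_needing_yield_points`, `example_plan` and the `cooperative` flag are all hypothetical, and the sketch assumes that the output side of an exchange-like operator consumes the task budget for the subplan that reads from it.

```rust
/// Toy plan tree; every name here is hypothetical.
enum Node {
    /// Leaf that inserts work into the task. `cooperative` says whether it
    /// consumes the task-wide budget.
    Scan { name: &'static str, cooperative: bool },
    /// Ordinary operator that runs in the same task as its children.
    Op { name: &'static str, children: Vec<Node> },
    /// Exchange-like operator: each child runs as a separate task.
    Exchange { name: &'static str, children: Vec<Node> },
}

impl Node {
    fn name(&self) -> &'static str {
        match self {
            Node::Scan { name, .. } | Node::Op { name, .. } | Node::Exchange { name, .. } => *name,
        }
    }
}

/// One independently scheduled part of the plan and its leaves.
struct Subplan {
    root: &'static str,
    leaves: Vec<(&'static str, bool)>,
}

/// Walks the tree, cutting it into subplans at exchange-like operators.
fn split(node: &Node, current: &mut Subplan, done: &mut Vec<Subplan>) {
    match node {
        Node::Scan { name, cooperative } => current.leaves.push((*name, *cooperative)),
        Node::Op { children, .. } => {
            for child in children {
                split(child, current, done);
            }
        }
        Node::Exchange { name, children } => {
            // For the subplan above, the exchange acts as a leaf whose output
            // side is assumed to consume the task budget.
            current.leaves.push((*name, true));
            // Each input of the exchange starts a new subplan (a new task).
            for child in children {
                let mut sub = Subplan { root: child.name(), leaves: Vec::new() };
                split(child, &mut sub, done);
                done.push(sub);
            }
        }
    }
}

/// Returns the roots of all subplans that have a leaf not consuming budget.
fn subplans_needing_yield_points(root: &Node) -> Vec<&'static str> {
    let mut done = Vec::new();
    let mut top = Subplan { root: root.name(), leaves: Vec::new() };
    split(root, &mut top, &mut done);
    done.push(top);
    done.iter()
        .filter(|s| s.leaves.iter().any(|&(_, cooperative)| !cooperative))
        .map(|s| s.root)
        .collect()
}

/// final_agg -> coalesce -> [partial_agg_1 -> file_scan, partial_agg_2 -> channel_scan]
fn example_plan() -> Node {
    Node::Op {
        name: "final_agg",
        children: vec![Node::Exchange {
            name: "coalesce",
            children: vec![
                Node::Op {
                    name: "partial_agg_1",
                    children: vec![Node::Scan { name: "file_scan", cooperative: false }],
                },
                Node::Op {
                    name: "partial_agg_2",
                    children: vec![Node::Scan { name: "channel_scan", cooperative: true }],
                },
            ],
        }],
    }
}
```

For `example_plan()` only the subplan rooted at `partial_agg_1` is reported, because its file scan does not consume budget. The top subplan is fine since its only leaf is the exchange, and `partial_agg_2` is fine since its scan is cooperative.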
So what does all this mean in terms of implementation:
- Replace the per operator counters with consuming the Tokio task budget. DataFusion is already doing this today so there's precedent for it, and it resolves a bunch of side effects. I've opened a PR in tokio to allow us to use the necessary API for this https://github.com/tokio-rs/tokio/pull/7405. I think we can approximate poll_proceed with a combination of 'has budget' and 'consume budget' in the meantime.
- Remove the configuration option
- Consider renaming YieldStream to CooperativeStream.
- I think I would prefer a declarative property on ExecutionPlan that communicates if an operator consumes the task budget (not sure what the best description of this would be) instead of with_cooperative_yielding. It's not really something you want to opt-in to after all and the exchange-like operators have no way of opting out.
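A rough sketch of what "approximate poll_proceed with 'has budget' and 'consume budget'" could look like. Everything here is an assumption for illustration: the thread-local `BUDGET` is a stand-in for Tokio's task budget (this is not how Tokio stores it), and `has_budget`, `consume_budget`, `reset_budget` and `CooperativeSource` are made-up names. Waker handling is left out on purpose; a real stream would have to wake the task before returning `Pending` so that it gets polled again.

```rust
use std::cell::Cell;
use std::task::Poll;

thread_local! {
    // Stand-in for the task-wide budget (hypothetical; not Tokio's actual storage).
    static BUDGET: Cell<u8> = Cell::new(128);
}

fn has_budget() -> bool {
    BUDGET.with(|b| b.get() > 0)
}

fn consume_budget() {
    BUDGET.with(|b| b.set(b.get().saturating_sub(1)));
}

/// What the scheduler would do each time it lets the task run again.
fn reset_budget(units: u8) {
    BUDGET.with(|b| b.set(units));
}

/// Approximation of `poll_proceed`: proceed and pay one unit, or ask to yield.
fn poll_proceed() -> Poll<()> {
    if has_budget() {
        consume_budget();
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}

/// A leaf that only produces an item when the shared budget allows it.
struct CooperativeSource<I> {
    inner: I,
}

impl<I: Iterator> CooperativeSource<I> {
    fn poll_next(&mut self) -> Poll<Option<I::Item>> {
        match poll_proceed() {
            Poll::Ready(()) => Poll::Ready(self.inner.next()),
            Poll::Pending => Poll::Pending,
        }
    }
}
```

Because the budget lives outside the source, two such sources in the same task deplete the same counter, which is the property the per operator counters lack.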
The one thing that we still cannot solve automatically then is dynamic query planning. Operators that create streams dynamically still have to make sure they set things up correctly themselves.
One possible downside to this approach is that the cooperative scheduling budget is implementation specific to the Tokio runtime. DataFusion becomes more tied to Tokio rather than less. Not sure if that's an issue or not.
@alamb @ozankabak wdyt? Maybe this is what you were going for all along and I'm just slowly catching up :smiling:
The change of heart comes from the realization that Tokio itself also takes a 'consume at the leaves' strategy and having a task wide budget ensures that tasks cannot silently ignore the yield request. Once one resource depletes the budget, it's no longer possible to make progress anywhere else provided all resources participate in the budgeting system.
As we’ve discussed above the channel receiver is already doing that for us. For some reason file IO was not. I’m not sure I understand why that’s the case and will try to figure out why tomorrow.
This is consistent with our observations at InfluxData: we saw uncancellable queries when feeding our plan from an in memory cache (not a file / memory)
https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md
This is a really nice writeup: it matches my understanding / mental model. It would also make the start of a great blog post for the DataFusion blog FWIW and I filed a ticket to track that idea 🎣 :
- https://github.com/apache/datafusion/issues/16396
The more I read the more I can see DataFusion is basically a modern day Volcano.
I think this is an accurate assessment, though I would probably phrase it as "DataFusion uses Volcano-style parallelism where operators are single threaded and Exchange (RepartitionExec) operators handle parallelism". The other prevalent style is called "Morsel Driven Parallelism" popularized by DuckDB and TUM/Umbra in this paper which uses operators that are explicitly multi-threaded.
Each of the colored blocks is an independently executing sub portion of the query. Translated to Tokio each of these colored blocks is a separate concurrent task. Each of those tasks needs to be cooperatively scheduled to guarantee all of them get a fair share of time to run.
This is true in theory -- but I think we also take pains to try and avoid "over scheduling" tasks in tokio -- for example, we purposely only have N input partitions (and hence N streams) per scan, even if there are 100+ files -- the goal is to keep all the cores busy, but not oversubscribed.
So what does all this mean in terms of implementation:
This also sounds fine to me, and would be happy to review PRs, etc. However it is not 100% clear if your proposed design
- fixes any bugs / adds features over the current one,
- is "just" a cleaner way to implement the same thing (this is also a fine thing to contribute).
For example, I wonder if there are additional tests / cases that would be improved with the proposed implementation 🤔
The one thing that we still cannot solve automatically then is dynamic query planning. Operators that create streams dynamically still have to make sure they set things up correctly themselves.
In my opinion this is fine -- if operators are making dynamic streams, that is an advanced use case that today must still handle canceling / yielding. I think it is ok if we can't find a way to automatically provide yielding behavior to them (they are no worse off than today)
One possible downside to this approach is that the cooperative scheduling budget is implementation specific to the Tokio runtime. DataFusion becomes more tied to Tokio rather than less. Not sure if that's an issue or not.
I personally don't think this is an issue as I don't see any movement and have not heard any desire to move away from tokio.
fixes any bugs / adds features over the current one, Is "just" cleaner way to implement the same thing (this is also a fine thing to contribute as well).
There are a couple of benefits.
It removes the edge case seen in the interleave operator (or any select! style code in general). With the current per stream counter, one stream might want to yield, but the parent stream may decide to poll another stream in response which happens to be ready. The end result is that two cooperating streams may turn into a non-cooperating stream when they are merged. To fix this, you would need to adjust the merging operator as well and we're basically back where we started.
If all cooperating streams use the same budget, then this problem goes away. Once the yield point has been hit, all cooperating streams will yield.
Using the task budget also avoids the 'redundant yield' problem in the current version. If you now do a simple SELECT * FROM ... query, by default you'll get a Pending after every 64 Ready(RecordBatch). With the task budget, a Pending is only injected when it's actually necessary. The system automatically does the right thing.
Finally it aligns the cooperative yielding strategy across the library. RecordBatchReceiverStream is implicitly already using this strategy in a way you cannot opt out of. It's better to have one consistent way of solving this cancellation problem once and for all.
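The interleave edge case can be reproduced in a small simulation. This is only a sketch under simplifying assumptions: `Source`, `CountingSource`, `BudgetSource` and `poll_merged` are made-up names, the sources never run dry, the merge polls its children in a fixed order, and the closure passed to `count_yields` plays the role of the scheduler refilling the budget after a yield.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

trait Source {
    fn poll_next(&mut self) -> Poll<u32>;
}

/// Per-stream counter: returns Pending after every `limit` Ready items.
struct CountingSource {
    limit: u32,
    seen: u32,
}

impl Source for CountingSource {
    fn poll_next(&mut self) -> Poll<u32> {
        if self.seen == self.limit {
            self.seen = 0;
            return Poll::Pending;
        }
        self.seen += 1;
        Poll::Ready(1)
    }
}

/// Shared budget: every source in the task draws from the same counter.
struct BudgetSource {
    budget: Rc<Cell<u32>>,
}

impl Source for BudgetSource {
    fn poll_next(&mut self) -> Poll<u32> {
        let left = self.budget.get();
        if left == 0 {
            return Poll::Pending;
        }
        self.budget.set(left - 1);
        Poll::Ready(1)
    }
}

/// select!-style merge: returns the first ready child, Pending only if all are pending.
fn poll_merged(children: &mut [Box<dyn Source>]) -> Poll<u32> {
    for child in children.iter_mut() {
        if let Poll::Ready(v) = child.poll_next() {
            return Poll::Ready(v);
        }
    }
    Poll::Pending
}

/// Polls the merged stream `polls` times and counts how often it actually yields.
fn count_yields(children: &mut [Box<dyn Source>], polls: u32, mut on_yield: impl FnMut()) -> u32 {
    let mut yields = 0;
    for _ in 0..polls {
        if poll_merged(children).is_pending() {
            yields += 1;
            on_yield();
        }
    }
    yields
}

fn per_stream_yields(polls: u32) -> u32 {
    let mut children: Vec<Box<dyn Source>> = vec![
        Box::new(CountingSource { limit: 64, seen: 0 }),
        Box::new(CountingSource { limit: 64, seen: 0 }),
    ];
    count_yields(&mut children, polls, || {})
}

fn shared_budget_yields(polls: u32) -> u32 {
    let budget = Rc::new(Cell::new(128));
    let mut children: Vec<Box<dyn Source>> = vec![
        Box::new(BudgetSource { budget: budget.clone() }),
        Box::new(BudgetSource { budget: budget.clone() }),
    ];
    let refill = budget.clone();
    // The closure models the scheduler handing out a fresh budget after a yield.
    count_yields(&mut children, polls, move || refill.set(128))
}
```

In this model, `per_stream_yields(1000)` is 0: whenever the first child wants to yield, the merge simply takes a batch from the second one. `shared_budget_yields(1000)` is 7, one yield after every 128 batches. The per-stream variant would eventually yield as well, but only on the rare poll where both counters run out at the same time.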
I have a patch almost ready. I'll make a draft PR already so this all becomes a bit more tangible.
fixes any bugs / adds features over the current one, Is "just" cleaner way to implement the same thing (this is also a fine thing to contribute as well).
There are a couple of benefits.
It removes the edge case seen in the interleave operator (or any select! style code in general). With the current per stream counter, one stream might want to yield, but the parent stream may decide to poll another stream in response which happens to be ready. The end result is that two cooperating streams may turn into a non-cooperating stream when they are merged. To fix this, you would need to adjust the merging operator as well and we're basically back where we started. If all cooperating streams use the same budget, then this problem goes away. Once the yield point has been hit, all cooperating streams will yield.
So it means this sub-task corner case can be resolved?
Fix the corner case provided in this link: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
Using the task budget also avoids the 'redundant yield' problem in the current version. If you now do a simple SELECT * FROM ... query, by default you'll get a Pending after every 64 Ready(RecordBatch). With the task budget you will only actually inject the Pending when it's actually necessary. The system automatically does the right thing.
I am curious what the budget count is, since we can't configure it from DataFusion. Will it affect performance or other things? It seems not, because we already use RecordBatchReceiverStream for the budget?
Another question:
If all leaf nodes have to share one budget, could a leaf node that consumes the budget very aggressively affect overall fairness or performance?
So it means this sub-task corner case can be resolved?
Yes, that's correct.
I am curious what the budget count is, since we can't configure it from DataFusion. Will it affect performance or other things? It seems not, because we already use RecordBatchReceiverStream for the budget?
@zhuqi-lucas that's correct, you can't configure it at the moment. That's the case for RecordBatchReceiverStream today as well indeed. Tokio hardcodes the magic number 128 (see https://github.com/tokio-rs/tokio/blob/master/tokio/src/task/coop/mod.rs#L116).
If all leaf nodes have to share one budget, could a leaf node that consumes the budget very aggressively affect overall fairness or performance?
The budget is per spawned task. Every time the tokio scheduler lets a task run it gives it a budget of 128 which the task can then deplete until it hits zero. Then the task is coaxed towards yielding by making all budget aware Tokio resources return Pending.
From the perspective of DataFusion code I don't think this really changes all that much. It's the exact same behavior you have today already when the source streams are RecordBatchReceiverStream. So the moment you have a repartition/coalesce you're getting exactly this with the current code.
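As a back-of-the-envelope model of the per-task budget: the sketch below assumes that every batch consumes exactly one unit of budget and that the scheduler hands out a fresh budget each time it polls the task. `run_task` is a made-up helper, not a Tokio or DataFusion function.

```rust
/// Returns (times the scheduler polled the task, times the task yielded).
/// Assumes each batch costs one unit of budget (an assumption of this sketch).
fn run_task(total_batches: u32, budget_per_poll: u32) -> (u32, u32) {
    assert!(budget_per_poll > 0, "a zero budget would never make progress");
    let (mut remaining, mut polls, mut yields) = (total_batches, 0, 0);
    loop {
        polls += 1;
        // The scheduler gives the task a fresh budget every time it runs.
        let mut budget = budget_per_poll;
        while remaining > 0 && budget > 0 {
            remaining -= 1;
            budget -= 1;
        }
        if remaining == 0 {
            return (polls, yields);
        }
        // Budget exhausted with work left: the task returns Pending.
        yields += 1;
    }
}
```

With the budget of 128 mentioned above, a task that has 300 batches to push through is polled three times and yields twice (after batch 128 and after batch 256). A task with 128 batches or fewer finishes in a single poll without yielding.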
Got it, it makes sense to me @pepijnve! Thanks!
This is true in theory -- but I think we also take pains to try and avoid "over scheduling" tasks in tokio -- for example, we purposely only have N input partitions (and hence N streams) per scan, even if there are 100+ files -- the goal is to keep all the cores busy, but not oversubscribed.
What I was trying to say is that from a scheduling/yielding pov you can reason about each box in isolation. Whether you actually try to make 100s of concurrent (not parallel) tasks or not is a rabbit hole for another thread 😄
@alamb @ozankabak wdyt? Maybe this is what you were going for all along and I'm just slowly catching up :smiling:
The change of heart comes from the realization that Tokio itself also takes a 'consume at the leaves' strategy and having a task wide budget ensures that tasks cannot silently ignore the yield request. Once one resource depletes the budget, it's no longer possible to make progress anywhere else provided all resources participate in the budgeting system.
A draft PR would be good to have, I think I can make better comments then. However, solving this at the stream level in a way transparent to the operator builder would be great in general. That was my original intention, but we weren't able to materialize that solution in a reasonable amount of time. Hence the current solution, which is basically a fallback that has some characteristics of the ideal solution (e.g. "transparency", focusing on leaves etc.), but requires support from the planner via to-be-designed APIs. The current approach can evolve into a decent one with such APIs, but it would always be worse than a proper lower-level solution. It would be good if we can build that, but in the meantime, I am glad that we have some solution that works for many cases.
If it turns out that we can arrive at a proper stream-based solution quickly, we can retire this one quickly. Otherwise, we can incrementally improve what we have today as alternatives go through design/experimentation etc.
@ozankabak draft PR is in good enough shape to review I think. The general idea is still the same as what was there before.
The last thing I'm still working on is the ExecutionPlan API where I would like to replace with_cooperative_yielding with a plan property. Trying to get things to be more declarative.