materialize icon indicating copy to clipboard operation
materialize copied to clipboard

[explain] Support materialized views in `explain filter pushdown`

Open bkirwi opened this issue 1 year ago • 2 comments

Motivation

#24498

In particular, it can be helpful to see the statistics for an existing materialized view -- to know what the level of pushdown would be like if the view was re-bootstrapping now.

Tips for reviewer

I think the diciest part of this PR is the timestamp selection code... please check my work!

Checklist

  • [ ] This PR has adequate test coverage / QA involvement has been duly considered.
  • [ ] This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • [ ] If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • [ ] This PR includes the following user-facing behavior changes:

bkirwi avatar Apr 05 '24 21:04 bkirwi

Okay, I have attempted to apply all suggestions - should be ready for another look.

Output looks reasonable at a glance:

materialize=> create materialized view mv as select * from bids where bid_time + '15 seconds' > mz_now();
CREATE MATERIALIZED VIEW

materialize=> explain filter pushdown for materialized view mv;
         Source          | Total Bytes | Selected Bytes | Total Parts | Selected Parts 
-------------------------+-------------+----------------+-------------+----------------
 materialize.public.bids | 27643       | 11342          | 15          | 10
(1 row)

materialize=> explain filter pushdown for materialized view mv;
         Source          | Total Bytes | Selected Bytes | Total Parts | Selected Parts 
-------------------------+-------------+----------------+-------------+----------------
 materialize.public.bids | 22943       | 6642           | 10          | 5
(1 row)

We see the number of bytes / parts that need to be fetched vary, but remain roughly stable even as the shard grows.

bkirwi avatar Apr 26 '24 22:04 bkirwi

Thanks, the new as_of selection looks good! But some more tests would be good to have, I think.

ggevay avatar Apr 27 '24 15:04 ggevay

Taking back to draft as I integrate some of the recent explain changes and try and work through some corner cases in the bootstrap logic...

bkirwi avatar Jun 03 '24 19:06 bkirwi

After switching to bootstrap_dataflow_as_of and adding tests for the weird menagerie of MVs in materialized_views.slt, I'm now running into this panic.

thread 'coordinator' panicked at src/adapter/src/coord.rs:2339:13:
error bootstrapping dataflow `as_of`: `min_as_of` [1717446599000] greater than `max_as_of` [1717446595426] (import_ids=[u57], export_ids=[u59], storage_constraints=StorageConstraints { dependencies: {User(57)}, dependants: {User(59)} })
stack backtrace:
0: rust_begin_unwind
1: core::panicking::panic_fmt
2: <mz_adapter::coord::Coordinator>::bootstrap_dataflow_as_of

min_as_of is chosen based on the read holds we're able to obtain for all inputs. max_as_of is based on the meet of the since.join(upper) of all outputs. (Colloquially, that's the time that we need to make sure we're generating correct output as of... it's the least frontier such that future writes at that time will be visible to readers.)

This assert seems intuitively correct, though I'm not 100% sure that it's valid in the case of refresh-every MVs. Since it's only showing up when this method is invoked on my branch, that implies that there's something that holds true during bootstrap but not afterwards, but I haven't figured out which.

I'm going to try and bisect this, but given that it's just a flake, that might take a long time or give wrong results. If anyone has ideas or suggestions for places to look, I'm all eyes!

bkirwi avatar Jun 04 '24 16:06 bkirwi

I'll take a look tomorrow!

ggevay avatar Jun 04 '24 20:06 ggevay

Thanks! If it's useful, the error seems to reliably be when explaining the materialized view mvi1, which is defined as:

CREATE MATERIALIZED VIEW mvi1 WITH (REFRESH EVERY '8s' ALIGNED TO mz_now()::string::int8 + 2000) AS
SELECT 5*x FROM t3;

It is indeed a refresh-every view, though I'm not sure what in particular would make it the troublesome ones. I'm seeing a failure once in every ~5 runs of the SLT on average. The min_as_of appears reliably just a few seconds past the max_as_of.

bkirwi avatar Jun 05 '24 02:06 bkirwi

This assert seems intuitively correct, though I'm not 100% sure that it's valid in the case of refresh-every MVs.

I don't see any reason why this shouldn't be true for REFRESH MVs.

Since it's only showing up when this method is invoked on my branch, that implies that there's something that holds true during bootstrap but not afterwards, but I haven't figured out which.

Well, not necessarily. It's also possible that our restart tests are somehow not hitting the same issue due to just random chance. In other words, maybe if we did a restart at the exact same moment where your EXPLAIN FILTER PUSHDOWN is running into the issue, then maybe bootstrapping would run into the same issue too.

I've looked into this, but it appears to be some deep bug. bootstrap_dataflow_as_of seems correct: The assertion must be true simply because the Compute Controller ensures that for every materialized view, we have a read capability on every input at a time that is not greater than the write frontier of the materialized view. So there might be some bug in the controllers' frontier management:

  1. Maybe the Compute Controller is somehow not keeping a correct read capability on an input.
  2. Or maybe the Storage Controller somehow forgets about a read capability that the Compute Controller wants to keep.
  3. Or maybe the Storage Controller somehow gets confused about what is the write frontier of the MV. Like, the MV writes some output data, so a new write frontier gets to handle_frontiers_response and the Compute Controller updates the input read holds, but somehow the Storage Controller doesn't update the write frontier for the MV. Btw. where is the code that updates the Storage Controller's knowledge of an MV's write frontier whenever the MV writes some output? Wherever that is, might it be that the information gets there later than to the Compute Controller's handle_frontiers_response?

I'm leaning towards 3., because I see no reason for why would the Compute Controller's input read holds even matter in this situation: the MV is a very simple computation, reading from a single table, so as soon as the table's write frontier ticks forward, the MV should be able to immediately process it, and therefore there should be no reason to keep the table's read capability from advancing for several seconds (the difference in the assert msg that Ben pasted is more than 3 seconds).

@teskje, what do you think?

I'm seeing a failure once in every ~5 runs of the SLT on average.

It immediately happened for me locally on the first try, but then I added some debug logging, and it hasn't happened since, even though I've ran it much more than 5 times by now :D

ggevay avatar Jun 05 '24 19:06 ggevay

Btw. where is the code that updates the Storage Controller's knowledge of an MV's write frontier whenever the MV writes some output? Wherever that is, might it be that the information gets there later than to the Compute Controller's handle_frontiers_response?

This might be it! I've added an assert in handle_frontiers_response that queries the Storage Controller for the MV's write frontier, and checks that it's not earlier than the frontier that handle_frontiers_response was called with, and this assert fires super quickly in the slt! My debug branch is here. (I have to sign off for today.)

ggevay avatar Jun 05 '24 19:06 ggevay

Btw. if indeed the above is the problem, then the actual bootstrapping might be safe from it, because the Storage Controller's state is rebuilt during restart. (Assuming that the updated write frontier hits Persist before it reaches handle_frontiers_response.)

ggevay avatar Jun 05 '24 19:06 ggevay

Btw. where is the code that updates the Storage Controller's knowledge of an MV's write frontier whenever the MV writes some output? Wherever that is, might it be that the information gets there later than to the Compute Controller's handle_frontiers_response?

Yep, that’s possible now. Until recently the compute controller would tell the storage controller about write frontier updates of MVs, but since https://github.com/MaterializeInc/materialize/pull/27057 the StorageCollections object installs a background task that listens for these frontier advancements on its own using persist handles. I would expect that to usually be faster than going through the compute protocol, but that might be different in CI. In any case, the storage controller’s view of MV write frontiers can be different than the compute controller’s.

teskje avatar Jun 05 '24 19:06 teskje

Yeah, that all sounds quite plausible! In particular it helps explain why we don't see issues at bootstrap time. (Though of course this appears pretty hard to trigger even in the problem case.)

I suppose possible fixes include:

  1. Having the compute controller learn about progress via storage collections directly. (I like this but it seems like a significant change.)
  2. Making the frontier-fetch method do a linearizable upper fetch. (Definitely fine but maybe expensive.)
  3. Turning the panics into errors and adding a retry loop. (Gross but probably works around this already-esoteric issue.)

I'll try whichever one of these is easiest and follow up with whether I can still reproduce or not, to help confirm the diagnosis.

bkirwi avatar Jun 05 '24 20:06 bkirwi

Having the compute controller learn about progress via storage collections directly.

If at all possible I'd like to evolve the compute controller to have fewer special cases, not more. So I'd prefer to keep the way of updating write frontiers of persist-sink collections the same as for other collection types. We already have an inconsistency with subscribes (which receive frontier updates through SubscribeResponses instead of Frontiers responses) and this complicates the code enough that we'd like to move away from it (https://github.com/MaterializeInc/database-issues/issues/4701).

Apart from adding a special case, having the compute controller learn about persist-sink frontiers from the storage controller requires the compute controller to be aware of the relationship between the dataflow and the storage collection. Currently it doesn't need to be. I'm not sure how much more difficult our lives would be if we introduced that new assumption (maybe not much?) but it's nice not needing it.

Taking a step back, bootstrap_dataflow_as_of is written to work in the context of adapter's bootstrap method, and specifically only after all the storage collections have been created in the storage controller but before the given dataflow has been created in the compute controller. So it's not entirely surprising that it doesn't work in other contexts. Before trying to adapt it to other contexts (which might be hard as you point out), are we sure that the EXPLAIN timestamp selection really needs to do everything that method does?

What bootstrap_dataflow_as_of tries to achieve, on the basis of persist-frontiers only, is to produce an as-of that is close to the read frontier the dataflow had before the restart. Would it work to just use the current read frontier of the MV dataflow, i.e. the read frontier the compute controller knows for that collection?

teskje avatar Jun 06 '24 08:06 teskje

Yeah, I agree that we should consider moving away from bootstrap_dataflow_as_of. At this point, it's probably better to not exactly nail the timestamp that an MV would have if a restart were to happen, rather than to introduce more code complications.

ggevay avatar Jun 06 '24 08:06 ggevay

Before trying to adapt it to other contexts (which might be hard as you point out), are we sure that the EXPLAIN timestamp selection really needs to do everything that method does?

Well! The underlying motive is that we'd like to produce an estimate of the stats that we'd see if we were re-hydrating the dataflow now, and calling this method was the suggested highest-fidelity way of doing that. Which makes sense to me - it's the ultimate source of truth for what rehydrating a dataflow would do, IIUC!

I don't think choosing a cheaper but lower-precision timestamp is off the table. We've been tempted to turn this knob a bit further, and make this never block at the cost of returning some useless stats in some cases. I may follow up on that off-thread.

Would it work to just use the current read frontier of the MV dataflow, i.e. the read frontier the compute controller knows for that collection?

IIUC the current read frontier of an MV is no later than the write frontier, and we're getting this panic because that's earlier than any of the read holds we can obtain. So I feel like the direct version of this might panic as well.

Could always clamp that frontier at whatever read hold we get, which is weird but probably close enough...

If at all possible I'd like to evolve the compute controller to have fewer special cases, not more.

Hard to argue with this motive! I do think it's likely to be a problem in the future if the storage collections have inconsistent state. But I think there are probably enough cooks in the StorageCollections kitchen so I will leave it to y'all to sort out... 😅

bkirwi avatar Jun 06 '24 15:06 bkirwi

Would it work to just use the current read frontier of the MV dataflow, i.e. the read frontier the compute controller knows for that collection?

IIUC the current read frontier of an MV is no later than the write frontier, and we're getting this panic because that's earlier than any of the read holds we can obtain. So I feel like the direct version of this might panic as well.

I think Jan was suggesting to get the read frontier of the MV dataflow from the Compute Controller, which should be consistent with the read capabilities that the Compute Controller is keeping on the inputs. So you should be able to read the inputs at that timestamp.

ggevay avatar Jun 06 '24 15:06 ggevay

Which makes sense to me - it's the ultimate source of truth for what rehydrating a dataflow would do, IIUC!

Depends on the circumstances of rehydrating: bootstrap_dataflow_as_of selects the as-of from which dataflows hydrate after an envd restart. But if you just spawn a new replica in a cluster, it will hydrate the dataflow from its current read frontier.

IIUC the current read frontier of an MV is no later than the write frontier, and we're getting this panic because that's earlier than any of the read holds we can obtain.

This might be true. I think you can only get a read hold at a storage collection's implied_capability, where the read frontier of a dataflow can easily be before that. The read frontier can never be before the read_capabilities.frontier() of any of its input collections, but the storage controller might refuse to give out read holds for that time.

I don't think there are any hard reasons for the storage controller to behave like that, so that might be something we can change. But I agree, using max(read_hold, compute_read_frontier) seems fine.

teskje avatar Jun 06 '24 15:06 teskje

I think you can only get a read hold at a storage collection's implied_capability, where the read frontier of a dataflow can easily be before that. The read frontier can never be before the read_capabilities.frontier() of any of its input collections, but the storage controller might refuse to give out read holds for that time.

I don't think there are any hard reasons for the storage controller to behave like that, so that might be something we can change. But I agree, using max(read_hold, compute_read_frontier) seems fine.

This used to be like this, but I think this has actually changed recently. StorageCollections::acquire_read_holds seems to go by collection.read_capabilities nowadays. @aljoscha might be able to confirm.

ggevay avatar Jun 06 '24 15:06 ggevay

Ah, and there is even a comment explaining the decision! In this case using the compute read frontier should indeed just work.

teskje avatar Jun 06 '24 16:06 teskje

Cool! I'm not totally comfortable with my understanding of the new offset-hold infrastructure, but the approach does seem to work... see the final commit for the relevant diff.

Nightly run here, though it's probably not that interesting.

bkirwi avatar Jun 06 '24 21:06 bkirwi

The timestamp selection is looking good to me, thanks for iterating on this!

I wrote some comments on the slt.

ggevay avatar Jun 07 '24 12:06 ggevay