adapter,storage: expose the catalog shard to SQL
Adapter's catalog is now stored in a persist shard with our usual
<SourceData, (), Timestamp, i64> types. As a result, we can present
this shard for introspection to the rest of the system.
A number of internal tables contain information derived entirely from the catalog, which is the source of truth. For these tables, when the catalog changes, we issue a second write to the storage controller Append api.
This PR sets us up for starting to replace those table with VIEWs. Not
only does this save us the second write's complexity and performance
hit, it reduces the chance for discrepancies when catalog changes are
happening in multiple places (Pv2).
It also happens to be extremely cool for debugging. Now that the catalog also contains the storage controller state, this allows us to introspect that, too.
materialize=> COPY (SUBSCRIBE TO (select data from mz_internal.mz_catalog_raw where data->>'kind' = 'UnfinalizedShard') as of 1) TO STDOUT;
13 1 {"key":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"},"kind":"UnfinalizedShard"}
materialize=> COPY (SUBSCRIBE TO (select data from mz_internal.mz_catalog_raw where data->>'kind' = 'StorageCollectionMetadata') as of 1) TO STDOUT;
6 1 {"key":{"id":{"value":{"System":450}}},"kind":"StorageCollectionMetadata","value":{"shard":"s04d31384-3a30-48d4-8ca8-6d35464ebd56"}}
...
6 1 {"key":{"id":{"value":{"System":487}}},"kind":"StorageCollectionMetadata","value":{"shard":"s9b99714a-0f13-4653-a6e6-92cf0eab50a8"}}
12 1 {"key":{"id":{"value":{"User":1}}},"kind":"StorageCollectionMetadata","value":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"}}
13 -1 {"key":{"id":{"value":{"User":1}}},"kind":"StorageCollectionMetadata","value":{"shard":"sc6df7783-69cb-4b31-9b45-98c8e2799076"}}
Motivation
- This PR adds a feature that has not yet been specified.
Tips for reviewer
Open questions:
- ~Storage: The last time we talked about this, we decided to model this conceptually as a "source" external to storage that happens to be ingested from a persist shard, with a performance optimization to read the shard directly instead of copying it to a new shard. But mechanically, I forgot what we said: was it
DataSource::Other(DataSourceOther::Shard(ShardId))?~- Upon discussion, we can just use the existing
DataSourceOther::Computefor this; just need to give that variant a more generic name. https://github.com/MaterializeInc/materialize/pull/29768 does the trick.
- Upon discussion, we can just use the existing
- ~Storage: The new storage controller durability stuff requires that we write down the shard ids before the
create_collectionscall, which means we only see theShardIdin theDataSourcetoo late. I've got a big hack and a WIP comment here in the draft. Lots of options for fixing this and I don't have a particular preference, so happy to type up whatever y'all prefer!~- @benesch says: I don't think this was a hack at all, and is fundamental to introspecting the catalog shard. There's just a chicken and egg problem here. I cleaned up the implementation of the hack, and I think it's now quite elegant.
- ~Adapter: How do we want to model this in builtins.rs and the CatalogEntry? I'd prefer to avoid adding a new variant to
DataSourceDescsince that gets matched in a ton of places and AFAICTDataSourceDesc::Introspectionis semantically correct (enough).~- We're using the same
BuiltinSourceobject as we do for other builtin sources, but we've introduced aDataSourceIntrospectionDescthat distinguishes between the catalog introspection and other types of storage introspection collections.
- We're using the same
- ~Joe: We'll probably want to start using more realtime timestamps for the writes instead of just incrementing by 1. I assume that's not particularly hard? The thing I haven't figured out yet is if we need to start advancing the upper of the catalog shard as time passes, or if there's some nice way around that.~
- We're now marking the catalog shard as in an external "mz_catalog" timeline, which has decent semantics for now. There's a TODO we can tackle later to move the whole shard into the EpochMillis timeline, but that's a larger refactor.
- ~Aljoscha: This will want a bit of code that watches for upper advancements and sends them to the storage controller to power the since downgrades. IIRC some of your upcoming controller work already does something like that. Is that landing soon and/or is there a bit we should pull out and merge to reuse here?~
- This is done, and works well!
Checklist
- [x] This PR has adequate test coverage / QA involvement has been duly considered.
- [x] This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
- [x] If this PR evolves an existing
$T ⇔ Proto$Tmapping (possibly in a backwards-incompatible way), then it is tagged with aT-protolabel. - [x] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
- [x] This PR includes the following user-facing behavior changes:
- Not user facing yet.
Joe: We'll probably want to start using more realtime timestamps for the writes instead of just incrementing by 1. I assume that's not particularly hard?
Yes, we already want to start using timestamps from the timestamp oracle for writes we just haven't gotten around to that yet. An open question around this is should we use the existing EpochMillis timeline or should we have a separate catalog timeline? Personally, I think we should use the existing EpochMillis timeline. One of the most compelling reasons for this is that the views over the catalog (i.e. the builtin views you're proposing in this PR) will be in the same timeline as user objects. That's how the system currently behaves and would allow joining builtin views with user objects without the need for some re-clocking logic from the catalog timeline to the EpochMillis timeline. Though there are still arguments to create a new catalog timeline.
The thing I haven't figured out yet is if we need to start advancing the upper of the catalog shard as time passes, or if there's some nice way around that.
With the EpochMillis approach I think we would need some advancing of the upper as time passes. A short term solution could be to advance the upper in parallel to advancing the upper of all tables. A longer term solution could be to include the catalog shard in the persist-txn shard of all the tables somehow.
- Aljoscha: This will want a bit of code that watches for upper advancements and sends them to the storage controller to power the since downgrades. IIRC some of your upcoming controller work already does something like that. Is that landing soon and/or is there a bit we should pull out and merge to reuse here?
I have it in my branch/PR and will hopefully land if a couple weeks: https://github.com/MaterializeInc/materialize/pull/24816
With the EpochMillis approach I think we would need some advancing of the upper as time passes. A short term solution could be to advance the upper in parallel to advancing the upper of all tables. A longer term solution could be to include the catalog shard in the persist-txn shard of all the tables somehow.
Agreed! I don't like manually advancing the upper continually, so adding it to txn would be the tidiest solution. I thought a tiny bit about the bootstrapping problem, where we have to read from the catalog first in order to set up txn and everything else: I think it can be done, we can read from the catalog shard like from a normal shard, bootstrap txn and things, and then switch over to treating it as a txns shard. That way we get around the chicken-and-egg problem of requiring reading the catalog shard to know the location of the txn shard and potentially other parameters.
Adapter: How do we want to model this in builtins.rs and the CatalogEntry? I'd prefer to avoid adding a new variant to DataSourceDesc since that gets matched in a ton of places and AFAICT DataSourceDesc::Introspection is semantically correct (enough).
My vote would be to add a new builtin and CatalogEntry type specifically for the catalog shard since it's so unique. However, we could still present it to the storage controller as a normal source with a DataSourceDesc::Introspection. Would something like that work?
Yes, we already want to start using timestamps from the timestamp oracle for writes we just haven't gotten around to that yet. An open question around this is should we use the existing EpochMillis timeline or should we have a separate catalog timeline? Personally, I think we should use the existing EpochMillis timeline. One of the most compelling reasons for this is that the views over the catalog (i.e. the builtin views you're proposing in this PR) will be in the same timeline as user objects. That's how the system currently behaves and would allow joining builtin views with user objects without the need for some re-clocking logic from the catalog timeline to the EpochMillis timeline. Though there are still arguments to create a new catalog timeline.
With the EpochMillis approach I think we would need some advancing of the upper as time passes. A short term solution could be to advance the upper in parallel to advancing the upper of all tables. A longer term solution could be to include the catalog shard in the persist-txn shard of all the tables somehow.
Agreed! I don't like manually advancing the upper continually, so adding it to txn would be the tidiest solution. I thought a tiny bit about the bootstrapping problem, where we have to read from the catalog first in order to set up txn and everything else: I think it can be done, we can read from the catalog shard like from a normal shard, bootstrap txn and things, and then switch over to treating it as a txns shard. That way we get around the chicken-and-egg problem of requiring reading the catalog shard to know the location of the txn shard and potentially other parameters.
Yeah, that's interesting! I think resolving all this blocks replacing the builtin tables with views that are derived from the catalog, but maybe doesn't need to block getting this PR merged? The only thing that would give me pause would be how the read policy interacts with the way the catalog shard timestamps are picked. I wouldn't want to accidentally be holding back compaction on the catalog shard because the storage controller is applying a millisecond-based read policy to what is actually catalog versions.
My vote would be to add a new builtin and
CatalogEntrytype specifically for the catalog shard since it's so unique. However, we could still present it to the storage controller as a normal source with aDataSourceDesc::Introspection. Would something like that work?
Last I heard, the way storage folks wanted to represent this was as a DataSource::Other(DataSourceOther::SomeNewVariant) instead of a DataSource::Introspection. DataSourceDesc::Introspection and DataSource::Introspection both use the same IntrospectionType enum, which makes it tricky to thread this needle.
I tried a variant of this PR where we added it as a variant to that enum. Mechanically, there's a bunch of DataSource::Introspection(_) match branches in storage that are incorrect for this, so it seemed brittle.
Broadly, I don't think it's unreasonable for somewhere there to be an enum which looks like enum FooIntrospection { Storage(_), Catalog(_) } given that with this PR we actually do have two different categories of introspection.
I have it in my branch/PR and will hopefully land if a couple weeks: #24816
I was hoping to land this more like next week! Is there a bit of that we could pull out and merge as part of this? Mechanically, all we need really is a task that does the WriteHandle::wait_for_upper_past trick, but I don't want to add anything that makes landing your PR harder.
Mitigations
Completing required mitigations increases Resilience Coverage.
- [x] (Required) Code Review
🔍 Detected - [ ] (Required) Observability
- [ ] Feature Flag
- [x] Integration Test
🔍 Detected - [ ] QA Review
- [ ] Unit Test
Risk Summary:
The pull request has a high-risk score of 78, driven by the number of files modified and function declarations within those files. Historically, PRs with these predictors are 65% more likely to cause a bug than the repository baseline. Additionally, the repository's observed and predicted bug trends are both decreasing.
Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.
Bug Hotspots: What's This?
| File | Percentile |
|---|---|
| ../src/coord.rs | 96 |
| ../durable/persist.rs | 99 |
| ../src/builtin.rs | 99 |
| ../src/durable.rs | 93 |
| ../catalog/open.rs | 95 |
I'm taking this over from @danhhz to give him cycles to drive on the persist sink refactor. I'm happy to report I think this is ready for a full review! The first commit is split out into its own PR (https://github.com/MaterializeInc/materialize/pull/29768) in case it's easier to merge that commit in isolation. The second commit contains the actual implementation, but it's a pretty small and straightforward change in the end.
@petrosagg @jkosh44 — can you take a look? All of the open questions are resolved, and I've added notes about their resolution to the PR description.
Porting over a comment from @petrosagg on #29768:
...it's a bit unclear how the catalog shard is going to be tied into this model. Compute does not deal with any unmanaged shards directly. The storage controller mediates its writes to a collection by prescribing a storage-specific dataflow fragment that should be rendered into a compute dataflow. That fragment is opaque and its API is "give me a DD collection". The other API offered by the storage controller is the table write API.
The "storage-specific dataflow fragment" is the persist_sink operator(s), yeah? How do you think about the fact that compute currently has its own custom implementation of the persist sink? Is that just a weird happenstance and that code is still "morally" owned by storage?
AFAIK the adapter currently directly handles and writes a persist shard by using the persist API, which breaks this model. Is this correct? If yes, are we going to make those writes happen through storage APIs?
I think this is very much on the table! Tagging in @jkosh44, who is presently skunkworksing a change to move the shard into the EpochMillis timeline: https://materializeinc.slack.com/archives/C071VTZS1M5/p1728053998234389?thread_ts=1728053899.171539&cid=C071VTZS1M5. I think that change could very plausibly delegate the work of continually bumping forward the catalog shard's upper into the storage controller. Or even moving the catalog shard into the txns system.
How do you think about the fact that compute currently has its own custom implementation of the persist sink? Is that just a weird happenstance and that code is still "morally" owned by storage?
Exactly. In the past having compute changes not lead to a recompilation of mz_storage during development was high up in the desired properties list so that's why it's there. The abstraction is morally enforced by this generic parameter here https://dev.materialize.com/api/rust-private/mz_compute_types/sources/struct.SourceInstanceDesc.html
The M parameter is specified by the storage controller and is the thing that carries all the additional metadata required to produce the pTVC described by that global id in that fragment. That M parameter contains the shard id and is used by the persist sink to do its job but nothing should depend on its concrete type unless it's storage code
The abstraction is morally enforced by this generic parameter
Gotcha, makes sense.
@jkosh44 @danhhz here's a patch for what it would look like to rely on a single critical since handle in the controller, and do a more surgical migration of the opaque types inside of persist.
diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs
index 1ca5b9436f..82ca4a708a 100644
--- a/src/catalog/src/durable/persist.rs
+++ b/src/catalog/src/durable/persist.rs
@@ -28,24 +28,22 @@ use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
- soft_assert_or_log, soft_panic_or_log,
+ soft_assert_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
-use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
-use mz_persist_types::Opaque;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::Diff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use timely::Container;
-use tracing::{debug, info, warn};
+use tracing::{debug, warn};
use uuid::Uuid;
use crate::durable::debug::{Collection, DebugCatalogState, Trace};
@@ -358,8 +356,6 @@ pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
/// The [`Mode`] that this catalog was opened in.
pub(crate) mode: Mode,
- /// Since handle to control compaction.
- since_handle: SinceHandle<SourceData, (), Timestamp, Diff, i64>,
/// Write handle to persist.
write_handle: WriteHandle<SourceData, (), Timestamp, Diff>,
/// Listener to catalog changes.
@@ -502,27 +498,6 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
return Err(e.into());
}
- // Lag the shard's upper by 1 to keep it readable.
- let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
-
- // The since handle gives us the ability to fence out other downgraders using an opaque token.
- // (See the method documentation for details.)
- // That's not needed here, so we the since handle's opaque token to avoid any comparison
- // failures.
- let opaque = *self.since_handle.opaque();
- let downgrade = self
- .since_handle
- .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
- .await;
-
- match downgrade {
- None => {}
- Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
- Some(Ok(updated)) => soft_assert_or_log!(
- updated == downgrade_to,
- "updated bound should match expected"
- ),
- }
self.sync(next_upper).await?;
Ok(next_upper)
}
@@ -975,17 +950,6 @@ impl UnopenedPersistCatalogState {
}
}
- let since_handle = persist_client
- .open_critical_since(
- catalog_shard_id,
- PersistClient::CATALOG_CRITICAL_SINCE,
- Diagnostics {
- shard_name: CATALOG_SHARD_NAME.to_string(),
- handle_purpose: "durable catalog state critical since".to_string(),
- },
- )
- .await
- .expect("invalid usage");
let (mut write_handle, mut read_handle) = persist_client
.open(
catalog_shard_id,
@@ -1028,7 +992,6 @@ impl UnopenedPersistCatalogState {
let mut handle = UnopenedPersistCatalogState {
// Unopened catalogs are always writeable until they're opened in an explicit mode.
mode: Mode::Writable,
- since_handle,
write_handle,
listen,
persist_client,
@@ -1158,7 +1121,6 @@ impl UnopenedPersistCatalogState {
);
let mut catalog = PersistCatalogState {
mode: self.mode,
- since_handle: self.since_handle,
write_handle: self.write_handle,
listen: self.listen,
persist_client: self.persist_client,
@@ -1223,43 +1185,6 @@ impl UnopenedPersistCatalogState {
.increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
.await;
- // Before v0.120, the durable catalog opened a since handle using the
- // controller's critical ID. Starting in v0.120, the durable catalog
- // uses its own critical ID so that the storage controller can register
- // a critical since handle using a controller critical ID. However, we
- // need to expire our old critical since handle that uses the controller
- // critical ID because it uses the wrong opaque type.
- //
- // TODO(benesch): remove this in v0.121.
- if matches!(self.mode, Mode::Writable) {
- let old_since_handle: SinceHandle<SourceData, (), Timestamp, Diff, i64> = catalog
- .persist_client
- .open_critical_since(
- self.shard_id,
- PersistClient::CONTROLLER_CRITICAL_SINCE,
- Diagnostics {
- shard_name: CATALOG_SHARD_NAME.to_string(),
- handle_purpose: "durable catalog state old critical since".to_string(),
- },
- )
- .await
- .expect("invalid usage");
- let opaque = *old_since_handle.opaque();
- if opaque == <i64 as Opaque>::initial() {
- // If the opaque value is the initial i64 opaque value,
- // we're looking at a critical since handle that an old
- // version of the catalog created. It's safe to call expire
- // on this handle. We don't need to worry about this
- // accidentally finalizing the shard because
- // `catalog.since_handle` is holding back the read frontier.
- info!("expiring old critical since handle for catalog shard");
- old_since_handle.dangerously_expire().await;
- } else {
- info!(%opaque, "not expiring critical since handle for catalog shard; looks new");
- drop(old_since_handle);
- }
- }
-
let write_handle = catalog
.persist_client
.open_writer::<SourceData, (), Timestamp, i64>(
diff --git a/src/persist-client/src/critical.rs b/src/persist-client/src/critical.rs
index 7c4a2cdb55..70062bbd47 100644
--- a/src/persist-client/src/critical.rs
+++ b/src/persist-client/src/critical.rs
@@ -329,23 +329,6 @@ where
// downgrade it to [].
// TODO(bkirwi): revert this when since behaviour on expiry has settled,
// or all readers are associated with a critical handle.
-
- /// Politely expires this reader, releasing its since capability.
- ///
- /// Added back temporarily to faciliate the migration of the catalog shard's
- /// critical since handler to the controller. This migration is careful to
- /// uphold the invariant that an empty `state.critical_readers` means that
- /// the shard has never had a critical reader registered--i.e., the
- /// migration ensures that the shard always has at least one other critical
- /// reader registered before calling this method.
- ///
- /// TODO(benesch): remove this in v0.121.
- #[doc(hidden)]
- #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
- pub async fn dangerously_expire(mut self) {
- let (_, maintenance) = self.machine.expire_critical_reader(&self.reader_id).await;
- maintenance.start_performing(&self.machine, &self.gc);
- }
}
#[cfg(test)]
diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index 09cc036f91..83b3d77c33 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -1391,6 +1391,15 @@ where
}
let reader_state = self.critical_reader(reader_id);
+
+ // One-time migration of catalog shard from i64 opaques to PersistEpoch
+ // opaques.
+ //
+ // TODO(benesch): remove in v0.121.
+ let initial_i64_opaque = OpaqueState(<i64 as Codec64>::encode(&<i64 as Opaque>::initial()));
+ if reader_state.opaque_codec == "i64" && reader_state.opaque == initial_i64_opaque {
+ reader_state.opaque_codec = "PersistEpoch".into();
+ }
assert_eq!(reader_state.opaque_codec, O::codec_name());
if &O::decode(reader_state.opaque.0) != expected_opaque {
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index ccb76b2b94..f512f46140 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -410,20 +410,6 @@ impl PersistClient {
Ok(fetcher)
}
-
- /// A convenience [CriticalReaderId] for the catalog shard.
- ///
- /// ```rust
- /// // This prints as something that is not 0 but is visually recognizable.
- /// assert_eq!(
- /// mz_persist_client::PersistClient::CATALOG_CRITICAL_SINCE.to_string(),
- /// "c55555555-6666-7777-8888-999999999999",
- /// )
- /// ```
- pub const CATALOG_CRITICAL_SINCE: CriticalReaderId = CriticalReaderId([
- 85, 85, 85, 85, 102, 102, 119, 119, 136, 136, 153, 153, 153, 153, 153, 153,
- ]);
-
/// A convenience [CriticalReaderId] for Materialize controllers.
///
/// For most (soon to be all?) shards in Materialize, a centralized
Yeah, that diff seems great to me, assuming CI is happy with it.
Cool. I've optimistically punched it in.
Cleaning up old PRs