[CT-2723] [spike+] Maximally parallelize `dbt clone` operations, a different mechanism for processing a queue
Originally posted by @jtcohen6 in https://github.com/dbt-labs/dbt-core/pull/7881#discussion_r1236713450
One thing I noticed in my previous spike, and I'm seeing in this implementation as well: Even though each node's
cloneoperation is completely independent of any other node, we are still runningclonein DAG order. That significantly limits how fast this can be with--threads 1000.
I'm not sure how to fix that, and it will require digging into our base task / graph queue logic — but I suspect that will be key to unlocking a lot of the value here.
Repro
-- models/model_a.sql
select 1 as id
-- models/model_b.sql
select * from {{ ref('model_a') }}
$ dbt run --target prod
$ mkdir state
$ mv target/manifest.json state/
$ dbt clone --target dev --state state --full-refresh
From the logs: Notice that
model_bruns in serial aftermodel_a, even though they're on different threads. dbt understands thatmodel_bdepends onmodel_a, even though for the purposes of theclonetask, it really doesn't.
============================== 11:36:51.356566 | 2fab3bf3-3220-47f5-a842-a8e7aee780d1 ==============================
[0m11:36:51.356566 [info ] [MainThread]: Running with dbt=1.6.0-b4
[0m11:36:51.360523 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'warn_error': 'None', 'version_check': 'False', 'debug': 'False', 'log_path': '/Users/jerco/dev/scratch/testy/logs', 'fail_fast': 'False', 'profiles_dir': '/Users/jerco/.dbt', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'log_format': 'default', 'introspect': 'True', 'target_path': 'None', 'send_anonymous_usage_stats': 'False'}
[0m11:36:51.616469 [info ] [MainThread]: Registered adapter: snowflake=1.6.0-b3
[0m11:36:51.632107 [debug] [MainThread]: checksum: 85def1bdb9aa2c18a6a11d52b89108c001273905a0f5fd72c238caeba233ef8d, vars: {}, profile: , target: dev, version: 1.6.0b4
[0m11:36:51.669565 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
[0m11:36:51.669910 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
[0m11:36:51.670469 [debug] [MainThread]: Publication artifact available
[0m11:36:51.692800 [info ] [MainThread]: Found 2 models, 0 tests, 0 snapshots, 0 analyses, 371 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
[0m11:36:51.694034 [info ] [MainThread]:
[0m11:36:51.694695 [debug] [MainThread]: Acquiring new snowflake connection 'master'
[0m11:36:51.700555 [debug] [ThreadPool]: Acquiring new snowflake connection 'list_analytics'
[0m11:36:51.712206 [debug] [ThreadPool]: Using snowflake connection "list_analytics"
[0m11:36:51.712583 [debug] [ThreadPool]: On list_analytics: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "connection_name": "list_analytics"} */
show terse schemas in database analytics
limit 10000
[0m11:36:51.712825 [debug] [ThreadPool]: Opening a new connection, currently in state init
[0m11:36:52.957601 [debug] [ThreadPool]: SQL status: SUCCESS 119 in 1.0 seconds
[0m11:36:53.011881 [debug] [ThreadPool]: On list_analytics: Close
[0m11:36:53.372661 [debug] [ThreadPool]: Re-using an available connection from the pool (formerly list_analytics, now list_analytics_dbt_jcohen_dev)
[0m11:36:53.381793 [debug] [ThreadPool]: Using snowflake connection "list_analytics_dbt_jcohen_dev"
[0m11:36:53.382231 [debug] [ThreadPool]: On list_analytics_dbt_jcohen_dev: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "connection_name": "list_analytics_dbt_jcohen_dev"} */
show terse objects in analytics.dbt_jcohen_dev limit 10000
[0m11:36:53.382470 [debug] [ThreadPool]: Opening a new connection, currently in state closed
[0m11:36:54.382890 [debug] [ThreadPool]: SQL status: SUCCESS 2 in 1.0 seconds
[0m11:36:54.387887 [debug] [ThreadPool]: On list_analytics_dbt_jcohen_dev: Close
[0m11:36:54.748809 [info ] [MainThread]: Concurrency: 8 threads (target='dev')
[0m11:36:54.749807 [info ] [MainThread]:
[0m11:36:54.755801 [debug] [Thread-1 (]: Began running node model.my_dbt_project.model_a
[0m11:36:54.756728 [debug] [Thread-1 (]: Re-using an available connection from the pool (formerly list_analytics_dbt_jcohen_dev, now model.my_dbt_project.model_a)
[0m11:36:54.757151 [debug] [Thread-1 (]: Began compiling node model.my_dbt_project.model_a
[0m11:36:54.757645 [debug] [Thread-1 (]: Timing info for model.my_dbt_project.model_a (compile): 11:36:54.757426 => 11:36:54.757430
[0m11:36:54.758014 [debug] [Thread-1 (]: Began executing node model.my_dbt_project.model_a
[0m11:36:54.775804 [debug] [Thread-1 (]: On "model.my_dbt_project.model_a": cache miss for schema "analytics.dbt_jcohen_prod", this is inefficient
[0m11:36:54.778532 [debug] [Thread-1 (]: Using snowflake connection "model.my_dbt_project.model_a"
[0m11:36:54.778925 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_a"} */
show terse objects in analytics.dbt_jcohen_prod limit 10000
[0m11:36:54.779255 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
[0m11:36:55.787694 [debug] [Thread-1 (]: SQL status: SUCCESS 5 in 1.0 seconds
[0m11:36:55.793394 [debug] [Thread-1 (]: While listing relations in database=analytics, schema=dbt_jcohen_prod, found: ANOTHER_MODEL, MODEL_A, MODEL_B, MY_MODEL, MY_SEED
[0m11:36:55.842689 [debug] [Thread-1 (]: Writing runtime sql for node "model.my_dbt_project.model_a"
[0m11:36:55.845516 [debug] [Thread-1 (]: Using snowflake connection "model.my_dbt_project.model_a"
[0m11:36:55.845921 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_a"} */
create or replace view analytics.dbt_jcohen_dev.model_a
as (
select * from analytics.dbt_jcohen_prod.model_a
);
[0m11:36:56.184105 [debug] [Thread-1 (]: SQL status: SUCCESS 1 in 0.0 seconds
[0m11:36:56.215770 [debug] [Thread-1 (]: Timing info for model.my_dbt_project.model_a (execute): 11:36:54.758266 => 11:36:56.215606
[0m11:36:56.216137 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: Close
[0m11:36:56.587539 [debug] [Thread-1 (]: Finished running node model.my_dbt_project.model_a
[0m11:36:56.589991 [debug] [Thread-3 (]: Began running node model.my_dbt_project.model_b
[0m11:36:56.591451 [debug] [Thread-3 (]: Acquiring new snowflake connection 'model.my_dbt_project.model_b'
[0m11:36:56.592146 [debug] [Thread-3 (]: Began compiling node model.my_dbt_project.model_b
[0m11:36:56.592824 [debug] [Thread-3 (]: Timing info for model.my_dbt_project.model_b (compile): 11:36:56.592575 => 11:36:56.592579
[0m11:36:56.593252 [debug] [Thread-3 (]: Began executing node model.my_dbt_project.model_b
[0m11:36:56.600984 [debug] [Thread-3 (]: Writing runtime sql for node "model.my_dbt_project.model_b"
[0m11:36:56.602917 [debug] [Thread-3 (]: Using snowflake connection "model.my_dbt_project.model_b"
[0m11:36:56.603500 [debug] [Thread-3 (]: On model.my_dbt_project.model_b: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_b"} */
create or replace view analytics.dbt_jcohen_dev.model_b
as (
select * from analytics.dbt_jcohen_prod.model_b
);
[0m11:36:56.603966 [debug] [Thread-3 (]: Opening a new connection, currently in state init
[0m11:36:57.722641 [debug] [Thread-3 (]: SQL status: SUCCESS 1 in 1.0 seconds
[0m11:36:57.728537 [debug] [Thread-3 (]: Timing info for model.my_dbt_project.model_b (execute): 11:36:56.593543 => 11:36:57.728157
[0m11:36:57.729557 [debug] [Thread-3 (]: On model.my_dbt_project.model_b: Close
[0m11:36:58.086873 [debug] [Thread-3 (]: Finished running node model.my_dbt_project.model_b
[0m11:36:58.089987 [debug] [MainThread]: Connection 'master' was properly closed.
[0m11:36:58.090496 [debug] [MainThread]: Connection 'model.my_dbt_project.model_a' was properly closed.
[0m11:36:58.090840 [debug] [MainThread]: Connection 'model.my_dbt_project.model_b' was properly closed.
[0m11:36:58.091862 [debug] [MainThread]: Command end result
[0m11:36:58.108549 [info ] [MainThread]:
[0m11:36:58.109036 [info ] [MainThread]: [32mCompleted successfully[0m
[0m11:36:58.109432 [info ] [MainThread]:
[0m11:36:58.109785 [info ] [MainThread]: Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
[0m11:36:58.110456 [debug] [MainThread]: Command `dbt clone` succeeded at 11:36:58.110325 after 6.42 seconds
[0m11:36:58.110747 [debug] [MainThread]: Flushing usage events
I said in https://github.com/dbt-labs/dbt-core/pull/7881#discussion_r1237123614:
IIRC -
compileanddocs generatedon't run in DAG order. (They also interrupt the entire command on the first failure encountered during compilation.) I think the inconsistency is justifiable, if the parallelism delivers significantly better performance.
After looking into it a bit more, I don't think this is actually is true. It would make sense, though — even if model_b depends on model_a (e.g. for an introspective query), we're not actually materializing model_a, so it makes no difference whether we compile it before/after/concurrently with model_b.
So: This would be a first in dbt, and require some modification to how we iterate over the job/graph queue.
Notes from refinement:
- where would we want to put this change?
- we could remove all edges in the node so they're all the "same", but we'd be constructing a graph in a way different from the DAG order
- for clone, we should always be using the maximum number of threads
- instead, could we run in a different "mode" (where we don't worry about if they're are edges)
- this may have implications to ephemeral models
Where would this speed things up and by how much? @jtcohen6 do you have an example here to understand the concrete benefit of doing this - we'd like to understand what the performance gain would be to decide if this change is worth it
We still need to construct a DAG for the dbt clone task to support node selection — but when it comes time to execute, we can treat it as a "flat" list, not a directed graph with edges. I agree a different run "mode" makes sense here. The same "run mode" could apply to compile + docs generate.
It's a fair thought re: ephemeral models. The good news is, at least for dbt clone, they're totally irrelevant: They're not cloned, and we don't need to interpolate them into the compiled_code of other nodes.
I believe this would significantly speed up the execution of dbt clone, though "by how much" will depend on the size & shape of a given project's DAG and the actual concurrency that Snowflake can handle.
In theory, if you're running with as many threads as you have selected models for cloning:
- Let's say it takes X seconds to clone one model (whether creating a table clone or a "pointer" view). Conservatively, let's say this is ~2 seconds.
- Current state is that
dbt clonewill take no less than X seconds * Y, where Y is the greatest parent-child depth in the DAG, because each parent will be cloned before its child. - This could be as fast as X (total) — up to the level of parallelism that Snowflake can actually support, which is not documented :)
Meaning: The maximum theoretical speedup is to run this Y times faster, where Y is the greatest parent-child depth among selected resources. In dbt Labs' internal-analytics project, Y = 31.
import networkx
G = networkx.readwrite.gpickle.read_gpickle("target/graph.gpickle")
networkx.dag_longest_path(G)
print(len(networkx.dag_longest_path(G)))
Instead of dbt clone taking ~200 seconds, it could be taking <10 seconds. In theory!!
We're never going to achieve that theoretical speedup in practice, but I'd hope it could be pretty darn significant. There is the additional latency due to dbt maintaining a central adapter cache, which each thread must lock while updating. @peterallenwebb had identified some bonus cache-related slowness in https://github.com/dbt-labs/dbt-core/pull/6844. I had tried pulling that change into my previous spike PR, and it shaved off ~40% of the total runtime: https://github.com/dbt-labs/dbt-core/pull/7258#issuecomment-1501587554.
The same "run mode" could apply to compile + docs generate.
I believe this will also be relevant for our unit testing work, since unit tests do not need to run in DAG order
A simple approach that might work here to achieve an 'maximally parallelized execution mode' would be to modify the get_graph_queue method to accept an optional config that builds the graph queue without any edges.
More notes from refinement:
- we should do something deterministic here
- this feels similar to the ordering of the unit tests in the build command (having some flexibility around execution order, a more general approach)
- for clone, every node becomes independent -> in theory, it could run fully in parallel
PriorityQueuedetermines the order things should be run in, could we use an alternative queue object (flat)?- alt. make the DAG without any edges -> just a flat list
- just depends on where we implement it -> in the queue object vs. changing the DAG
- theme: the DAG doesn't quite tell us what the execution order is... we need a more general, less coupled mechanism for determining execution order
- the queue starts with small subset, then looks for more things to add -> instead we have a mode that just accepts a list and works through that list (maybe we need a different mechanism for processing a queue, new flag or new queue object)
- testing this could be tricky
This might also apply to unittest(in test command).