
[CT-2723] [spike+] Maximally parallelize `dbt clone` operations, a different mechanism for processing a queue

Open • aranke opened this issue Jun 21 '23 • 7 comments

Originally posted by @jtcohen6 in https://github.com/dbt-labs/dbt-core/pull/7881#discussion_r1236713450

One thing I noticed in my previous spike, and I'm seeing in this implementation as well: Even though each node's clone operation is completely independent of any other node, we are still running clone in DAG order. That significantly limits how fast this can be with --threads 1000.

I'm not sure how to fix that, and it will require digging into our base task / graph queue logic — but I suspect that will be key to unlocking a lot of the value here.
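
To make the constraint concrete, here's a toy illustration using networkx directly (not dbt's internals): a scheduler that respects edges releases nodes one "generation" at a time, while the same nodes with their edges removed are all available at once.

import networkx as nx

# Two models, where model_b selects from model_a
G = nx.DiGraph([("model_a", "model_b")])

# With the edge intact, a DAG-order scheduler needs two serial waves:
print(list(nx.topological_generations(G)))  # [['model_a'], ['model_b']]

# Clone operations are independent, so dropping the edges would let
# both nodes run in a single wave, bounded only by thread count:
G.remove_edges_from(list(G.edges()))
print(list(nx.topological_generations(G)))  # [['model_a', 'model_b']]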

Repro

-- models/model_a.sql
select 1 as id

-- models/model_b.sql
select * from {{ ref('model_a') }}

$ dbt run --target prod
$ mkdir state
$ mv target/manifest.json state/
$ dbt clone --target dev --state state --full-refresh

From the logs below: notice that model_b runs serially after model_a, even though they're on different threads. dbt understands that model_b depends on model_a, even though for the purposes of the clone task that dependency doesn't matter.

============================== 11:36:51.356566 | 2fab3bf3-3220-47f5-a842-a8e7aee780d1 ==============================
11:36:51.356566 [info ] [MainThread]: Running with dbt=1.6.0-b4
11:36:51.360523 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'write_json': 'True', 'log_cache_events': 'False', 'partial_parse': 'True', 'cache_selected_only': 'False', 'warn_error': 'None', 'version_check': 'False', 'debug': 'False', 'log_path': '/Users/jerco/dev/scratch/testy/logs', 'fail_fast': 'False', 'profiles_dir': '/Users/jerco/.dbt', 'use_colors': 'True', 'use_experimental_parser': 'False', 'no_print': 'None', 'quiet': 'False', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'static_parser': 'True', 'log_format': 'default', 'introspect': 'True', 'target_path': 'None', 'send_anonymous_usage_stats': 'False'}
11:36:51.616469 [info ] [MainThread]: Registered adapter: snowflake=1.6.0-b3
11:36:51.632107 [debug] [MainThread]: checksum: 85def1bdb9aa2c18a6a11d52b89108c001273905a0f5fd72c238caeba233ef8d, vars: {}, profile: , target: dev, version: 1.6.0b4
11:36:51.669565 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
11:36:51.669910 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
11:36:51.670469 [debug] [MainThread]: Publication artifact available
11:36:51.692800 [info ] [MainThread]: Found 2 models, 0 tests, 0 snapshots, 0 analyses, 371 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
11:36:51.694034 [info ] [MainThread]: 
11:36:51.694695 [debug] [MainThread]: Acquiring new snowflake connection 'master'
11:36:51.700555 [debug] [ThreadPool]: Acquiring new snowflake connection 'list_analytics'
11:36:51.712206 [debug] [ThreadPool]: Using snowflake connection "list_analytics"
11:36:51.712583 [debug] [ThreadPool]: On list_analytics: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "connection_name": "list_analytics"} */
show terse schemas in database analytics
    limit 10000
11:36:51.712825 [debug] [ThreadPool]: Opening a new connection, currently in state init
11:36:52.957601 [debug] [ThreadPool]: SQL status: SUCCESS 119 in 1.0 seconds
11:36:53.011881 [debug] [ThreadPool]: On list_analytics: Close
11:36:53.372661 [debug] [ThreadPool]: Re-using an available connection from the pool (formerly list_analytics, now list_analytics_dbt_jcohen_dev)
11:36:53.381793 [debug] [ThreadPool]: Using snowflake connection "list_analytics_dbt_jcohen_dev"
11:36:53.382231 [debug] [ThreadPool]: On list_analytics_dbt_jcohen_dev: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "connection_name": "list_analytics_dbt_jcohen_dev"} */
show terse objects in analytics.dbt_jcohen_dev limit 10000
11:36:53.382470 [debug] [ThreadPool]: Opening a new connection, currently in state closed
11:36:54.382890 [debug] [ThreadPool]: SQL status: SUCCESS 2 in 1.0 seconds
11:36:54.387887 [debug] [ThreadPool]: On list_analytics_dbt_jcohen_dev: Close
11:36:54.748809 [info ] [MainThread]: Concurrency: 8 threads (target='dev')
11:36:54.749807 [info ] [MainThread]: 
11:36:54.755801 [debug] [Thread-1 (]: Began running node model.my_dbt_project.model_a
11:36:54.756728 [debug] [Thread-1 (]: Re-using an available connection from the pool (formerly list_analytics_dbt_jcohen_dev, now model.my_dbt_project.model_a)
11:36:54.757151 [debug] [Thread-1 (]: Began compiling node model.my_dbt_project.model_a
11:36:54.757645 [debug] [Thread-1 (]: Timing info for model.my_dbt_project.model_a (compile): 11:36:54.757426 => 11:36:54.757430
11:36:54.758014 [debug] [Thread-1 (]: Began executing node model.my_dbt_project.model_a
11:36:54.775804 [debug] [Thread-1 (]: On "model.my_dbt_project.model_a": cache miss for schema "analytics.dbt_jcohen_prod", this is inefficient
11:36:54.778532 [debug] [Thread-1 (]: Using snowflake connection "model.my_dbt_project.model_a"
11:36:54.778925 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_a"} */
show terse objects in analytics.dbt_jcohen_prod limit 10000
11:36:54.779255 [debug] [Thread-1 (]: Opening a new connection, currently in state closed
11:36:55.787694 [debug] [Thread-1 (]: SQL status: SUCCESS 5 in 1.0 seconds
11:36:55.793394 [debug] [Thread-1 (]: While listing relations in database=analytics, schema=dbt_jcohen_prod, found: ANOTHER_MODEL, MODEL_A, MODEL_B, MY_MODEL, MY_SEED
11:36:55.842689 [debug] [Thread-1 (]: Writing runtime sql for node "model.my_dbt_project.model_a"
11:36:55.845516 [debug] [Thread-1 (]: Using snowflake connection "model.my_dbt_project.model_a"
11:36:55.845921 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_a"} */
create or replace   view analytics.dbt_jcohen_dev.model_a
  
   as (
    
        select * from analytics.dbt_jcohen_prod.model_a
    
  );
11:36:56.184105 [debug] [Thread-1 (]: SQL status: SUCCESS 1 in 0.0 seconds
11:36:56.215770 [debug] [Thread-1 (]: Timing info for model.my_dbt_project.model_a (execute): 11:36:54.758266 => 11:36:56.215606
11:36:56.216137 [debug] [Thread-1 (]: On model.my_dbt_project.model_a: Close
11:36:56.587539 [debug] [Thread-1 (]: Finished running node model.my_dbt_project.model_a
11:36:56.589991 [debug] [Thread-3 (]: Began running node model.my_dbt_project.model_b
11:36:56.591451 [debug] [Thread-3 (]: Acquiring new snowflake connection 'model.my_dbt_project.model_b'
11:36:56.592146 [debug] [Thread-3 (]: Began compiling node model.my_dbt_project.model_b
11:36:56.592824 [debug] [Thread-3 (]: Timing info for model.my_dbt_project.model_b (compile): 11:36:56.592575 => 11:36:56.592579
11:36:56.593252 [debug] [Thread-3 (]: Began executing node model.my_dbt_project.model_b
11:36:56.600984 [debug] [Thread-3 (]: Writing runtime sql for node "model.my_dbt_project.model_b"
11:36:56.602917 [debug] [Thread-3 (]: Using snowflake connection "model.my_dbt_project.model_b"
11:36:56.603500 [debug] [Thread-3 (]: On model.my_dbt_project.model_b: /* {"app": "dbt", "dbt_version": "1.6.0b4", "profile_name": "sandbox-snowflake", "target_name": "dev", "node_id": "model.my_dbt_project.model_b"} */
create or replace   view analytics.dbt_jcohen_dev.model_b
  
   as (
    
        select * from analytics.dbt_jcohen_prod.model_b
    
  );
11:36:56.603966 [debug] [Thread-3 (]: Opening a new connection, currently in state init
11:36:57.722641 [debug] [Thread-3 (]: SQL status: SUCCESS 1 in 1.0 seconds
11:36:57.728537 [debug] [Thread-3 (]: Timing info for model.my_dbt_project.model_b (execute): 11:36:56.593543 => 11:36:57.728157
11:36:57.729557 [debug] [Thread-3 (]: On model.my_dbt_project.model_b: Close
11:36:58.086873 [debug] [Thread-3 (]: Finished running node model.my_dbt_project.model_b
11:36:58.089987 [debug] [MainThread]: Connection 'master' was properly closed.
11:36:58.090496 [debug] [MainThread]: Connection 'model.my_dbt_project.model_a' was properly closed.
11:36:58.090840 [debug] [MainThread]: Connection 'model.my_dbt_project.model_b' was properly closed.
11:36:58.091862 [debug] [MainThread]: Command end result
11:36:58.108549 [info ] [MainThread]: 
11:36:58.109036 [info ] [MainThread]: Completed successfully
11:36:58.109432 [info ] [MainThread]: 
11:36:58.109785 [info ] [MainThread]: Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
11:36:58.110456 [debug] [MainThread]: Command `dbt clone` succeeded at 11:36:58.110325 after 6.42 seconds
11:36:58.110747 [debug] [MainThread]: Flushing usage events

aranke • Jun 21 '23

I said in https://github.com/dbt-labs/dbt-core/pull/7881#discussion_r1237123614:

IIRC - compile and docs generate don't run in DAG order. (They also interrupt the entire command on the first failure encountered during compilation.) I think the inconsistency is justifiable, if the parallelism delivers significantly better performance.

After looking into it a bit more, I don't think this is actually true. It would make sense, though: even if model_b depends on model_a (e.g. for an introspective query), we're not actually materializing model_a, so it makes no difference whether we compile it before, after, or concurrently with model_b.

So: this would be a first in dbt, and would require some modification to how we iterate over the job/graph queue.

jtcohen6 • Jun 26 '23

Notes from refinement:

  • where would we want to put this change?
  • we could remove all edges in the graph so the nodes are all the "same", but we'd be constructing the graph in a way that differs from the DAG order
  • for clone, we should always be using the maximum number of threads
  • instead, could we run in a different "mode" (where we don't worry about whether there are edges)?
  • this may have implications for ephemeral models

Where would this speed things up, and by how much? @jtcohen6, do you have an example here to illustrate the concrete benefit of doing this? We'd like to understand what the performance gain would be, to decide if this change is worth it.

graciegoheen • Aug 21 '23

We still need to construct a DAG for the dbt clone task to support node selection — but when it comes time to execute, we can treat it as a "flat" list, not a directed graph with edges. I agree a different run "mode" makes sense here. The same "run mode" could apply to compile + docs generate.
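
As a rough sketch of that split (a hypothetical helper, not dbt's actual API): selection still walks the edges, but what we hand to the executor is just a list.

import networkx as nx

def select_then_flatten(graph: nx.DiGraph, selector: str) -> list:
    # Selection still needs the DAG: e.g. "model_a+" means model_a
    # plus all of its descendants.
    node = selector.rstrip("+")
    selected = {node} | nx.descendants(graph, node)
    # ...but for cloning, execution order is irrelevant, so return a
    # plain (sorted, hence deterministic) list rather than a queue
    # that respects edges.
    return sorted(selected)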

It's a fair thought re: ephemeral models. The good news is, at least for dbt clone, they're totally irrelevant: They're not cloned, and we don't need to interpolate them into the compiled_code of other nodes.

I believe this would significantly speed up the execution of dbt clone, though "by how much" will depend on the size & shape of a given project's DAG and the actual concurrency that Snowflake can handle.

In theory, if you're running with as many threads as you have selected models for cloning:

  • Let's say it takes X seconds to clone one model (whether creating a table clone or a "pointer" view). Conservatively, let's say this is ~2 seconds.
  • Current state is that dbt clone will take no less than X seconds * Y, where Y is the greatest parent-child depth in the DAG, because each parent will be cloned before its child.
  • This could be as fast as X (total) — up to the level of parallelism that Snowflake can actually support, which is not documented :)

Meaning: The maximum theoretical speedup is to run this Y times faster, where Y is the greatest parent-child depth among selected resources. In dbt Labs' internal-analytics project, Y = 31.

import networkx
# dbt writes the project DAG to target/graph.gpickle
# (read_gpickle was removed in networkx 3.0)
G = networkx.readwrite.gpickle.read_gpickle("target/graph.gpickle")
# Y = number of nodes on the longest parent-child path
print(len(networkx.dag_longest_path(G)))
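
Plugging the numbers above into that formula, as a back-of-the-envelope sketch (real runs add per-node overhead on top of these floors):

X = 2    # seconds to clone one model (conservative estimate from above)
Y = 31   # greatest parent-child depth in the internal-analytics DAG
serial_floor = X * Y   # 62s: today's lower bound, cloning parents before children
parallel_floor = X     # 2s: lower bound if every clone can run at once
print(f"max theoretical speedup: {serial_floor // parallel_floor}x")  # 31x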

Instead of dbt clone taking ~200 seconds, it could be taking <10 seconds. In theory!!

We're never going to achieve that theoretical speedup in practice, but I'd hope it could be pretty darn significant. There is also the additional latency of dbt maintaining a central adapter cache, which each thread must lock while updating. @peterallenwebb had identified some bonus cache-related slowness in https://github.com/dbt-labs/dbt-core/pull/6844. I had tried pulling that change into my previous spike PR, and it shaved off ~40% of the total runtime: https://github.com/dbt-labs/dbt-core/pull/7258#issuecomment-1501587554.

jtcohen6 • Aug 23 '23

The same "run mode" could apply to compile + docs generate.

I believe this will also be relevant for our unit testing work, since unit tests do not need to run in DAG order.

graciegoheen • Aug 23 '23

A simple approach that might work here to achieve a "maximally parallelized execution mode" would be to modify the get_graph_queue method to accept an optional config that builds the graph queue without any edges.
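
Roughly like this (a sketch against networkx, not the actual get_graph_queue signature):

import networkx as nx
from queue import PriorityQueue

def get_graph_queue(graph: nx.DiGraph, flatten: bool = False) -> PriorityQueue:
    # Hypothetical `flatten` config: strip every edge first, so each
    # node starts with in-degree 0 and is immediately available.
    g = graph.copy()
    if flatten:
        g.remove_edges_from(list(g.edges()))
    queue: PriorityQueue = PriorityQueue()
    # Enqueue every node that has no remaining parents. The real queue
    # keeps releasing nodes as their parents finish; with flatten=True,
    # everything is released up front.
    for node in g.nodes():
        if g.in_degree(node) == 0:
            queue.put(node)
    return queue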

MichelleArk • Aug 28 '23

More notes from refinement:

  • we should do something deterministic here
  • this feels similar to the ordering of unit tests in the build command (having some flexibility around execution order; a more general approach)
  • for clone, every node becomes independent -> in theory, the task could run fully in parallel
  • PriorityQueue determines the order things should be run in; could we use an alternative (flat) queue object?
  • alternatively, build the DAG without any edges -> just a flat list
  • it just depends on where we implement it -> in the queue object vs. changing the DAG
  • theme: the DAG doesn't quite tell us what the execution order is... we need a more general, less coupled mechanism for determining execution order
  • the queue starts with a small subset, then looks for more things to add -> instead, we could have a mode that just accepts a list and works through it (maybe we need a different mechanism for processing a queue: a new flag or a new queue object; see the sketch after this list)
  • testing this could be tricky
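
For the "mode that just accepts a list" idea, a minimal sketch (hypothetical names; clone_node stands in for whatever executes a single node's clone operation):

from concurrent.futures import ThreadPoolExecutor

def run_flat(nodes: list, clone_node, threads: int) -> list:
    # No queue, no edges: sort for determinism, then let the pool
    # schedule every node at once, bounded only by `threads`.
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(clone_node, sorted(nodes)))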

graciegoheen • Jan 02 '24

This might also apply to unit tests (in the test command).

ChenyuLInx • May 01 '24