
[CT-466] [Feature] Make `run-operation` accept selectors to be able to use the `selected_resources` Jinja variable

Open b-per opened this issue 3 years ago • 12 comments

Is there an existing feature request for this?

  • [X] I have searched the existing issues

Describe the Feature

This is a spin-off from #3471. PR #5001 adds selected_resources to the Jinja context, but that variable is currently not available to the run-operation command, which doesn't support selectors.

The approach I was thinking of following would be to:

  1. allow run-operation to accept selector arguments
  2. move the variables job_queue, _flattened_nodes and previous_state from the GraphRunnableTask to the ManifestTask https://github.com/dbt-labs/dbt-core/blob/0ec829a096839bcba8aa9bcdd7ced8db87942f47/core/dbt/task/runnable.py#L106-L115

The downside of this approach is that run-operation would then have to build the graph, which it doesn't do today, implying a longer runtime for the command.

Another open question is which parameters we want to allow run-operation to use.

My original thinking is to support all the flags that build accepts, but I'd be keen for some feedback (see the hypothetical invocation after this list). The build ones are:

  • --select
  • --exclude
  • --state
  • --resource-type
  • --selector
  • (and I am not sure about --indirect-selection)
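
For illustration, an invocation with those flags could look like the line below (this interface is the proposal itself; run-operation accepts none of these flags today, and the macro name is made up):

dbt run-operation my_macro --select tag:finance --exclude tag:deprecated --state path/to/artifacts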

Describe alternatives you've considered

Not enabling run-operation to leverage the selected_resources variable.

Who will this benefit?

People who want to write macros and operations that leverage dbt selectors.

Are you interested in contributing this feature?

Yes, I have some draft code already

Anything else?

No response

b-per avatar Apr 07 '22 09:04 b-per

Love the issue @b-per! I'll leave some comments below, but I also don't want to stop someone from the Execution team from chiming in with more.

From my perspective, there are two things that "real" dbt tasks have access to today which run-operation (custom user macros) doesn't:

  1. Real selection logic — accessible to the dbt-Jinja context as of #5001
  2. Parallelism. There's no simple way to make this work in Jinja, I think... and it's definitely come up as something that would be highly desirable: https://github.com/dbt-labs/dbt-external-tables/issues/109

(There's one other distinction between run-operation and other "runnable" tasks, which is highly technical and hard to wrap your head around. Suffice it to say, ref works slightly differently, and ephemeral models are unsupported. For an overview: #4851)

What you're getting at feels both like a natural extension of run-operation and like a slightly different thing: the ability to compile a DAG and perform multi-threaded processing of its nodes, where each node undergoes a custom process defined by a user-provided macro.
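
To illustrate the shape of that idea, a user-provided per-node macro might look like this (the macro name, its node argument, and the idea that dbt would invoke it once per selected node, possibly across threads, are all hypothetical):

{% macro my_node_operation(node) %}
    {# hypothetical: dbt would call this once per selected node #}
    {% do run_query("grant select on " ~ node.relation_name ~ " to role reporting") %}
{% endmacro %}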

That's honestly not far off from how other tasks work today:

  • dbt source freshness does this with the collect_freshness macro (just sources, not in DAG order)
  • dbt run (seed/snapshot/etc) does this, where the user-provided macro is a materialization

So there's a lot of existing art here to leverage. The question is, how to factor this code, and present these concepts, in a way that makes sense. (Is this... custom tasks? #2381)

jtcohen6 avatar Apr 07 '22 17:04 jtcohen6

@jtcohen6 pretty much covered this already, but here are a few more thoughts:

move the variables job_queue, _flattened_nodes and previous_state from the GraphRunnableTask to the ManifestTask

We probably want to make the RunOperationTask inherit from GraphRunnableTask rather than move the vars.

Parallelism. There's no simple way to make this work in Jinja, I think... and it's definitely come up as something that would be highly desirable

This is def going to be a thing and I'd say that it's a requirement to add this as a proper feature to dbt.

That said, it seems like a sound idea and probably quite useful if we can sort out how to keep it parallelized.

iknox-fa avatar Apr 07 '22 20:04 iknox-fa

I'll admit that my original idea didn't take parallelism into consideration.

The use case I was thinking of at first was an existing run-operation that people might want to filter based on the --selector provided (like running dbt_meta_testing, but only on a selected subset of models, using the power of dbt selectors). In those cases, it is really just one operation looping through the graph, which wouldn't get any value from parallelism since it isn't even running any SQL.
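
To make that concrete, such a macro could look roughly like this (a minimal sketch, assuming run-operation has populated selected_resources; the macro name and log output are illustrative):

{% macro report_selected_models() %}
    {% for node in graph.nodes.values() %}
        {# keep only the nodes matched by the selection flags #}
        {% if node.unique_id in selected_resources %}
            {{ log("selected: " ~ node.unique_id, info=true) }}
        {% endif %}
    {% endfor %}
{% endmacro %}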

Should those operations with no/low query volume then be differentiated from operations that rely on many calls to the warehouse and would need parallelism to be performant?

I could help with "1. Real selection logic", but "2. Parallelism" looks like it requires much more work and more design decisions. I think that 1 without 2 could already cover some use cases, but if we want to solve both at the same time and avoid rework, I might just leave this to someone else.

b-per avatar Apr 08 '22 11:04 b-per

I have been looking at it a bit more and I think there is another design decision to be made: what kinds of resources should selected_resources contain when generated from run-operation?

Currently, we have:

  • test returns tests only
  • run returns models only
  • build returns all nodes plus tests, i.e. models, seeds, snapshots, and tests

If we want to leverage selected_resources and run-operation for dbt_external_tables, we would most likely want to add the selected sources to the variable.

The question then becomes whether we also want to add the selected metrics and exposures.

b-per avatar Apr 12 '22 16:04 b-per

This would be helpful for the Slim CI workflow in dbt Cloud, which includes a run-operation to zero-copy-clone tables into the temporary CI schema. With 1,000+ models, this takes ~15+ minutes.

If the cloning run-operation could accept selectors, then -s 1+state:modified would cut this down to under a minute for most PRs, and cut down on total CI time immensely.
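
For example, the CI step might become something like the following (the macro name is hypothetical, and the selection flags on run-operation are exactly what this issue proposes):

dbt run-operation clone_to_ci_schema --select 1+state:modified --state ./prod-artifacts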

jaysobel avatar Oct 04 '22 00:10 jaysobel

Coming back to this, I think it makes sense to treat separately the two paths I was describing above:

  1. run-operation + node selection — this issue
  2. User-space access to threads/parallelism — future, out of scope for this issue

Support for node selection in run-operation, whereby users can access both the graph and the selected_resources variable within the Jinja context, would be a compelling way to unlock/unblock some advanced use cases. We could even do a bit more of the legwork, and avoid some repeated boilerplate, by creating a context member that is just the selected subset of the graph. Pseudo-code: [node for node in graph.nodes.values() if node.unique_id in selected_resources], and the same for sources/exposures/metrics.
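
In user space, that convenience member (call it selected_nodes; both the name and its availability are hypothetical) would reduce the filtering boilerplate to something like:

{% for node in selected_nodes %}
    {{ log(node.unique_id, info=true) }}
{% endfor %}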

Support for threads/parallelism. We shouldn't try exposing this to the Jinja context. Instead, we should (a) add built-in support for common use cases where multi-threading is essential, and (b) work toward a future where users can define their own custom tasks/commands (in Python).

jtcohen6 avatar Jan 22 '23 14:01 jtcohen6

Would it be reasonable to enable an additional configuration on the table, snapshot, and incremental materializations to clone_from another database location / environment? Going the materialization route enables parallelism. It also feels reasonable that the result of a materialization is, in fact, a clone.

matt-winkler avatar May 19 '23 14:05 matt-winkler

@matt-winkler if we enabled this additional configuration for the table materialization,

  1. what might an example model {{ config(...) }} block look like?
  2. what SQL would you imagine being executed as a result of that configuration?
    • Two different examples would be nice, so let's suppose dbt-snowflake and dbt-postgres

dbeatty10 avatar May 21 '23 00:05 dbeatty10

For Snowflake:

-- in models/fct_orders.sql
{{
    config(
        materialized = 'table',
        tags=['finance'],
        clone_from={'database': 'analytics_mwinkler_dbt_workspace', 'schema': 'dbt_mwinkler'}
    )
}}
-- in macros/materializations/snowflake__create_table_as.sql
{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%}
  {%- if language == 'sql' -%}
    {%- set transient = config.get('transient', default=true) -%}
    {%- set clone_from = config.get('clone_from', default=none) -%}
    {%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
    {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
    {%- set copy_grants = config.get('copy_grants', default=false) -%}

    {%- if clone_from is not none -%}
      {# 
        needs logic to validate clone_from has database / schema config'd 
      #}
      {%- set clone_database = clone_from['database'] -%}
      {%- set clone_schema = clone_from['schema'] -%}
    {%- endif -%}

    {%- if cluster_by_keys is not none and cluster_by_keys is string -%}
      {%- set cluster_by_keys = [cluster_by_keys] -%}
    {%- endif -%}
    {%- if cluster_by_keys is not none -%}
      {%- set cluster_by_string = cluster_by_keys|join(", ")-%}
    {% else %}
      {%- set cluster_by_string = none -%}
    {%- endif -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {{ sql_header if sql_header is not none }}

        create or replace {% if temporary -%}
          temporary
        {%- elif transient -%}
          transient
        {%- endif %} table {{ relation }} 
        {% if clone_from -%}
          clone {{ clone_database }}.{{ clone_schema }}.{{ relation.name }};
        
        {%- else -%}
          {#
            What of the below needs to be applied to a clone? Anything?
          #}
          {%- set contract_config = config.get('contract') -%}
          {%- if contract_config.enforced -%}
            {{ get_assert_columns_equivalent(sql) }}
            {{ get_table_columns_and_constraints() }}
            {% set compiled_code = get_select_subquery(compiled_code) %}
          {% endif %}
          {% if copy_grants and not temporary -%} copy grants {%- endif %} as
          (
            {%- if cluster_by_string is not none -%}
              select * from (
                {{ compiled_code }}
                ) order by ({{ cluster_by_string }})
            {%- else -%}
              {{ compiled_code }}
            {%- endif %}
          );

        {%- endif -%}

      {% if cluster_by_string is not none and not temporary -%}
        alter table {{relation}} cluster by ({{cluster_by_string}});
      {%- endif -%}
      {% if enable_automatic_clustering and cluster_by_string is not none and not temporary  -%}
        alter table {{relation}} resume recluster;
      {%- endif -%}

  {%- elif language == 'python' -%}
    {{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }}
  {%- else -%}
      {% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %}
  {%- endif -%}

{% endmacro %}
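
A dbt-postgres counterpart (the second example dbeatty10 asked for) would look quite different: Postgres has no zero-copy clone and cannot reference another database in a query, so a sketch under the same hypothetical clone_from config would fall back to a full cross-schema copy rather than a metadata-only clone:

{%- if clone_from is not none -%}
  -- Postgres has no CLONE keyword; copy the data instead
  create table {{ relation }} as
  select * from {{ clone_schema }}.{{ relation.name }};
{%- endif -%}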

matt-winkler avatar May 26 '23 18:05 matt-winkler

@matt-winkler Is there anything that you prefer about a clone_from config, versus the proposal in #7258?

That proposal shakes out to:

  • a first-class clone materialization
  • a new command, dbt clone, that will clone all selected resources (using the clone materialization instead of their otherwise configured one)

jtcohen6 avatar May 26 '23 19:05 jtcohen6

Even better, @jtcohen6. Zero issues with that, and it will avoid stacking on top of existing materialization logic in a confusing way.

I'll pause on this approach. LMK if there's anything I can help with to accelerate the other.

I think then the CI process looks like:

dbt clone -s config.materialized:incremental
dbt build -s @state:modified

yeah?

matt-winkler avatar May 26 '23 19:05 matt-winkler

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please comment on the issue or else it will be closed in 7 days.

github-actions[bot] avatar Feb 24 '24 01:02 github-actions[bot]