
[CT-202] Workaround for some limitations due to `list_relations_without_caching` method

danfran opened this issue 4 years ago • 45 comments

Describe the feature

I am currently facing an issue using DBT with Spark in an AWS Glue/EMR environment, as already discussed in https://github.com/dbt-labs/dbt-spark/issues/215 (and originally raised in https://github.com/dbt-labs/dbt-spark/issues/93).

The current issue is about the adapter's list_relations_without_caching method:

https://github.com/dbt-labs/dbt/blob/HEAD/core/dbt/include/global_project/macros/adapters/common.sql#L240

which in the Spark Adapter implementation is:

https://github.com/dbt-labs/dbt-spark/blob/a8a85c54d10920af1c5efcbb4d2a51eb7cfcad11/dbt/include/spark/macros/adapters.sql#L133-L139

In this case you can see that the command show table extended in {{ relation }} like '*' is executed. It forces Spark to go through the metadata of every table in the schema (as Spark has no information schema layer) in a sort of "discovery mode", and this approach produces two main issues:

  1. Bad performance: some environments can have hundreds or even thousands of tables in the same schema, generated not only by DBT but also by other processes. In that case this operation can be very costly, especially when different DBT processes run updates on a few tables at different times.

  2. Instability, as I verified in an AWS Glue/EMR environment: you can have views without an "S3 Location" defined, like an Athena/Presto view, which will crash a DBT process running Spark SQL on EMR with errors like:

show table extended in my_schema like '*'
  ' with 62bb6394-b13a-4b79-92dd-2e0918831cf3
21/09/18 13:00:22 INFO SparkExecuteStatementOperation: Running query with 62bb6394-b13a-4b79-92dd-2e0918831cf3
21/09/18 13:01:03 INFO DAGScheduler: Asked to cancel job group 62bb6394-b13a-4b79-92dd-2e0918831cf3
21/09/18 13:01:03 ERROR SparkExecuteStatementOperation: Error executing query with 62bb6394-b13a-4b79-92dd-2e0918831cf3, currentState RUNNING,
org.apache.spark.sql.AnalysisException: java.lang.IllegalArgumentException: Can not create a Path from an empty string

Describe alternatives you've considered

I do not see why a DBT process should care about the "rest of the world", like the Athena views mentioned above or tables created by other processes in the same schema.

So ideally I would like to replace the query:

show table extended in <schema> like '*'

with something like:

show table extended in <schema> like ('<table1>|<table2>|…')

where my <table1>, <table2>, etc. are determined automatically when I run a command like

dbt run --models my_folder

where my_folder contains the files table1.sql, table2.sql, etc.

but through the current method interface, only the schema parameter can be passed.

Two questions here: how can I automatically infer the names of the tables involved when a command like dbt run --models my_folder runs, and how can I then pass them to list_relations_without_caching?

Additional context

I found this relevant for Spark in an AWS environment, but it can potentially be a similar issue for other implementations.

Who will this benefit?

On DBT's Slack channel I talked to another user "affected" by a similar issue, but probably anyone using Spark in a distributed environment (AWS or not) can be affected by this.

Are you interested in contributing this feature?

Sure, both coding and testing.

danfran avatar Sep 21 '21 15:09 danfran

@danfran Thanks for opening the issue, and apologies for the delay getting back to you here (and over in https://github.com/dbt-labs/dbt-spark/issues/215).

Background

The reason we use show table extended in <schema> like '*' is to build the adapter cache at the start of every run. This is how dbt can know which models already exist, and (crucially) whether they exist as a view or table, so that subsequent calls to get_relation within materializations (e.g. here and here) can be cache lookups rather than redundant, slow introspective queries.

We found we need to use show table extended in <schema> like '*', instead of the much less verbose (and more performant) show tables in <schema>, because show tables does not include the relation type (view or table), only whether the object is temporary. On other databases, this kind of metadata is easy and fast to access, but Apache Spark lacks a real information schema.

(A neat side effect of having to use show table extended in <schema> like '*' is that we can resolve get_columns_in_relation from the adapter cache, when available, which we cannot do on other adapters. This is purely a bonus, though, and it would be worth it to give up this capability in exchange for faster, more reliable cache construction.)

I know that show table extended in <schema> like '*' can be very slow, especially when accessing metadata for schemas with thousands of objects. This feels especially frustrating when most of those objects are not relevant to dbt! As a best practice, we highly recommend that you create dbt objects in a dedicated schema, separate from other processes. I understand it isn't always possible to isolate source tables, but this only impacts catalog querying (docs generate) rather than cache construction (run etc), since schemas with sources are catalogued but not cached.
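
(For readers less familiar with adapter internals: conceptually, the cache described above is just a mapping from relation name to relation type, populated once up front so that later lookups are free. The block below is only a toy sketch of that idea, with made-up names; dbt's real cache lives in dbt-core and is more sophisticated.)

    # Toy sketch of the relation cache idea described above (hypothetical names).
    from typing import Dict, Optional, Tuple


    class ToyRelationCache:
        def __init__(self) -> None:
            # keyed by (schema, identifier); value is "table" or "view"
            self._relations: Dict[Tuple[str, str], str] = {}

        def add(self, schema: str, identifier: str, rel_type: str) -> None:
            self._relations[(schema.lower(), identifier.lower())] = rel_type

        def lookup(self, schema: str, identifier: str) -> Optional[str]:
            # a get_relation-style call becomes a dictionary lookup instead of
            # another slow introspective query against the metastore
            return self._relations.get((schema.lower(), identifier.lower()))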

Proposed change

Okay, now that we've gotten that background out of the way—I think you're onto something here:

So ideally I would like to replace the query:

show table extended in <schema> like '*'

with something like:

show table extended in <schema> like ('<table1>|<table2>|…')

I think this might just be doable!

There's a base adapter method, _get_cache_schemas, that the adapter uses to identify which schemas it ought to cache. That list is derived from the enabled, non-ephemeral relations in the project. Normally, the list includes identifier-less relations, because we just need to run a single metadata query for the entire schema. The list of schema-only identifiers is then passed into the list_relations_without_caching method, i.e. the list_relations_without_caching macro, i.e. show table extended in <schema> like '*'.

But what if we instead passed in a schema relation that did have an identifier? That identifier could look like '<table1>|<table2>|…', and the list_relations_without_caching macro could use it to run a more targeted metadata query.

Any methods defined in BaseAdapter can be overridden within SparkAdapter, so even though _get_cache_schemas isn't defined there now, we can reimplement it.

Here's my rough attempt:

    def _get_cache_schemas(self, manifest: Manifest) -> Set[BaseRelation]:
        """Get the set of schema relations that the cache logic needs to
        populate. This means only executable nodes are included.
        """
        # the cache only cares about executable nodes
        relations = [
            self.Relation.create_from(self.config, node)  # keep the identifier
            for node in manifest.nodes.values()
            if (
                node.is_relational and not node.is_ephemeral_model
            )
        ]
        # group up relations by common schema
        # (note: `collections` would normally be imported at module level)
        import collections
        relmap = collections.defaultdict(list)
        for r in relations:
            relmap[r.schema].append(r)
        # create a single relation for each schema
        # set the identifier to a '|' delimited string of relation names, or '*'
        schemas = [
            self.Relation.create(
                schema=schema,                
                identifier=(
                    '|'.join(r.identifier for r in rels)
                    # there's probably some limit to how many we can include by name
                    if len(rels) < 100 else '*'
                )
            ) for schema, rels in relmap.items()
        ]
        return schemas

Then, the macro becomes:

{% macro spark__list_relations_without_caching(relation) %}
  {% call statement('list_relations_without_caching', fetch_result=True) -%}
    show table extended in {{ relation.schema }} like '{{ relation.identifier or "*" }}'
  {% endcall %}

  {% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}

That works for me locally:

show table extended in dev_jerco like 'my_first_dbt_model|my_second_dbt_model'

Next steps

  • What do you think of an approach like that? Would you be interested in taking that and running with it?
  • Should we also try to do something similar for _get_catalog_schemas, to speed up docs generate?
  • I'm going to transfer this issue to the dbt-spark repo, since (I believe) that's where we can make all the code changes we need.
  • Semi-related: In the process of teasing apart this functionality, I noticed that the add_schema_to_cache method doesn't seem to be called/used anywhere. It might be a holdover from a long time ago. It's always a good idea to remove totally unused code.

jtcohen6 avatar Oct 13 '21 10:10 jtcohen6

Hi @jtcohen6, thank you for the insights, very useful! I am more than happy to try what you suggested, as it seems to work around the issue in a good way. I will also try to apply your other suggested points.

danfran avatar Oct 14 '21 13:10 danfran

@danfran I am curious to know whether this worked for you.

ssimeonov avatar Jan 12 '22 00:01 ssimeonov

@ssimeonov sorry for the late answer. Unfortunately my work situation has changed since then and I was not able to test it in my original environment. However, the solution proposed by @jtcohen6 still seems valid. If you are in an AWS environment and you want to test it (including the Athena view / EMR-Hive conflict that makes this fix even more useful), you need just a few tables to reproduce the issue and create a valid environment.

danfran avatar Jan 17 '22 11:01 danfran

@jtcohen6 is there a strictly macro-override way (without getting into Python) to make the macro do as you suggested? I plonked something like this into the /macros folder but it didn't quite work (I did a few variations, including having if execute around the {% for node in block).

{% macro spark__list_relations_without_caching(relation) %}
  {% set rels = [] %}
  {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
      {% do rels.append(node.fqn[1]) %}
  {% endfor %}

  {% if rels | length > 1 %}  
    {% set suffix = rels | join('|') %}
  {% else %}
    {% set suffix = '*' %}
  {% endif %}

  {% call statement('list_relations_without_caching', fetch_result=True) -%}
    show table extended in {{ relation }} like {{ suffix }}
  {% endcall %}
  {% do return(load_result('list_relations_without_caching').table) %}
{% endmacro %}


I'm guessing surely not because you provided a _get_cache_schemas() function + macro method :grin:

jeremyyeo avatar Jan 18 '22 08:01 jeremyyeo

@jeremyyeo I think this approach would indeed require implementing _get_cache_schemas(), i.e. a change to the python methods in dbt-spark—so there's no way to test this out with purely user-space code in the meantime.

An alternative approach that either I just remembered, or which just occurred to me, in the threads where we're discussing this internally:

  • The reason we initially opted (back in https://github.com/dbt-labs/dbt-spark/issues/49) for show table extended (verbose) over show tables (concise) is because the concise version lacks a field/specifier that lets us tell views and tables apart. (Contrary to the nomenclature, show tables includes both views + tables.)
  • We could look into running two concise queries rather than one verbose one, by running (for each relevant database) show views in <database> + show tables in <database>. Then, we can reasonably infer that every object returned by the former is a view, and every object returned by the latter and not in former is a table.
  • This bare-bones approach would be enough for the initial adapter cache that powers materializations—though it wouldn't let us do incidentally clever things like returning get_columns_in_relation from the cache if available.

It might be worth experimenting with both approaches, and seeing which one yields greater benefits. Does the slowness have more to do with the verbosity of show table extended (contextual information we don't strictly need)? Or more to do with trying to return metadata for thousands of extraneous objects that dbt doesn't care about, but which happen to live alongside relevant objects in the same database/schema?
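
To make the second idea concrete, here is a rough, purely illustrative sketch of how the two result sets could be combined; it is not code from dbt-spark, and run_show_tables / run_show_views are hypothetical stand-ins for whatever executes the SQL and returns object names:

    # Illustrative sketch only: infer relation types from two concise queries.
    from typing import Callable, Dict, List


    def infer_relation_types(
        schema: str,
        run_show_tables: Callable[[str], List[str]],  # hypothetical helper
        run_show_views: Callable[[str], List[str]],   # hypothetical helper
    ) -> Dict[str, str]:
        # `show tables in <schema>` returns both views and tables
        all_objects = run_show_tables(schema)
        # `show views in <schema>` returns only views
        views = set(run_show_views(schema))
        # anything returned by `show tables` but not by `show views`
        # is assumed to be a table
        return {name: ("view" if name in views else "table") for name in all_objects}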

jtcohen6 avatar Jan 18 '22 09:01 jtcohen6

Does the slowness have more to do with the verbosity of show table extended (contextual information we don't strictly need)?

Yes. SHOW TABLES runs quickly.

ssimeonov avatar Jan 19 '22 04:01 ssimeonov

After speaking a bit more about this with @superdupershant:

  • show tables should indeed be very fast, but show views is likely to be significantly slower at scale
  • Even though show table extended requires pulling out more metadata than the simple show command, filtering on the table name (like 'table_one|table_two|...') is more likely to yield a significant speedup

There are other performance boosts that the dbt-databricks team is planning to take advantage of, enabled by the specificity of their underlying architecture. That feels like an appropriate approach for our two plugins going forward.

jtcohen6 avatar Feb 04 '22 14:02 jtcohen6

This aligns with our experience @jtcohen6.

Forgive my unfamiliarity with DBT architecture: how do capabilities "degrade" based on the quality of metadata available about assets a data production depends on?

I can think of (at least) several levels:

  1. Existence (SHOW TABLES can solve this)
  2. Schema information
  3. Lineage information (view V depends on table W and view X, which in turn depends on table Y)
  4. Data freshness information (things like last data update time)

The reason for the question is that, while the Spark metastore is quite dumb, Delta tables have queryable history and recognizable versioned identifiers (which could be used as an indication of immutable data state).

Further, one could design a simple open metadata standard (say, a single property containing some JSON, implemented via managed table properties in Spark, for example) for getting the metadata DBT needs for advanced functionality.

ssimeonov avatar Feb 04 '22 15:02 ssimeonov

@ssimeonov It's a really fair question!

  1. Existence (+ relation type) is the only information dbt needs for all relations before running, to populate its runtime cache. As each model is materialized, it updates the cache.
  2. Schema information is pulled for post-run metadata ("catalog"), and it can be accessed during the run via well-understood methods (get_columns_in_relation) if needed to power dynamically templated model SQL
  3. dbt infers lineage information by parsing a project — those ref() + source() calls. This doesn't require a database connection at all. However, for some databases (such as "bound" views in Postgres), dbt also queries for inter-object dependencies, so that its runtime cache can correctly handle the implications of drop ... cascade. That's less relevant on most analytical databases, including Spark/Databricks.
  4. Data freshness information, for sources, is currently powered by a separate task — dbt source freshness. This is comparable to docs generate, which generates the "catalog" described in (2). It also checks freshness naively (select max(timestamp_col)) — I know that, for Delta tables, it would make a lot of sense to access native metadata to answer the same question.
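
(To illustrate the point in (4): the difference is between scanning data and reading table metadata. This is only a sketch of the idea, not how dbt source freshness is implemented; the function names are made up.)

    # Illustrative sketch: two ways to ask "when was this table last updated?"
    def naive_freshness_sql(table: str, loaded_at_field: str) -> str:
        # full scan of a timestamp column, as described above
        return f"select max({loaded_at_field}) from {table}"


    def delta_freshness_sql(table: str) -> str:
        # Delta tables keep a transaction log; `describe history` returns one
        # row per commit with a `timestamp` column, so the latest commit time
        # can be read from metadata without scanning the data
        return f"describe history {table}"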

In the future, I could see integrating both types of information (freshness + catalog) into other dbt tasks. Overall, I think dbt only stands to benefit from Delta tables that can provide all types of metadata more readily, quickly, and reliably.

jtcohen6 avatar Feb 04 '22 15:02 jtcohen6

Then the path forward seems to be the implementation of your original idea: to generate the like 'table_one|table_two|...' from DBT's project information, is that right?

ssimeonov avatar Feb 04 '22 15:02 ssimeonov

Yes, I think that's right. Let's aim to include this change in a v1.1 beta, and ask folks to test it out in their own projects to ensure it yields the desired speedups.

cc @VersusFacit: This is another cache-related performance question, similar to https://github.com/dbt-labs/dbt-snowflake/issues/83, where benchmarking (and anticipating edge cases) will be the name of the game.

The change proposed in this issue will significantly help in cases where dbt manages some objects, in large databases that also contain thousands of non-dbt objects. If we want to speed up cases where dbt manages thousands of objects, and a given invocation is only selecting a handful to run, we'll need to tackle the larger question of selection-based caching: https://github.com/dbt-labs/dbt-core/issues/4688

jtcohen6 avatar Feb 05 '22 12:02 jtcohen6

Adding another customer onto this issue. Seeing dbt docs generate taking up to an hour or more on job runs. Please msg if more info is required.

crystalro0 avatar Mar 10 '22 22:03 crystalro0

Hoping this change can get added into 1.1 as part of the base code!

It would be a significant benefit for us as we are using Managed Workflows for Apache Airflow (MWAA) and are unable to modify the core code as referenced here https://github.com/dbt-labs/dbt-spark/issues/228#issuecomment-942132634.

With AWS MWAA we provide the requirements.txt (dbt-core==1.0.0) but have no access to the shell to modify the core code after the fact, and show table extended in <schema> like '*' continues to take longer and longer; with our implementation, show table extended is run for every model step.

Our implementation means we don't do a single dbt run --select <project>, as we wanted control over re-running failed models, with that control handled by restarting the task inside Airflow. See https://www.astronomer.io/blog/airflow-dbt-1/ "A Better, Better Way"

Anyways, looking forward to changes that speed up "show table extended" for spark schema objects.

ryanClift-sd avatar Mar 14 '22 19:03 ryanClift-sd

dbt uses show table extended like '*' for two purposes today:

  1. To cache relations at the start of each run, whether for one model or many
  2. To access rich metadata during catalog generation (docs generate)

Cache population: We're tracking the investigation into the two Spark-specific possibilities noted above in #296. There are two branches with sample/experimental code; if you're able to, try them out and let us know what you find. We're also thinking about cache speedups via dbt-core changes, e.g. https://github.com/dbt-labs/dbt-core/issues/4688.

Catalog generation: As a separate question, I think it would make a lot of sense to pass specific table info (show table extended like 'one|two|three|...') when grabbing metadata for the catalog. The stakes of missing an object are lower, and source tables/schemas are included. This will require code changes different from the ones sketched out above / in the branches linked by #296. I've opened a new issue to track that work: https://github.com/dbt-labs/dbt-spark/issues/300

jtcohen6 avatar Mar 18 '22 09:03 jtcohen6

@jtcohen6 Thank you for the quick adjustments and 2 branch options!
Sorry for the delay, testing took longer than I thought.

I was able to complete testing and below are my findings and some related context around the testing. Additional Context & Final Thoughts to follow below testing/findings.

TL;DR

  • I'm definitely leaning towards Option2 as well
    • For everyone not interested in the entire thing haha
      • I know, holy wall of text, batman

Testing/Findings

  • All Testing / Findings do not include cluster startup time.
    • For every test of DBT Compile & DBT Run we manually started the cluster
      • once it was started & available then testing commenced
        • This was so no cluster startup time is included in the metadata calls timing test
  • All testing was done on an i3.2xlarge cluster size (61GB of memory, 4 CPUs)
    • 1 Worker
    • 1 Driver

dbt-spark v1.0.0 Code

show table extended in <schema> like '*'

  • In schemas that contain only a few objects, performance concerns are essentially nonexistent; overhead is negligible
    • As objects in the schema grow, even if all are related to a DBT Project, this command starts to take longer
    • Spark Cluster size makes no difference here for execution time of show table extended in <schema> like '*'
  • Executing a single command at the beginning of DBT Run to populate the cache or catalog
    • Cache
      • Caching definitely slows down as schemas grow quite large
      • schemas may contain objects not relevant to the current DBT Project or any DBT Project at all
        • In some cases many more objects, thereby limiting the efficacy of this command/process
    • Catalog
      • dbt docs generate can suffer from this single call, especially in the schema/project cases above
        • https://github.com/dbt-labs/dbt-core/issues/2570
        • https://github.com/dbt-labs/dbt-core/issues/1576
        • https://github.com/dbt-labs/dbt-bigquery/issues/115
      • Testing on Spark shows the show table extended in <schema> like '*' call can also be quite lengthy in schemas that are quite large and/or contain many objects not related to the project/models that docs are being generated for
  • I have attached a Screenshot showing the execution timings of show table extended in <schema> like '*' from the spark cluster as well as DBT command line for both DBT Compile & DBT Run
    • (screenshot: current production dbt-spark metadata gather)

dbt-spark wip-faster-caching-option1

show table extended in <schema> like <relation object|relation object|relation object|....>, or as I will refer to it, show table extended in <schema> like <list>

  • For any project in any schema this is a great step & more targeted metadata query
    • Only models related to the DBT Project are added to the like <list> for enumeration
  • Executing a subset of models or even a single model in the project
    • all models in the project are still added to the like <list> enumeration
      • Meaning, if we executed just our staging folder or a single model.sql
        • all 1,578 models are added to the like <list>
  • like <list> "limit" testing - Tested with 1,578 Models
    • No rejection from the spark cluster that the like <list> was too large
      • This is our current project size, it doesn't mean that 1,579 will error, but it could
  • Execution timing on this command was less than 3 minutes for the 1,578 models
    • For smaller projects inside schemas that have objects not related to the DBT Project, this would be a significantly faster metadata query
    • The schema this was tested in has 2,048 Tables and ~1,578 are related to the DBT Project
  • Definitely faster and more targeted than Current v1.0 Code Base
    • Again, executing a subsection of the Project, or a single model still incurs the lookup of all objects
  • I have attached a Screenshot showing the execution timings of show table extended in <schema> like <list> from the spark cluster as well as DBT command line for both DBT Compile and DBT Run
    • (screenshot: option1 dbt-spark metadata gather)

dbt-spark wip-faster-caching-option2

show tables in <schema> + show views in <schema>

followed by describe extended <objects> for any "refs" inside the model.sql

  • Show Tables & Show Views in <schema> is lightning fast
    • Both commands on the schema in question execute in less than 10 seconds combined
    • describe extended <object> is quick as well
      • milliseconds execution for any single object
  • DBT Run though suffers in terms of overall execution compared to Option1, due to the describe call occurring for every model
  • DBT Compile is the command that suffers the most with this approach
    • DBT Docs would as well due to needing to run describe extended 1 by 1 for these commands compared to like <list>|'*'
    • describe extended <object> doesn't provide whether the column is nullable or not
      • data availability inconsistency between show table extended in <schema> like <list>|'*' and describe extended <object>
  • In regards to some principal concerns here: https://github.com/dbt-labs/dbt-spark/issues/296#issuecomment-1072211686
    • Tested/Verified External Object creation to make sure it still created Delta objects
    • Tested/Verified that "is_incremental()" models still executed properly
    • Tested/Verified that a merge statement was used
      • Our dbt_project.yml absolutely has +file_format: delta set though, maybe that's entirely why this passed my testing
  • I have attached a Screenshot showing the execution timings of show tables in <schema> & show views in <schema> from the spark cluster as well as DBT command line for both DBT Compile and DBT Run (screenshot: option2 dbt-spark metadata gather)

Additional Context

How we are executing DBT Projects/Models at Slickdeals

I understand this might not be the recommended or normal approach. Below is an example screenshot of our implementation, with notes following. (screenshot: DBT Airflow DAG execution details)

  • We have farmed out the execution of every single model to a single Airflow Task where every task calls DBT Run --select model.sql
    • model.sql is every individual model in the graph
    • Allows us to let Airflow handle execution state tracking
      • I know you are all looking at this/state tracking and re-running the DAG
        • https://github.com/dbt-labs/dbt-core/issues/2465
      • Leaves DBT to handle code creation and not have to be the orchestrator as well
    • Allows us to parallelize execution of the entire graph at a very low cost/overhead via MWAA
      • other than the core calls that are executed for every "DBT Run" call
        • This is absolutely what is crushing our execution times & environment
        • we end up having to call show table extended in <schema> like '*' for every model (nearly 900+ times currently)
  • We have a limited DAG capacity though using MWAA
    • why this project isn't entirely broken down into many projects
    • plus all models in this project handle applying replicated data transactions to our Datalake from on premise via the same code template used to generate all the models

I totally understand as well that our implementation might not be normal or recommended and is part of, if not entirely, the reason we are experiencing problems. Of course, none of these are your problems, but I greatly appreciate the work being put into making caching faster overall

Reasons why we went the above Implementation route

  • Execution State Tracking to restart failed models
    • There may be changes upstream or unexpected data that flows through the system or schema changes that we are not aware of and we wanted the ability to restart the DAG where it failed while letting other tasks to continue to execute and eventually complete their execution path.
  • Parallelization of task execution
    • As we are migrating towards cloud infrastructure, new tools and building an entirely new platform, we needed to be able to execute 100's of steps in parallel as a lot of our DAGs only have limited dependencies and take the view of eventual consistency
      • I love FK's and ensuring referential consistency but as items get more and more distributed that becomes a "secondary" task - say DBT Test
      • Current on premise "DAGs" run 100's of steps in parallel and we needed like for like

Final Thoughts

show table extended in <schema> like '*'

  • Because our implementation calls DBT Run every time, we incur this cost every time
  • Run by itself, this call can take 5-8+ minutes, which isn't terrible really
    • our implementation makes it terrible sadly
  • Regardless of implementation though, this could be a more targeted cache query, especially for others' use cases as well as for schemas that contain objects unrelated to DBT DAG processing

show table extended in <schema> like <list>

  • I like it overall for DBT Compile & DBT Docs
  • Potential serious drawback: the like <list> could get too long to be executed and error out
  • For DBT Run it's still faster than like '*'
  • I think this fits the methodology that is recommended for executing DBT DAGs
    • probably how it is normally executed by most using DBT
      • Could be wrong though
  • Eventually the list may get so long that performance could "suffer" again?
    • Not sold on this, but maybe

show tables in <schema> + show views in <schema>, followed by describe extended <objects> for any "refs" inside the model.sql

  • I love this method overall, especially for our implementation!
    • In fact I'm trying to get it into a Python wheel and uploaded to PyPI so we can install it to fix some of our current production issues
    • I understand that this is purely a work in progress and there might be other drawbacks we're missing or introducing by using this methodology, especially by adopting it asap.
      • I understand all code is "as is" etc
  • This is of course the most targeted metadata query gathering and ideal for how we have implemented DBT
  • As you've outlined here https://github.com/dbt-labs/dbt-spark/issues/296#issuecomment-1072211686 , this method is faster in larger models and slower in small models
    • I don't want you to have to go back to a single describe extended table approach indicated in your last paragraph here https://github.com/dbt-labs/dbt-spark/issues/295#issue-1149338024
      • You do see a slowdown vs Option1, which populates the cache with a single, more targeted call
  • In that regard, you also mention in https://github.com/dbt-labs/dbt-spark/issues/296#issuecomment-1072211686 that the principal risk of approach 2 is that you'd be missing is_delta
    • describe extended does have a Provider attribute available; maybe that can be used to reduce this risk? (screenshot: describe extended output; see the sketch after this list)
    • Not saying that you don't have to run an additional query describe extended <object> to get these results, but could those then be cached?
      • Thereby allowing you to still do cache lookup checks?
        • Just thinking out loud here, maybe you still cannot and must run the describe extended <object> again
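
(As mentioned in the Provider bullet above, here is a rough sketch of how the Provider row of describe extended output could be used to infer is_delta; the helper is hypothetical and not dbt-spark code.)

    # Illustrative sketch: infer `is_delta` from `describe extended` output.
    # Each row of that output has col_name / data_type / comment columns;
    # Delta tables include a row where col_name is "Provider" and the value
    # is "delta".
    def is_delta_from_describe(rows) -> bool:
        for row in rows:
            col_name, value = str(row[0]), str(row[1])
            if col_name.strip().lower() == "provider":
                return value.strip().lower() == "delta"
        return False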

I don't want you to have to finagle support for both Option1 and Option2

  • Overall I'm leaning towards Option2 heavily.
    • Slightly biased by the significant benefit it would have for our implementation of course
    • I still feel that option 2 overall is more targeted
      • sadly the describe extended call must be accepted and that might be a sort of return back to the slower approach that you mention in your last paragraph here https://github.com/dbt-labs/dbt-spark/issues/295#issue-1149338024
  • Our implementation would still be faster with Option1
    • Option2 is exactly what we'd want and would like to go with from our perspective
      • Open Source being what it is and all
  • I feel like Option 2 is the most performant
  • I'm definitely leaning towards Option2 as well

Again, thank you very much for the two options, and to everyone if you got this far kudos!

ryanClift-sd avatar Mar 21 '22 10:03 ryanClift-sd

@jtcohen6 Thanks for your work on this, it is so cool for me.

I have a suggestion: in wip-faster-caching-option1, would you consider passing the selected models to the function instead of the manifest when the user selects some models?

I set up Airflow like @ryanClift-sd, with only one model running per task. It would be a significant benefit for this usage.

TalkWIthKeyboard avatar Apr 29 '22 08:04 TalkWIthKeyboard

@ryanClift-sd Thanks for the amazing comment + experimentation! I agree, I'm leaning pretty strongly in the direction of approach 2 (show tables + show views). It performs much better at scale, it's more consistent with the ways we're thinking about caching everywhere else, and the trade-offs are much smaller.

This isn't work we've been able to prioritize yet. If you (or anyone else) would be interested in giving a PR a go, I can offer what support I can to see it over the finish line that much sooner. I see you've already got a PR merged on your fork!

There's follow-on core work around:

  • Caching only selected resources, rather than all enabled manifest resources (https://github.com/dbt-labs/dbt-core/issues/4961). The first cut of this is available as an "experimental" config in v1.1 (released yesterday): https://docs.getdbt.com/reference/global-configs#cache-database-objects-for-selected-resource. I think this is exactly what you're asking for, @TalkWIthKeyboard!
  • Reimplementing SchemaSearchMap to support catalog queries that filter down to the object level, rather than the schema level (https://github.com/dbt-labs/dbt-core/issues/4997, https://github.com/dbt-labs/dbt-spark/issues/300). The stakes of "missing" one object when cataloging are much lower than when caching.

jtcohen6 avatar Apr 29 '22 09:04 jtcohen6

@jtcohen6 Thank you for your quick response. Sure, I see that version 1.1.0 already provides some performance optimization for the way I use it.

For example, I have a database with 3 schemas: schema_a, schema_b, schema_c. I want to run schema_a.table1 now, so my command is dbt run --select schema_a.table1. Before version 1.1.0, _get_cache_schemas() would execute:

  • show table extended in schema_a like '*'
  • show table extended in schema_b like '*'
  • show table extended in schema_c like '*'

After the optimization in version 1.1.0, it will only execute:

  • show table extended in schema_a like '*'

I was wondering if it could go one step further and just query table1 information:

  • show table extended in schema_a like 'table1'

The advantage of this is that, when I have a large schema (with thousands of tables in it), I get a good performance improvement when I only run a small number of models.
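
A rough sketch of the kind of helper that could build such a pattern from the selected relation names (a hypothetical function, not code from any branch):

    # Illustrative sketch: build a `like` pattern from selected relation names,
    # falling back to '*' when the list is empty or unreasonably long.
    from typing import Sequence


    def build_like_pattern(identifiers: Sequence[str], max_names: int = 100) -> str:
        if not identifiers or len(identifiers) > max_names:
            return "*"
        return "|".join(identifiers)


    # e.g. build_like_pattern(["table1"]) returns "table1", so the cache query
    # becomes: show table extended in schema_a like 'table1'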

Also, Option2 is great; I have deployed it in a production environment and it is working well. But there is one small problem: I found that the show tables result does not have a database column; it is called namespace instead. This causes an NPE error in the code. It may be related to the Spark version; I am using 3.1.2.

Finally, I am also interested in implementing this feature. Could you please evaluate whether the optimization of option 1 is feasible and better? If so, I will work on it; if not, I will work on option 2. Thank you very much for your time.

TalkWIthKeyboard avatar Apr 29 '22 12:04 TalkWIthKeyboard

@jtcohen6 @TalkWIthKeyboard yes, Spark will use database or namespace based on the Spark version. dbt code should expect and support either. The change was made for ANSI SQL compatibility reasons.

ssimeonov avatar Apr 29 '22 22:04 ssimeonov

Thanks for the context there @ssimeonov. I agree, dbt should be able to pick it out as either database or namespace, whichever is available. I sense that this may be related to the larger changes coming in #281, which will require (optionally) rolling back some of the namespacing rules we've had in dbt-spark to date, again depending on the Spark version.

@TalkWIthKeyboard Given the evidence we've laid out thus far, I think option 2 seems by far preferable. (Crucially, option 1 risks cache misses.) Even for schemas with many many objects, show tables + show views seems fast enough to manage. If that's something you'd be interested in working on, I'd welcome a PR for it!

jtcohen6 avatar May 02 '22 08:05 jtcohen6

@jtcohen6 Sure, I'll polish the code and submit a PR based on option 2. I've checked the Spark source code and confirmed that whether Spark uses database or namespace as the field name is a separate thing from #281.

The issue appeared starting from v3.0.2, because Spark updated the Catalog class in v3.0.0 (it may be due to ANSI SQL compatibility, which I haven't confirmed, but it does not affect the current issue). The tricky part is that we can't get the Spark version and configurations through SQL (the Databricks runtime can get config), so I have two options:

  • Add SET spark.sql.legacy.keepCommandOutputSchema=true to the top of the spark__list_tables_without_caching and spark__list_views_without_caching macros. This would eliminate the impact of this issue and still let us access the database element, but overriding the user's configuration directly doesn't seem like a good thing and could affect the user's judgment of the SQL result.

  • Select whichever of namespace and database exists:

    # `tbl` is a row from the `show tables` result; depending on the Spark
    # version, the schema column is named either `namespace` or `database`
    if 'namespace' in tbl:
        schema = tbl['namespace']
    elif 'database' in tbl:
        schema = tbl['database']
    else:
        logger.debug(f"Error while retrieving information about {schema_relation}: "
                     f"there is no database nor namespace field in the table")
        return []
    

    this solution will always work as long as subsequent changes don’t make the database and namespace appear at the same time. It may not look elegant enough, but it is very simple.

I prefer the second; please let me know if you have a good way to get the Spark version and configurations.

TalkWIthKeyboard avatar May 02 '22 16:05 TalkWIthKeyboard

Thanks @TalkWIthKeyboard!! Yes, I'm happy with the simpler if/elif/else solution. I don't imagine they're planning to change this again anytime soon.

jtcohen6 avatar May 02 '22 16:05 jtcohen6

Thanks @TalkWIthKeyboard!! Yes, I'm happy with the simpler if/elif/else solution. I don't imagine they're planning to change this again anytime soon.

OK, it is better.

TalkWIthKeyboard avatar May 02 '22 16:05 TalkWIthKeyboard

Very interested in this fix -- @jtcohen6 @TalkWIthKeyboard do you think there's anything I could do to help with #433, or do you think it's best to let y'all handle it?

spenaustin avatar Nov 01 '22 21:11 spenaustin

I'm working with a dbt Cloud customer who has an existing, long-lived Spark cluster and is keen to try dbt Core/Cloud with it. They have 10,000+ objects in Spark and are experiencing this issue. They are following along to see its resolution!

boxysean avatar Nov 09 '22 10:11 boxysean

Is there any progress on a fix? The solution jtcohen6 suggested would really benefit our use-case... Are there any estimates?

theromsh avatar Feb 01 '23 16:02 theromsh

Per @smtaylorslc @theromsh @boxysean I'll add that our team has moved away from dbt Cloud because of this problem.

ssimeonov avatar Feb 01 '23 17:02 ssimeonov

I ran into the same issue, it took ~1hr to run dbt docs generate. Is there any fix for this?

tinolyu avatar Feb 14 '23 01:02 tinolyu

While waiting for this to be implemented I decided to use @ryanClift-sd 's dbt-spark wip-faster-caching-option1 and got stuck on a further problem: it seems the dbt-core logic is broken and the schema list isn't being generated for sources. While _get_cache_schemas() correctly generates the list from the manifest, we end up in list_relation() for the sources claiming that

we can't build the relations cache because we don't have a manifest so we can't run any operations.

even though the manifest definitely exists, because it is being used by _get_cache_schemas(). After adding some logging:

(screenshot: dbt_core_missed_mainfest)

I'm not sure whom this problem should be addressed to.

VShkaberda avatar Jul 28 '23 14:07 VShkaberda