
RFC: LoadableDefs

Open schrockn opened this issue 6 months ago • 4 comments

Summary & Motivation

We've been discussing different ways of lazy loading definitions and I wanted to flesh out some ideas I've had in this direction.

  • There is a new entry point: a function called defs. It optionally takes a context object provided by the system. This becomes the new default entry point; we change all examples and docs to use it.
  • A new class, LoadableDefs, that is used when you want to create a basket of Definitions backed by an external source of some kind.

The contract of LoadableDefs has the following behavior:

  • In order to use it you need to build a subclass.
  • You must provide a key for storage in the superclass __init__. (Note: I wish we could get away without needing this. All ears for alternative implementation ideas.)
  • You must override compute_defs_record, which is the code that hits whatever unreliable backend is the source of truth for definitions. It returns a DefsRecord.
  • You must override definitions_from_record, which takes a DefsRecord and returns a Definitions object.
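
The contract above could be sketched as an abstract base class. This is a hypothetical sketch, not the proposed implementation: DefsRecord, Definitions, and DefLoadingContext are minimal stand-ins here so the example is self-contained, and the load method shows where the caching machinery would hook in.

```python
# Hypothetical sketch of the LoadableDefs contract. All types here are
# illustrative stand-ins; the real classes would live in dagster.
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List


@dataclass
class DefsRecord:
    asset_spec_records: List[str] = field(default_factory=list)


@dataclass
class Definitions:
    assets: List[str] = field(default_factory=list)


class DefLoadingContext:
    """Provided by the system; would carry storage handles, instance, etc."""


class LoadableDefs(ABC):
    def __init__(self, external_source_key: str) -> None:
        # Key under which the computed DefsRecord is persisted.
        self.external_source_key = external_source_key

    @abstractmethod
    def compute_defs_record(self, context: DefLoadingContext) -> DefsRecord:
        """Hit the (possibly unreliable) external backend."""

    @abstractmethod
    def definitions_from_record(self, defs_record: DefsRecord) -> Definitions:
        """Pure transformation from a persisted record to Definitions."""

    def load(self, context: DefLoadingContext) -> Definitions:
        # The real machinery would consult the key-value store first and
        # only call compute_defs_record when no cached record exists.
        return self.definitions_from_record(self.compute_defs_record(context))
```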

DefsRecord builds on the spec system and makes it very convenient to convert to lists of specs. We built up a new (lightweight) layer of classes called "SpecRecord". This is the format in which we persist spec info into our db and (eventually) the event log. This makes the contract pretty easy to define:

  1. You write a function that fetches spec records from the external source.
  2. You write a function that takes those spec records and produces a Definitions object.

The machinery takes care of storing that DefsRecord object in our key-value store (this stashes it in cursor storage for now).

It is also very easy/trivial/automatic to convert spec records to specs in most cases.
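
To make the storage and conversion steps concrete, here is a sketch of how a record could round-trip through the key-value store and be converted to specs. Everything here is illustrative: the serialization format, the KeyValueStore class, and the to_asset_spec method are assumptions for the sake of the example, not the shipped API.

```python
# Hypothetical sketch: persisting a DefsRecord under the subclass's
# external_source_key and converting its spec records back to specs.
import json
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class AssetSpecRecord:
    key: str

    def to_asset_spec(self) -> Dict[str, str]:
        # In real Dagster this would build an AssetSpec; a dict stands in.
        return {"key": self.key}


@dataclass
class DefsRecord:
    asset_spec_records: List[AssetSpecRecord]

    def to_asset_specs(self) -> List[Dict[str, str]]:
        return [r.to_asset_spec() for r in self.asset_spec_records]

    def serialize(self) -> str:
        return json.dumps({"keys": [r.key for r in self.asset_spec_records]})

    @staticmethod
    def deserialize(raw: str) -> "DefsRecord":
        return DefsRecord([AssetSpecRecord(k) for k in json.loads(raw)["keys"]])


class KeyValueStore:
    """Stand-in for cursor storage."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> str:
        return self._data[key]
```

The point of standardizing the record layer is exactly this: once records serialize uniformly, tooling to visualize and manage them falls out naturally.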

By standardizing on this storage format we could also build tools to visualize and manage persisted specs, which I think will become necessary as more critical integrations rely on this and people need to debug things. This is also a logical building block for a more generalized state-driven definition infrastructure.

For the default case where you don't need to do anything fancy, you can use a convenience method to load all the loadable defs and merge them:

def fetch_table_names_from_rest_api() -> List[str]:
    # imagine this calling requests or something
    return ["lakehouse_a"]

class LakehouseDefs(LoadableDefs):
    def __init__(self) -> None:
        super().__init__(external_source_key="lakehouse_api")

    def compute_defs_record(self, context: DefLoadingContext) -> DefsRecord:
        return DefsRecord(
            asset_spec_records=[
                AssetSpecRecord(key=AssetKey(table_name))
                for table_name in fetch_table_names_from_rest_api()
            ]
        )

    def definitions_from_record(self, defs_record: DefsRecord) -> Definitions:
        @multi_asset(specs=defs_record.to_asset_specs())
        def an_asset() -> None: ...

        return Definitions(assets=[an_asset])

Both Definitions and LoadableDefs instances can be combined in convenience APIs like load_all.

def defs(context: DefLoadingContext) -> Definitions:
    # convenience function to load all the defs and merge them
    # Definitions.load_all(...)
    return load_all(
        context,
        loadable_legacy_system_defs(),
        LakehouseDefs(),
        HardedPartitionedDefs(),
        Definitions(assets=[another_asset]),
    )

However, if you need to do things like evaluate defs and then pass them to other factories, you can do that as well.

def defs(context: DefLoadingContext) -> Definitions:
    # This is now a vanilla definitions object.
    legacy_system_defs = loadable_legacy_system_defs().load(context)

    return Definitions.merge(
        legacy_system_defs,
        Definitions(asset_checks=build_freshness_checks(legacy_system_defs))
    )

It may be worth examining whether to have the context argument at all. While inconsistent with the rest of Dagster, we could manage a global context stack on the user's behalf. This would eliminate the need to thread a context object everywhere, but it is also magical global state that encourages "spooky action at a distance". On the other hand, refactoring definitions-creation code to have access to the context is annoying, and you have to do it the moment you bring in integrations that have loadable definitions.
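
One way the global-context-stack alternative could look: a context manager pushes the active DefLoadingContext onto a stack, and integration code reads it implicitly instead of having it threaded through. This is a sketch of the trade-off discussed above, not a proposed API; all names here are hypothetical.

```python
# Hypothetical sketch of a global context stack for def loading.
from contextlib import contextmanager
from typing import Iterator, List, Optional


class DefLoadingContext:
    """Stand-in for the system-provided context."""


_context_stack: List[DefLoadingContext] = []


@contextmanager
def def_loading_context(context: DefLoadingContext) -> Iterator[DefLoadingContext]:
    # Push on entry, pop on exit, so nesting behaves sanely.
    _context_stack.append(context)
    try:
        yield context
    finally:
        _context_stack.pop()


def current_def_loading_context() -> Optional[DefLoadingContext]:
    # The "spooky action at a distance": any code anywhere can grab the
    # active context without it being passed as an argument.
    return _context_stack[-1] if _context_stack else None
```

The convenience is real, but so is the downside: code that calls current_def_loading_context() only works when a caller somewhere up the stack remembered to enter the context manager.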

Another important utility here is LoadableCacheableAssetsDefinitionAdapter, which would allow us to wrap existing CacheableAssetsDefinition instances:

e.g.

    return LoadableCacheableAssetsDefinitionAdapter(
        LegacySystemCacheableAssetsDefinition("ksjdkfsjdklfjslkdjf")
    )

This adapter is quite simple, and I think it demonstrates the benefits of this approach. It contains the complexity blast radius of CacheableAssetsDefinition and eliminates the need for the various and sundry subclasses of that class. Once everything is ported to this adapter we will be able to delete a huge amount of code.
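
The adapter's simplicity comes from how cleanly the two contracts line up: CacheableAssetsDefinition's two-phase compute_cacheable_data / build_definitions API maps directly onto compute_defs_record / definitions_from_record. The sketch below uses a minimal stand-in for the cacheable class so it is self-contained; the adapter class and its mapping are my reading of the RFC, not the actual diff.

```python
# Hypothetical sketch of the adapter. FakeCacheableAssetsDefinition mimics
# the two-phase API of dagster's CacheableAssetsDefinition.
from typing import Any, List


class FakeCacheableAssetsDefinition:
    """Stand-in for dagster's CacheableAssetsDefinition."""

    def __init__(self, unique_id: str) -> None:
        self.unique_id = unique_id

    def compute_cacheable_data(self) -> List[str]:
        # Phase one: hit the external system, return serializable data.
        return ["cached_data"]

    def build_definitions(self, data: List[str]) -> List[str]:
        # Phase two: pure construction from the cached data.
        return [f"assets_from:{d}" for d in data]


class LoadableCacheableAssetsDefinitionAdapter:
    def __init__(self, cacheable: FakeCacheableAssetsDefinition) -> None:
        # Reuse the wrapped object's unique_id as the storage key.
        self.external_source_key = cacheable.unique_id
        self._cacheable = cacheable

    def compute_defs_record(self, context: Any) -> List[str]:
        # Delegate phase one: unreliable backend call.
        return self._cacheable.compute_cacheable_data()

    def definitions_from_record(self, record: List[str]) -> List[str]:
        # Delegate phase two: record in, definitions out.
        return self._cacheable.build_definitions(record)
```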

Proposed plan of action

  • Do the work to flesh out this diff. We need to do quite a bit of instance threading. We should also add a decorator to mark a defs function so we don't rely solely on a hardcoded name.
  • Immediately move all docs and examples to def defs instead of defs = . It is better anyway, since by default it avoids import-based side effects in unit-testing scenarios and so forth.
  • Port all existing CacheableAssetsDefinition instances to the provided adapter and immediately deprecate any API that returns CacheableAssetsDefinition. Once this is done we can delete an enormous amount of complexity in the codebase. PendingRepositoryDefinition and all the chicanery around it can be eliminated.

How I Tested These Changes

loadable_example.py

I also refactored Airlift on top of this and was fairly pleased (https://app.graphite.dev/github/pr/dagster-io/dagster/23887/Try-airlift-cacheables-on-new-abstraction), aside from having to refactor examples and tests to deal with the context.

Changelog [New | Bug | Docs]

NOCHANGELOG

schrockn · Aug 24 '24 14:08