load_asset_value() fails with resource_config

Open zhh210 opened this issue 9 months ago • 0 comments

Dagster version

1.7.4

What's the issue?

I have a customized io manager (S3IOManager) that requires s3_path in its config:

resources = {
    "pyspark_step_launcher": no_step_launcher,
    "pyspark": deepsea_pyspark_resource,
    "s3_io_manager": S3IOManager(
        s3_path='<custom_s3>',
    ),
    "io_manager": mem_io_manager,
}

Materialization works fine, but when I try to load the materialized asset, load_asset_value() fails to apply the resource_config:

with defs.get_asset_value_loader(instance=instance) as loader:
    df2 = loader.load_asset_value(
        AssetKey("df_sample_labels"),
        partition_key='2023-09-18',
        resource_config={'s3_io_manager': {'config': {'s3_path': resources['s3_io_manager'].s3_path}}},
    )

This raises the following error:

File /opt/conda/lib/python3.9/site-packages/dagster/_core/decorator_utils.py:203, in _wrap_with_pre_call_fn.<locals>.wrapped_with_pre_call_fn(*args, **kwargs)
    201 if condition is None or condition(*args, **kwargs):
    202     pre_call_fn()
--> 203 return fn(*args, **kwargs)

File /opt/conda/lib/python3.9/site-packages/dagster/_core/storage/asset_value_loader.py:166, in AssetValueLoader.load_asset_value(self, asset_key, python_type, partition_key, input_definition_metadata, resource_config, metadata)
    136 io_manager_config = get_mapped_resource_config(
    137     {io_manager_key: io_manager_def}, io_resource_config
    138 )
    140 input_context = build_input_context(
    141     name=None,
    142     asset_key=asset_key,
   (...)
    163     ),
    164 )
--> 166 return io_manager.load_input(input_context)

File io_manager.py:45, in S3IOManager.load_input(self, context)
     43 def load_input(self, context):
     44     spark = self.pyspark.spark_session
---> 45     print('====load dataframe from {}===='.format(self._get_path(context.upstream_output)))
     46     return spark.read.parquet(self._get_path(context.upstream_output))

File io_manager.py:26, in S3IOManager._get_path(self, context)
     21     return os.path.join(
     22     context.resource_config["s3_path"], context.step_key, partition_name
     23 )
     24 else:
     25     return os.path.join(
---> 26     context.resource_config["s3_path"], context.step_key
     27 )

KeyError: 's3_path'
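
A possible workaround, as an untested sketch: the traceback shows `_get_path` reading `s3_path` from `context.resource_config`, while the same value is also available on the manager instance (it is read above as `resources['s3_io_manager'].s3_path`). Reading it from the instance avoids depending on what the loader puts into the context. The classes below are plain-Python stand-ins, not Dagster classes; `S3IOManagerSketch`, the `partition_name` argument, and the `s3://bucket/prefix` value are hypothetical and used only for illustration.

```python
import os
from types import SimpleNamespace


class S3IOManagerSketch:
    """Stand-in for the S3IOManager in this issue; not a Dagster class."""

    def __init__(self, s3_path):
        # Same value that is passed as S3IOManager(s3_path=...) above.
        self.s3_path = s3_path

    def _get_path(self, context, partition_name=None):
        # Read the base path from the instance instead of
        # context.resource_config["s3_path"], which raised the KeyError.
        parts = [self.s3_path, context.step_key]
        if partition_name is not None:
            parts.append(partition_name)
        return os.path.join(*parts)


# Hypothetical context whose resource_config lacks "s3_path",
# mimicking the state seen in the traceback.
context = SimpleNamespace(resource_config={}, step_key="df_sample_labels")
manager = S3IOManagerSketch(s3_path="s3://bucket/prefix")  # placeholder path
print(manager._get_path(context, partition_name="2023-09-18"))
```

This only sidesteps the lookup inside the custom IO manager; it does not explain why `resource_config` arrives without `s3_path` in `load_asset_value()`.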

However, if I also add s3_path at the root of resource_config, alongside s3_io_manager:

with defs.get_asset_value_loader(instance=instance) as loader:
    df2 = loader.load_asset_value(
        AssetKey("df_sample_labels"),
        partition_key='2023-09-18',
        resource_config={'s3_io_manager': {'config': {'s3_path': resources['s3_io_manager'].s3_path}}, 's3_path': 'abc'},
    )

Dagster then rejects the root-level s3_path entry:

DagsterInvalidConfigError: Error in config for resources 
    Error 1: Received unexpected config entry "s3_path" at the root. Expected: "{ s3_io_manager?: { config?: { s3_path: (String | { env: String }) } } }".

What did you expect to happen?

load_asset_value() should reuse the already configured resources to load the materialized assets, without extra configuration.

How to reproduce?

No response

Deployment type

Local

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

zhh210 • May 08 '24 18:05