dagster
load_asset_value() fails with resource_config
Dagster version
1.7.4
What's the issue?
I have a custom IO manager (S3IOManager) that requires s3_path in its config:
resources = {
    "pyspark_step_launcher": no_step_launcher,
    "pyspark": deepsea_pyspark_resource,
    # Custom IO manager; s3_path is a required config field
    "s3_io_manager": S3IOManager(
        s3_path="<custom_s3>",
    ),
    "io_manager": mem_io_manager,
}
Materialization works fine, but when I try to load the materialized asset, load_asset_value() does not use the s3_path passed via resource_config:
with defs.get_asset_value_loader(instance=instance) as loader:
    df2 = loader.load_asset_value(
        AssetKey("df_sample_labels"),
        partition_key="2023-09-18",
        resource_config={"s3_io_manager": {"config": {"s3_path": resources["s3_io_manager"].s3_path}}},
    )
This raises the following error:
File /opt/conda/lib/python3.9/site-packages/dagster/_core/decorator_utils.py:203, in _wrap_with_pre_call_fn.<locals>.wrapped_with_pre_call_fn(*args, **kwargs)
201 if condition is None or condition(*args, **kwargs):
202 pre_call_fn()
--> 203 return fn(*args, **kwargs)
File /opt/conda/lib/python3.9/site-packages/dagster/_core/storage/asset_value_loader.py:166, in AssetValueLoader.load_asset_value(self, asset_key, python_type, partition_key, input_definition_metadata, resource_config, metadata)
136 io_manager_config = get_mapped_resource_config(
137 {io_manager_key: io_manager_def}, io_resource_config
138 )
140 input_context = build_input_context(
141 name=None,
142 asset_key=asset_key,
(...)
163 ),
164 )
--> 166 return io_manager.load_input(input_context)
File io_manager.py:45, in S3IOManager.load_input(self, context)
43 def load_input(self, context):
44 spark = self.pyspark.spark_session
---> 45 print('====load dataframe from {}===='.format(self._get_path(context.upstream_output)))
46 return spark.read.parquet(self._get_path(context.upstream_output))
File io_manager.py:26, in S3IOManager._get_path(self, context)
21 return os.path.join(
22 context.resource_config["s3_path"], context.step_key, partition_name
23 )
24 else:
25 return os.path.join(
---> 26 context.resource_config["s3_path"], context.step_key
27 )
KeyError: 's3_path'
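The traceback shows _get_path reading s3_path from context.resource_config of the upstream output context, and that dict has no such key when called through the loader. Since S3IOManager already carries s3_path as a field (the call above reads it back as resources['s3_io_manager'].s3_path), one possible workaround is to take the base path from the IO manager instance instead. The stdlib-only sketch below models that idea under assumptions: FakeOutputContext and PathBuilder are hypothetical stand-ins for Dagster's OutputContext and the S3IOManager path logic, the bucket value is made up, and the partition handling is assumed because the partition branch of _get_path is only partly visible in the traceback. It has not been verified against Dagster 1.7.4.

```python
import os
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeOutputContext:
    """Hypothetical stand-in for the upstream OutputContext in the traceback."""
    step_key: str
    partition_key: Optional[str] = None
    # Simulates the loader case: this dict does not contain "s3_path".
    resource_config: dict = field(default_factory=dict)


@dataclass
class PathBuilder:
    """Hypothetical stand-in for the path logic inside S3IOManager."""
    s3_path: str

    def get_path_from_context(self, context: FakeOutputContext) -> str:
        # Mirrors the original _get_path: raises KeyError when the
        # context's resource_config lacks "s3_path".
        return self._join(context.resource_config["s3_path"], context)

    def get_path_from_self(self, context: FakeOutputContext) -> str:
        # Workaround: use the IO manager's own configured field.
        return self._join(self.s3_path, context)

    @staticmethod
    def _join(base: str, context: FakeOutputContext) -> str:
        # Assumed layout from the traceback: <s3_path>/<step_key>[/<partition>]
        if context.partition_key is not None:
            return os.path.join(base, context.step_key, context.partition_key)
        return os.path.join(base, context.step_key)


builder = PathBuilder(s3_path="s3://bucket/prefix")  # hypothetical bucket
ctx = FakeOutputContext(step_key="df_sample_labels", partition_key="2023-09-18")
print(builder.get_path_from_self(ctx))
try:
    builder.get_path_from_context(ctx)
except KeyError as exc:
    print("KeyError:", exc)
```

In the real IO manager this would correspond to replacing context.resource_config["s3_path"] with self.s3_path inside _get_path; whether the rest of the upstream context (for example step_key) is populated when loading through the loader is a separate question this sketch does not answer.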
However, if I pass both s3_io_manager and a top-level s3_path in resource_config:
with defs.get_asset_value_loader(instance=instance) as loader:
    df2 = loader.load_asset_value(
        AssetKey("df_sample_labels"),
        partition_key="2023-09-18",
        resource_config={"s3_io_manager": {"config": {"s3_path": resources["s3_io_manager"].s3_path}}, "s3_path": "abc"},
    )
it complains that s3_path is not allowed at the root:
DagsterInvalidConfigError: Error in config for resources
Error 1: Received unexpected config entry "s3_path" at the root. Expected: "{ s3_io_manager?: { config?: { s3_path: (String | { env: String }) } } }".
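The error message spells out the accepted shape: the only allowed root key is the IO manager's resource key, which may hold a config mapping with s3_path, so a bare s3_path at the root is rejected. A simplified stdlib sketch of that root-level check follows; check_root_keys is a hypothetical helper written for illustration, not Dagster's config validator, and it ignores field types.

```python
from typing import Iterable, List


def check_root_keys(resource_config: dict, resource_keys: Iterable[str]) -> List[str]:
    """Return error strings for entries that do not match {"<key>": {"config": {...}}}.

    Simplified illustration: checks root keys and the "config" nesting only.
    """
    allowed = set(resource_keys)
    errors = []
    for key, value in resource_config.items():
        if key not in allowed:
            # Same situation as the top-level "s3_path" entry above
            errors.append(f'Received unexpected config entry "{key}" at the root.')
        elif not isinstance(value, dict) or set(value) - {"config"}:
            errors.append(f'Entry "{key}" should look like {{"config": {{...}}}}.')
    return errors


valid = {"s3_io_manager": {"config": {"s3_path": "s3://bucket/prefix"}}}  # hypothetical path
invalid = {**valid, "s3_path": "abc"}
print(check_root_keys(valid, ["s3_io_manager"]))  # []
print(check_root_keys(invalid, ["s3_io_manager"]))
```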
What did you expect to happen?
I expected load_asset_value() to reuse the configured resources (including the s3_path of s3_io_manager), so the materialized asset can be loaded without extra workarounds.
How to reproduce?
No response
Deployment type
Local
Deployment details
No response
Additional information
No response