Add support for mocking `COPY INTO` with Parquet output in `FakeSnowflakeCursor`
In my project I needed to mock `COPY INTO` statements so that queries could produce Parquet files in a local stage path. To achieve this, I extended `fakesnow.cursor.FakeSnowflakeCursor` and overrode `execute` as follows:
```python
from collections.abc import Sequence
from typing import Any, Self

import sqlglot
import sqlglot.expressions


def execute(self, command: str, params: Sequence[Any] | dict[Any, Any] | None = None) -> Self:
    if self._stage_path is not None:
        # Interpolate params so the statement is parseable by sqlglot.
        formatted_command = command % params if params else command
        parsed_command = sqlglot.parse_one(formatted_command, read='snowflake')
        if isinstance(parsed_command, sqlglot.expressions.Copy):
            # Run the inner SELECT from the COPY INTO source, then dump the
            # result set to a Parquet file under the local stage path.
            inner_sql = parsed_command.args['files'][0].this
            self._cursor.execute(inner_sql.sql(dialect='snowflake'), params)
            self._write_parquet(parsed_command, self._stage_path)
            self._results = []
            return self
    self._cursor.execute(command, params)
    self._results = self._cursor.fetchall()
    return self
```
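For reference, this is roughly the shape sqlglot gives back for an unloading statement, which is what the override keys off (a quick sanity check; exact tree internals can vary across sqlglot versions):

```python
import sqlglot
import sqlglot.expressions

parsed = sqlglot.parse_one(
    "COPY INTO @stage/test_output FROM (SELECT 1 AS ID) FILE_FORMAT = (TYPE = 'PARQUET')",
    read='snowflake',
)
# COPY statements parse to a Copy node: the target sits under .this and the
# FROM source under args['files'], which is what execute() above relies on.
assert isinstance(parsed, sqlglot.expressions.Copy)
print(parsed.args['files'][0].sql(dialect='snowflake'))  # (SELECT 1 AS ID)
```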
The `_write_parquet` helper inspects the `COPY INTO` parameters and writes the results to a Parquet file. Only Parquet is supported today, but it could be extended in a similar way to handle other formats (`_PARQUET_COMPRESSION_TYPES` below is just a `Literal` alias of the codec names pyarrow accepts):
```python
import pathlib
import typing
from collections.abc import Sequence
from typing import Any

import pyarrow
import pyarrow.parquet
import sqlglot.expressions

# Alias (not shown earlier): compression codecs accepted by pyarrow.parquet.write_table.
_PARQUET_COMPRESSION_TYPES = typing.Literal['none', 'snappy', 'gzip', 'brotli', 'lz4', 'zstd']


def _write_parquet(
    self, copy_command: sqlglot.expressions.Copy, stage_path: pathlib.Path
) -> None:
    # Resolve the @stage target into a path relative to the local stage dir.
    stage_target = str(copy_command.this.this.this).lstrip('@')
    file_type = 'parquet'
    compression: _PARQUET_COMPRESSION_TYPES = 'snappy'
    # Walk the FILE_FORMAT options attached to the COPY INTO statement.
    for param in copy_command.args.get('params', []):
        if str(param.this.this).upper() == 'FILE_FORMAT':
            for expr in param.args.get('expressions', []):
                prop_name = str(expr.this.this).upper()
                prop_value = str(expr.args['value'].this).strip('"\'')
                if prop_name == 'TYPE':
                    file_type = prop_value.lower()
                elif prop_name == 'COMPRESSION':
                    compression = typing.cast(_PARQUET_COMPRESSION_TYPES, prop_value.lower())
    if file_type != 'parquet':
        raise ValueError(f'Only parquet is supported by the mock. Got TYPE={file_type!r}')
    columns = (
        [d[0] for d in self._cursor.description] if self._cursor.description else ['_COL_0']
    )
    fetched_rows = self._cursor.fetchall()
    if fetched_rows:
        if self._cursor._use_dict_result:
            rows = [
                tuple(r.get(c) for c in columns)
                for r in typing.cast(Sequence[dict[str, Any]], fetched_rows)
            ]
        else:
            rows = [tuple(r) for r in fetched_rows]
        # Transpose rows into columns for pyarrow.
        table = pyarrow.Table.from_arrays(
            [pyarrow.array(column) for column in list(zip(*rows, strict=False))],
            names=columns,
        )
    else:
        # No rows: emit an empty table with null-typed columns.
        table = pyarrow.table({c: pyarrow.array([], type=pyarrow.null()) for c in columns})
    file_path = stage_path.joinpath(stage_target).with_suffix(f'.{file_type}')
    file_path.parent.mkdir(parents=True, exist_ok=True)
    pyarrow.parquet.write_table(table, file_path, compression=compression)
```
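As a quick standalone check of the pyarrow calls used above (throwaway values, independent of the cursor):

```python
import pathlib
import tempfile

import pyarrow
import pyarrow.parquet

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp, 'test_output.parquet')
    table = pyarrow.Table.from_arrays(
        [pyarrow.array([1, 2]), pyarrow.array(['a', 'b'])],
        names=['ID', 'VALUE'],
    )
    pyarrow.parquet.write_table(table, path, compression='snappy')
    assert pyarrow.parquet.read_table(path).to_pylist() == [
        {'ID': 1, 'VALUE': 'a'},
        {'ID': 2, 'VALUE': 'b'},
    ]
```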
This approach makes it possible to mock `COPY INTO` behavior realistically during tests, producing actual Parquet files in a temporary stage directory.

Would you be open to a PR that adds this functionality (or something along these lines) directly into `FakeSnowflakeCursor`?
Do you have an example unloading query that you'd want to support?
> Do you have an example unloading query that you'd want to support?
Something like this:
```python
import pathlib
import uuid

import pyarrow.parquet
import snowflake.connector


def test_copy_into_parquet_with_params(
    mocked_stage_path: pathlib.Path,
    mocked_snow_connection_with_stage: snowflake.connector.SnowflakeConnection,
) -> None:
    """Test that the function copies into a stage with params."""
    row_id = uuid.uuid4().bytes
    cursor = mocked_snow_connection_with_stage.cursor()
    cursor.execute(
        """
        COPY INTO @stage/test_output
        FROM (SELECT %(id)s AS ID, %(value)s AS VALUE)
        FILE_FORMAT = (TYPE = 'PARQUET')
        """,
        params={'id': row_id, 'value': 'value'},
    )
    assert pyarrow.parquet.read_table(
        mocked_stage_path.joinpath('stage/test_output.parquet')
    ).to_pylist() == [
        {'ID': row_id, 'VALUE': 'value'},
    ]
```
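For context, the two fixtures come from my project's conftest; roughly they look like this (a sketch, with the custom-cursor wiring elided since it is specific to my setup):

```python
import pathlib
from collections.abc import Iterator

import fakesnow
import pytest
import snowflake.connector


@pytest.fixture
def mocked_stage_path(tmp_path: pathlib.Path) -> pathlib.Path:
    # Local directory that stands in for the @stage location.
    return tmp_path


@pytest.fixture
def mocked_snow_connection_with_stage(
    mocked_stage_path: pathlib.Path,
) -> Iterator[snowflake.connector.SnowflakeConnection]:
    with fakesnow.patch():
        conn = snowflake.connector.connect()
        # Project-specific wiring (elided): swap in the cursor subclass above
        # and point its _stage_path at mocked_stage_path.
        yield conn
```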
Thank you for your prompt response.
Ok got it, yeah a PR would be great!
In terms of the implementation, I would suggest transforming the SQL into a duckdb `COPY` command to write the parquet. This would be consistent with how we load parquet and would easily enable support for S3.
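For concreteness, the rewrite could look something like this (a sketch using a local path; the mapping from `@stage` to a directory is up to the implementation):

```python
import pathlib

import duckdb

# Snowflake:  COPY INTO @stage/test_output FROM (SELECT ...) FILE_FORMAT = (TYPE = 'PARQUET')
# duckdb:     COPY (SELECT ...) TO 'stage/test_output.parquet' (FORMAT PARQUET)
pathlib.Path('stage').mkdir(exist_ok=True)
duckdb.connect().execute(
    "COPY (SELECT 1 AS ID, 'value' AS VALUE) TO 'stage/test_output.parquet' (FORMAT PARQUET)"
)
```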