fakesnow icon indicating copy to clipboard operation
fakesnow copied to clipboard

Add support for mocking `COPY INTO` with Parquet output in `FakeSnowflakeCursor`

Open zetaattopsortcom opened this issue 4 months ago • 3 comments

In my project I needed to mock COPY INTO statements so that queries could produce Parquet files in a local stage path. To achieve this, I extended fakesnow.cursor.FakeSnowflakeCursor and overrode execute as follows:

    def execute(self, command: str, params: Sequence[Any] | dict[Any, Any] | None = None) -> Self:
        if self._stage_path is not None:
            formatted_command = command % params if params else command
            parsed_command = sqlglot.parse_one(formatted_command, read='snowflake')
            if isinstance(parsed_command, sqlglot.expressions.Copy):
                inner_sql = parsed_command.args['files'][0].this
                self._cursor.execute(inner_sql.sql(dialect='snowflake'), params)
                self._write_parquet(parsed_command, self._stage_path)
                self._results = []
                return self
        self._cursor.execute(command, params)
        self._results = self._cursor.fetchall()
        return self

The _write_parquet helper inspects the COPY INTO parameters and writes the results into a Parquet file. Only Parquet is supported today, but it could be expanded in a similar way to handle other formats:

    def _write_parquet(
        self, copy_command: sqlglot.expressions.Copy, stage_path: pathlib.Path
    ) -> None:
        stage_target = str(copy_command.this.this.this).lstrip('@')

        file_type = 'parquet'
        compression: _PARQUET_COMPRESSION_TYPES = 'snappy'

        for param in copy_command.args.get('params', []):
            if str(param.this.this).upper() == 'FILE_FORMAT':
                for expr in param.args.get('expressions', []):
                    prop_name = str(expr.this.this).upper()
                    prop_value = str(expr.args['value'].this).strip('"\'')
                    if prop_name == 'TYPE':
                        file_type = prop_value.lower()
                    elif prop_name == 'COMPRESSION':
                        compression = typing.cast(_PARQUET_COMPRESSION_TYPES, prop_value.lower())

        if file_type != 'parquet':
            raise ValueError(f'Only parquet is supported by the mock. Got TYPE={file_type!r}')

        columns = (
            [d[0] for d in self._cursor.description] if self._cursor.description else ['_COL_0']
        )
        fetched_rows = self._cursor.fetchall()
        if fetched_rows:
            if self._cursor._use_dict_result:
                rows = [
                    tuple(r.get(c) for c in columns)
                    for r in typing.cast(Sequence[dict[str, Any]], fetched_rows)
                ]
            else:
                rows = [tuple(r) for r in fetched_rows]
            table = pyarrow.Table.from_arrays(
                [pyarrow.array(column) for column in list(zip(*rows, strict=False))],
                names=columns,
            )
        else:
            table = pyarrow.table({c: pyarrow.array([], type=pyarrow.null()) for c in columns})

        file_path = stage_path.joinpath(stage_target).with_suffix(f'.{file_type}')
        file_path.parent.mkdir(parents=True, exist_ok=True)
        pyarrow.parquet.write_table(table, file_path, compression=compression)

This approach makes it possible to mock COPY INTO behavior realistically during tests, producing actual Parquet files in a temporary stage directory.

Would you be open to a PR that adds this functionality (or something along these lines) directly into FakeSnowflakeCursor?

zetaattopsortcom avatar Aug 18 '25 21:08 zetaattopsortcom

Do you have an example unloading query that you'd want to support?

tekumara avatar Aug 19 '25 07:08 tekumara

Do you have an example unloading query that you'd want to support?

Something like this:

def test_copy_into_parquet_with_params(
    mocked_stage_path: pathlib.Path,
    mocked_snow_connection_with_stage: snowflake.connector.SnowflakeConnection,
) -> None:
    """Test that the function copies into a stage with params."""
    row_id = uuid.uuid4().bytes
    cursor = mocked_snow_connection_with_stage.cursor()
    cursor.execute(
        """
        COPY INTO @stage/test_output
        FROM (SELECT %(id)s AS ID, %(value)s AS VALUE)
        FILE_FORMAT = (TYPE = 'PARQUET')
    """,
        params={'id': row_id, 'value': 'value'},
    )
    assert pyarrow.parquet.read_table(
        mocked_stage_path.joinpath('stage/test_output.parquet')
    ).to_pylist() == [
        {'ID': row_id, 'VALUE': 'value'},
    ]

Thank you for your prompt response.

zetaattopsortcom avatar Aug 22 '25 04:08 zetaattopsortcom

Ok got it yeh a PR would be great!

In terms of the implementation I would suggest considering transforming the sql into a duckdb copy command to write the parquet. This would be consistent with how we load parquet and easily enable support for s3.

tekumara avatar Aug 22 '25 12:08 tekumara