
[BUG] duckdb read_csv_auto kwarg columns clashes with load_df

Open rdmolony opened this issue 2 years ago • 6 comments

Minimal Code To Reproduce

import os
import tempfile
import textwrap
import typing

import duckdb
from fugue import DataFrame
from fugue import ExecutionEngine
from fugue import FugueWorkflow
from fugue_duckdb import DuckExecutionEngine
from fugue_sql import fsql
import pandas as pd


def create_temporary_file(
    _content: bytes, suffix: str, prefix: str = "fugue_example_"
) -> str:
    text_file = tempfile.NamedTemporaryFile(
        suffix=suffix, prefix=prefix, delete=False
    )
    text_file.write(_content)
    text_file.close()  # close to flush the content to disk before reading
    return text_file.name


def read_header(filepath: str) -> typing.List[str]:
    # line 1 is metadata ("date: ..."); line 2 holds the column names,
    # with the first one prefixed by "columns: "
    row_1 = pd.read_csv(filepath, skiprows=1, nrows=0).columns
    header = [row_1[0].replace("columns: ", ""), *row_1[1:]]
    return header


# Let's say I have a non-standard `csv` ...
content = textwrap.dedent("""\
    date: 2022-10-17
    columns: a,b,c
    1,2,3
    1,2,3"""
).encode("utf-8")


# ... I can read it no problem using the `duckdb` execution engine directly ...
def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    engine = DuckExecutionEngine()
    return engine.load_df(filepath, skip=2, columns=headers)

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")

os.unlink(csv_filepath)


# ... I can't easily use `fsql` instead as `columns` clashes ...
def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return fsql(f"LOAD '{filepath}' (skip=2, columns={headers})")

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()

try:
    dag.run(engine="duck")
except Exception as e:
    print(e)

os.unlink(csv_filepath)

Describe the bug

It seems that the fugue execution engine kwarg columns clashes with the duckdb columns kwarg when parsing the SQL ...

def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
    data = self.get_dict(ctx, "fmt", "path", "params", "columns")
    __modified_exception__ = self.to_runtime_error(ctx)  # noqa
    return self.workflow.load(
        path=data["path"],
        fmt=data.get("fmt", ""),
        columns=data.get("columns"),
        **data.get("params", {}),
    )

... columns ends up inside params rather than being passed as the columns argument, so workflow.load receives it twice
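
A minimal sketch of the resulting Python-level failure (a hypothetical load signature, just to illustrate the clash):

def load(path, fmt="", columns=None, **kwargs):
    pass

# columns arrives both as the explicit keyword and inside **params
params = {"skip": 2, "columns": ["a", "b", "c"]}  # as parsed from the SQL params
load(path="x.csv", columns=None, **params)
# TypeError: load() got multiple values for keyword argument 'columns'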

Expected behavior

fsql should parse LOAD ... so that engine.load_df receives the same arguments as when it is called directly

Environment (please complete the following information):

  • Backend: duckdb
  • Backend version: duckdb 0.5.1
  • Python version: 3.10.6
  • OS: linux (WSL2)

Mentioned in https://github.com/fugue-project/tutorials/issues/170. Example adapted from https://github.com/fugue-project/tutorials/pull/178.

rdmolony avatar Oct 19 '22 19:10 rdmolony

The easiest thing might be to do something like ...

def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
    data = self.get_dict(ctx, "fmt", "path", "params", "columns")
    __modified_exception__ = self.to_runtime_error(ctx)  # noqa

    params = data.get("params", {})
    try:
        columns = data["columns"]
    except KeyError:
        # fall back to a columns kwarg supplied via the params clause
        columns = params.pop("columns", "")

    return self.workflow.load(
        path=data["path"],
        fmt=data.get("fmt", ""),
        columns=columns,
        **params,
    )

... though I'm not too familiar with testing this under fugue_sql. It looks like test_workflow_parse.test_load is the right place for this sort of test, since it is a LOAD-related SQL bug; a sketch follows.
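
A hedged sketch of such a regression test (the test name and assertion style are assumptions; the existing tests in test_workflow_parse may compare parsed workflow specs instead):

from fugue_sql import fsql

def test_load_columns_in_params():
    # building the workflow should no longer raise
    # "got multiple values for keyword argument 'columns'"
    fsql("LOAD 'x.csv' (skip=2, columns=['a','b','c'])")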

rdmolony avatar Oct 19 '22 20:10 rdmolony

Ok, I see there are a couple of issues. Look at this code:

def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    engine = DuckExecutionEngine()
    return engine.load_df(filepath, skip=2, columns=headers)

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")

It may work, but only by luck. The better way is:

def read_text_file(engine: ExecutionEngine, filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return engine.load_df(filepath, skip=2, columns=headers)

This way you don't instantiate an engine yourself, and the creator could also work with different engines. The workflow part stays the same.

goodwanghan avatar Oct 25 '22 05:10 goodwanghan

I think he had to instantiate the engine because there was a bit of inconsistent behavior between the Pandas and DuckDB engines when reading multi-header CSVs. In this issue in the tutorials repo, he wrote the following code to make things consistent:

def read_text_file(engine: ExecutionEngine, filepath: str) -> DataFrame:
    headers = read_header(filepath)
    if isinstance(engine, NativeExecutionEngine):
        # load_df uses pandas.read_csv
        df = engine.load_df(filepath, infer_schema=True, header=True, skiprows=3, names=headers)
    elif isinstance(engine, DuckExecutionEngine):
        # load_df uses duckdb read_csv_auto
        df = engine.load_df(filepath, infer_schema=True, skip=4, columns=headers)
    elif isinstance(engine, DaskExecutionEngine):
        # load_df uses dask.dataframe.read_csv
        df = engine.load_df(filepath, infer_schema=True, header=True, skiprows=3, names=headers)
    else:
        supported_engines = {NativeExecutionEngine, DuckExecutionEngine, DaskExecutionEngine}   
        raise ValueError(f"Engine {engine} is not supported, must be one of {supported_engines}")
    return df

The native engine and the Duck engine need different values for skip to make it work (presumably because pandas' skiprows interacts with the header line that is consumed afterwards, while duckdb's skip counts raw lines from the top of the file).

I think the takeaway here is that this is deferred to the engine-specific method, and we won't design (for now) to accommodate such multi-header files until we see them more often? Making a uniform interface for all the CSV kwargs is quite a bit of effort, so I suggest we just rename the issue and keep it open.

kvnkho avatar Oct 25 '22 05:10 kvnkho

This code has multiple issues:

# ... I can't easily use `fsql` instead as `columns` clashes ...
def read_text_file(filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return fsql(f"LOAD '{filepath}' (skip=2, columns={headers})")

csv_filepath = create_temporary_file(content, suffix=".csv")

dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()

First, I think your first solution is better: CSV reading needs heavy customization, and using a creator to wrap the complexity is the better way.

Second, you could do something like:

schema = ",".join([x+":str" for x in headers])
fsql(f"LOAD '{filepath}' (skip=2) COLUMNS {schema}")

But you can already see it's tedious and not very intuitive, and it actually still can't work, because:

Third, fsql creates a FugueWorkflow, so it's not supposed to be used inside another FugueWorkflow.

So what you could do, if you really want a programmatic solution, may be something like this:

from fugue_sql import FugueSQLWorkflow

schema = ",".join([x+":str" for x in headers])

dag = FugueSQLWorkflow()
df = dag(f"LOAD '{filepath}' (skip=2) COLUMNS {schema}")
df.show()

FugueSQLWorkflow is the object that fsql creates.

But this interface is not supposed to be used by end users, and actually we are going to merge FugueSQLWorkflow into FugueWorkflow very soon. So I don't recommend using it directly.

goodwanghan avatar Oct 25 '22 05:10 goodwanghan

(Quoting @kvnkho's comment above.)

Reading CSV is extremely hard to unify; by unifying it, we would actually just create a new rule that may or may not be reasonable. And as you can see, none of the CSV reading functions from the different backends are perfect, and they are all drastically different.

On the other hand, unifying CSV reading encourages people to stay with CSV, which is what we don't want to see. We want people to move away from CSV as early as possible.

So it is hard to justify the effort to further improve the CSV features; at least for now we can't prioritize it, and users can create Creators to read their special CSVs.

By the way, I think having if-else branches on engine types inside a custom function is not good practice. Remember, Fugue is meant to keep the coupling very loose, but this code does the opposite.

Instead, if you know you will only use DuckDB, you can do this:

from duckdb import DuckDBPyRelation, DuckDBPyConnection

def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    headers = read_header(filepath)
    return engine.from_csv_auto(...)

This way, your creator is totally independent of Fugue and works only with the duckdb backend. The DuckDB backend can recognize (convert) a DuckDBPyConnection as an ExecutionEngine and a DuckDBPyRelation as a DataFrame. See https://github.com/fugue-project/fugue/blob/68975b4672a321238935228696c3e8575fbf5c7f/fugue_duckdb/registry.py#L76
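
For completeness, a hedged sketch of how the "..." above might be filled in (going through plain SQL on the connection is an assumption here, chosen because the relational API's CSV kwargs vary across duckdb versions):

from duckdb import DuckDBPyConnection, DuckDBPyRelation

def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    headers = read_header(filepath)
    # build duckdb's columns struct, e.g. {'a': 'VARCHAR', 'b': 'VARCHAR', ...}
    cols = ", ".join(f"'{h}': 'VARCHAR'" for h in headers)
    return engine.query(
        f"SELECT * FROM read_csv_auto('{filepath}', skip=2, columns={{{cols}}})"
    )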

goodwanghan avatar Oct 25 '22 06:10 goodwanghan

Thanks @goodwanghan & @kvnkho

I'm happy for my particular job to be duckdb-only, so

from duckdb import DuckDBPyRelation, DuckDBPyConnection

def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    headers = read_header(filepath)
    return engine.from_csv_auto(...)

is a good fit :)

rdmolony avatar Oct 25 '22 08:10 rdmolony