[BUG] duckdb read_csv_auto kwarg columns clashes with load_df
Minimal Code To Reproduce
import os
import tempfile
import textwrap
import typing
import duckdb
from fugue import DataFrame
from fugue import ExecutionEngine
from fugue import FugueWorkflow
from fugue_duckdb import DuckExecutionEngine
from fugue_sql import fsql
import pandas as pd
def create_temporary_file(
    _content: bytes, suffix: str, prefix: str = "fugue_example_"
) -> str:
    text_file = tempfile.NamedTemporaryFile(
        suffix=suffix, prefix=prefix, delete=False
    )
    text_file.write(_content)
    text_file.close()  # close to flush the content before the file is read back
    return text_file.name
def read_header(filepath: str) -> typing.List[str]:
row_1 = pd.read_csv(filepath, skiprows=1, nrows=0).columns
header = [row_1[0].replace("columns: ", ""), *row_1[1:]]
return header
# Let's say I have a non-standard `csv` ...
content = textwrap.dedent("""\
date: 2022-10-17
columns: a,b,c
1,2,3
1,2,3"""
).encode("utf-8")
# ... I can read it no problem using the `duckdb` execution engine directly ...
def read_text_file(filepath: str) -> DataFrame:
headers = read_header(filepath)
engine = DuckExecutionEngine()
    return engine.load_df(filepath, skip=2, columns=headers)
csv_filepath = create_temporary_file(content, suffix=".csv")
dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")
os.unlink(csv_filepath)
# ... I can't easily use `fsql` instead as `columns` clashes ...
def read_text_file(filepath: str) -> DataFrame:
headers = read_header(filepath)
return fsql(f"LOAD '{filepath}' (skip=2, columns={headers})")
csv_filepath = create_temporary_file(content, suffix=".csv")
dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
try:
dag.run(engine="duck")
except Exception as e:
print(e)
os.unlink(csv_filepath)
Describe the bug
It seems that the fugue execution engine kwarg `columns` clashes with duckdb's `columns` when parsing the SQL ...
def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
data = self.get_dict(ctx, "fmt", "path", "params", "columns")
__modified_exception__ = self.to_runtime_error(ctx) # noqa
return self.workflow.load(
path=data["path"],
fmt=data.get("fmt", ""),
columns=data.get("columns"),
**data.get("params", {}),
)
... `columns` is passed within `params` and not as `columns`.
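To make the clash concrete, here is a minimal sketch (a stand-in function with an assumed signature, not fugue's actual `workflow.load`) of what the visitor effectively ends up calling when `columns` is supplied inside the params block:

def workflow_load(path, fmt="", columns=None, **kwargs):  # stand-in, assumed signature
    pass

params = {"skip": 2, "columns": ["a", "b", "c"]}  # user-supplied kwargs from the LOAD params block
workflow_load(path="example.csv", fmt="", columns=None, **params)
# -> TypeError: workflow_load() got multiple values for keyword argument 'columns'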
Expected behavior
`fsql` should parse `LOAD ...` so that `engine.load_df` receives the same arguments as when using it directly.
Environment (please complete the following information):
- Backend: duckdb
- Backend version: duckdb 0.5.1
- Python version: 3.10.6
- OS: linux (WSL2)
Mentioned in https://github.com/fugue-project/tutorials/issues/170. Example adapted from https://github.com/fugue-project/tutorials/pull/178.
The easiest thing might be to do something like ...
def visitFugueLoadTask(self, ctx: fp.FugueLoadTaskContext) -> WorkflowDataFrame:
data = self.get_dict(ctx, "fmt", "path", "params", "columns")
__modified_exception__ = self.to_runtime_error(ctx) # noqa
params = data.get("params", {})
    try:
        columns = data["columns"]
    except KeyError:
        columns = params.pop("columns", "")
return self.workflow.load(
path=data["path"],
fmt=data.get("fmt", ""),
columns=columns,
**params,
)
... though I'm not too familiar with testing this under `fugue_sql`, it looks like `test_workflow_parse.test_load` is the right place for this sort of test, as it is a `LOAD`-related SQL bug.
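If it helps, a rough regression-test sketch of the idea (hypothetical test name and pytest-style layout, not necessarily matching the existing `test_workflow_parse.test_load` conventions; it just asserts that a `columns` kwarg in the params block reaches `load_df` instead of clashing, and reuses the `fsql` import from the reproduction above):

def test_load_columns_kwarg(tmp_path):
    # hypothetical test: a columns kwarg inside the LOAD params block should
    # be forwarded to load_df rather than clash with the COLUMNS clause
    path = str(tmp_path / "multi_header.csv")
    with open(path, "w") as f:
        f.write("date: 2022-10-17\ncolumns: a,b,c\n1,2,3\n1,2,3\n")
    headers = ["a", "b", "c"]
    fsql(f"LOAD '{path}' (skip=2, columns={headers})\nPRINT").run("duck")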
OK, I see there are a couple of issues. Look at this code:
def read_text_file(filepath: str) -> DataFrame:
headers = read_header(filepath)
engine = DuckExecutionEngine()
    return engine.load_df(filepath, skip=2, columns=headers)
csv_filepath = create_temporary_file(content, suffix=".csv")
dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")
It may work, but it works by luck. The better way is:
def read_text_file(engine: ExecutionEngine, filepath: str) -> DataFrame:
    headers = read_header(filepath)
    return engine.load_df(filepath, skip=2, columns=headers)
So you don't instantiate an engine yourself, and this can also work with different engines. The workflow part stays the same.
I think he had to instantiate the engine because there was a bit of inconsistent behavior between the Pandas and DuckDB engines when reading multi-header CSVs. In this issue in the tutorials repo, he wrote the following code to make things consistent:
def read_text_file(engine: ExecutionEngine, filepath: str) -> DataFrame:
headers = read_header(filepath)
if isinstance(engine, NativeExecutionEngine):
# load_df uses pandas.read_csv
df = engine.load_df(filepath, infer_schema=True, header=True, skiprows=3, names=headers)
elif isinstance(engine, DuckExecutionEngine):
# load_df uses duckdb read_csv_auto
df = engine.load_df(filepath, infer_schema=True, skip=4, columns=headers)
elif isinstance(engine, DaskExecutionEngine):
# load_df uses dask.dataframe.read_csv
df = engine.load_df(filepath, infer_schema=True, header=True, skiprows=3, names=headers)
else:
supported_engines = {NativeExecutionEngine, DuckExecutionEngine, DaskExecutionEngine}
raise ValueError(f"Engine {engine} is not supported, must be one of {supported_engines}")
return df
The Native engine and the Duck engine need different values for `skip` to make it work.
I think the takeaway here is that using the `engine` method is deferred, and we won't design (for now) to accommodate such multi-header files until we see them more often? Making a uniform interface for all the CSV kwargs is quite a bit of effort, so I suggest we just rename this issue and keep it open.
This code has multiple issues:
# ... I can't easily use `fsql` instead as `columns` clashes ...
def read_text_file(filepath: str) -> DataFrame:
headers = read_header(filepath)
return fsql(f"LOAD '{filepath}' (skip=2, columns={headers})")
csv_filepath = create_temporary_file(content, suffix=".csv")
dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
First, I think your first solution is better: CSV needs heavy customization, and using a creator to wrap the complexity is the better way.
Second, you could do something like:
schema = ",".join([x+":str" for x in headers])
fsql(f"LOAD '{filepath}' (skip=2) COLUMNS {schema}")
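For the sample file above, this renders as roughly (hypothetical temp-file path):

# schema == "a:str,b:str,c:str" when headers == ["a", "b", "c"]
# LOAD '/tmp/fugue_example_xxx.csv' (skip=2) COLUMNS a:str,b:str,c:str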
But you can already see it's tedious and not so intuitive, and actually it still can't work. This is because of the third issue:
Third, `fsql` creates a FugueWorkflow, so it's not supposed to be used inside another FugueWorkflow.
So what you could do, if you really want a programmatic solution, may be something like this:
from fugue_sql import FugueSQLWorkflow
schema = ",".join([x+":str" for x in headers])
dag = FugueSQLWorkflow()
df = dag(f"LOAD '{filepath}' (skip=2) COLUMNS {schema}")
df.show()
`FugueSQLWorkflow` is the object `fsql` creates.
But this interface is not supposed to be used by end users, and actually we are going to merge `FugueSQLWorkflow` into `FugueWorkflow` very soon. So I don't recommend using it directly.
> I think he had to instantiate the engine because there was a bit of inconsistent behavior between the Pandas and DuckDB engines when reading multi-header CSVs. In this issue in the tutorials repo, he wrote the code above to make things consistent. The Native engine and the Duck engine need different values for `skip` to make it work. I think the takeaway here is that using the `engine` method is deferred, and we won't design (for now) to accommodate such multi-header files until we see them more often? Making a uniform interface for all the CSV kwargs is quite a bit of effort, so I suggest we just rename this issue and keep it open.
Reading CSV is extremely hard to unify; by unifying it, we actually just create a new rule that may or may not be reasonable. And as you can see, none of the CSV reading functions from the different backends is perfect, and they are all drastically different.
On the other hand, unifying CSV reading makes people want to stay with CSV, which is what we don't want to see. We want people to move away from CSV as early as possible.
So it is hard to justify the effort to further improve the CSV features; at least for now we can't prioritize it, and users can create Creators to read their special CSVs.
By the way, I think having if-else on engines inside a custom function is not good practice. Remember, Fugue should keep the coupling very loose, but this code is doing the opposite.
Instead, if you know you will only use DuckDB, you can do this:
from duckdb import DuckDBPyRelation, DuckDBPyConnection
def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
headers = read_header(filepath)
return engine.from_csv_auto(...)
This way, your creator is totally independent from Fugue, and can only work with the duckdb backend. The DuckDB backend can recognize (convert) `DuckDBPyConnection` as an ExecutionEngine, and `DuckDBPyRelation` as a DataFrame. See https://github.com/fugue-project/fugue/blob/68975b4672a321238935228696c3e8575fbf5c7f/fugue_duckdb/registry.py#L76
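For completeness, a rough sketch of wiring that creator into the same workflow as the reproduction above (assuming `csv_filepath` from that code; `from_csv_auto` is called with just the path here because its multi-header options were elided in the snippet, so it would not actually skip the extra header lines):

from duckdb import DuckDBPyConnection, DuckDBPyRelation
from fugue import FugueWorkflow

def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
    # the skip/columns handling from load_df would go into the duckdb call;
    # those options are elided here, as in the snippet above
    return engine.from_csv_auto(filepath)

# the workflow part stays the same; per the registry module linked above,
# the DuckDB backend converts DuckDBPyConnection / DuckDBPyRelation for us
dag = FugueWorkflow()
df = dag.create(read_text_file, params={"filepath": csv_filepath})
df.show()
dag.run(engine="duck")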
Thanks @goodwanghan & @kvnkho
I'm happy for my particular job to be `duckdb` only, so
from duckdb import DuckDBPyRelation, DuckDBPyConnection
def read_text_file(engine: DuckDBPyConnection, filepath: str) -> DuckDBPyRelation:
headers = read_header(filepath)
return engine.from_csv_auto(...)
is a good fit :)