
Expose complete SQL queries for use by Apache Beam

Open jaehyeon-kim opened this issue 4 months ago • 0 comments

Is your feature request related to a problem? Please describe.

Apache Beam is an open-source, unified programming model for batch and streaming data processing pipelines that simplifies large-scale data processing. I'm developing Apache Beam Python I/O connectors that utilise your integration features for SQL and open table format sources (and possibly sinks). Following the I/O connector development guide, I'll be building Splittable DoFn objects, which require the complete set of SQL queries generated from the partition conditions (e.g. partition_col, num_partitions, ...). Those queries are then executed in multiple DoFn instances, and the results are combined before being passed to subsequent tasks.
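To make the requirement concrete, here is a minimal sketch of the kind of per-partition queries a connector would need. It assumes an integer partition column with known lower and upper bounds and splits the range into contiguous half-open intervals. The helper name `build_partition_queries` and the splitting rule are hypothetical and for illustration only; this is not how Daft computes its partition bounds.

```python
def build_partition_queries(sql, partition_col, lower, upper, num_partitions):
    """Wrap `sql` in one range-filtered query per partition.

    Assumes an integer partition column whose values lie in [lower, upper].
    Hypothetical helper for illustration; not part of Daft or Apache Beam.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if upper < lower:
        raise ValueError("upper must not be smaller than lower")

    # Spread the value range as evenly as possible across the partitions.
    span = upper - lower + 1
    step, extra = divmod(span, num_partitions)

    queries = []
    start = lower
    for i in range(num_partitions):
        size = step + (1 if i < extra else 0)
        end = start + size  # exclusive upper bound of this partition
        queries.append(
            f"SELECT * FROM ({sql}) AS subquery "
            f"WHERE {partition_col} >= {start} AND {partition_col} < {end}"
        )
        start = end
    return queries


example_queries = build_partition_queries(
    "SELECT id, first_name, last_name, email FROM staging.users",
    partition_col="id",
    lower=1,
    upper=10,
    num_partitions=3,
)
for query in example_queries:
    print(query)
```

Each string in the returned list could then be handed to a separate DoFn instance. The point of this request is to avoid re-deriving these queries by hand and to reuse the ones Daft already generates.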

Taking SQL sources as an example, the physical plan scheduler prints the SQL queries, but they don't seem to be accessible from Python. I might be able to obtain the queries through the SQL scan operator, but the result looks incomplete because the pushdowns object is not directly obtainable in Python. In this regard, it would be good if the complete SQL queries could be obtained in Python.

Describe the solution you'd like

I assume the physical plan scheduler is available for both SQL and open table format sources. Although it includes the SQL queries, they cannot be obtained in Python. It would be good to have a to_dict() method or similar that returns the complete SQL queries.

import daft

USER_STMT = "SELECT id, first_name, last_name, email FROM staging.users"
## not fully evaluated (lazy DataFrame)
## create_connection is defined elsewhere (not shown)
df = daft.read_sql(
    sql=USER_STMT, conn=create_connection, partition_col="id", num_partitions=9
)
## a physical plan scheduler should exist for SQL and open table formats
physical_plan_scheduler = df._builder.to_physical_plan_scheduler(
    daft.context.get_context().daft_execution_config
)
## can we have a to_dict() method or similar that returns the complete SQL queries?
physical_plan_scheduler
# * TabularScan:
# |   Num Scan Tasks = 9
# |   Estimated Scan Bytes = 685008
# |   Clustering spec = { Num partitions = 9 }
# |   SQL Queries = [SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users) AS subquery WHERE id >= 1 AND id < 1112,..]
# |   Schema: {id#Int64, first_name#Utf8, last_name#Utf8, email#Utf8}
# |   Scan Tasks: [
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   ...
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   {Database {postgresql+psycopg2://devuser:***@localhost/devdb}}
# |   ]


jaehyeon-kim, Sep 30 '24 22:09