Introduce the `DynamicFileCatalog` in `datafusion-catalog`
Which issue does this PR close?
Closes #10986.
Rationale for this change
Following the idea of #4838, this implements `DynamicFileCatalog` in `datafusion-catalog`. Users can enable this feature for an existing `SessionContext` via `SessionContext::enable_url_table`.
```rust
let ctx = SessionContext::new().enable_url_table();
ctx.sql("SELECT * FROM 'tests/data/example.csv' as example").await?.show().await?;
```
What changes are included in this PR?
- Implement `SessionStore` to access `SessionState` for the runtime table building.
- In `datafusion-cli`, `DynamicFileCatalog` is renamed to `DynamicObjectStoreCatalog` because it is now responsible for registering the required object store.
Are these changes tested?
Yes.
Are there any user-facing changes?
- Add a new API to `SessionContext`: `enable_url_table`.
- Add a new feature, `home_dir`, for the URL table.
Thanks @goldmedal -- I hope to review this PR this weekend, likely tomorrow
I am sorry for the delay -- I plan to review this tomorrow
Thanks, @alamb, for the suggestions. I'll address them in a few days.
I think we should therefore not register this provider with the SessionContext by default and instead add some examples showing how to register it by itself
I agree with you. I think we have a few ways to do it:
- Have a config called `allow_url_table` in `CatalogOption` and set it to false by default. Users can enable it when creating a `SessionContext`.
- Create an example for registering the `DynamicFileSchemaProvider` at runtime. However, I think this example might be complex because there are some required components (e.g., `StateStore`, the inner schema, ...).
- Provide an API on `SessionContext` or `SessionState` like `SessionContext::enable_url_table`. I haven't thought through the details of this yet, but I think it would handle everything mentioned in the second way for the user. It would be more user-friendly.
What do you think? I prefer the 3rd way.
I think options 1 or 3 would work well
In terms of an example being complicated, I find that is one of the benefits of working on adding such examples -- specifically that sometimes the complexity in using the APIs is easier to see and perhaps improve
I plan to give this a careful look again tomorrow
I apologize for not finding time yet to re-review this PR. It contains a substantial seeming API change and I need to find enough contiguous review time to review it carefully. It is on my list but I don't yet have an ETA :(
It's ok. I understand. The feature involves some core API changes. We should be careful. Many thanks for the reply.
BTW I think https://github.com/apache/datafusion/pull/11516 is very related to this PR -- maybe once we get that one in then the API changes needed for this feature will become more natural / easier to fit
Agreed. After it is merged, I'll refine this PR.
We plan to merge https://github.com/apache/datafusion/pull/11516 tomorrow
Converting to draft so it is clear this isn't waiting on additional review (yet)
I found a problem here. When we use a dynamic file source, the logical plan resolves relations using the file path, which makes the plan unreadable.
```text
Projection: ./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c1, MIN(./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c12), MAX(./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c12)
  Aggregate: groupBy=[[./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c1]], aggr=[[MIN(./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c12), MAX(./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c12)]]
    Filter: ./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c11 > Float64(0.1) AND ./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv.c11 < Float64(0.9)
      TableScan: ./test/arrow-datafusion/datafusion/common/../../testing/data/csv/aggregate_test_100.csv
```
Is there any way we can improve this?
> Is there any way we can improve this?
The root cause is that we directly use the path as the table name. I guess we could add a default alias to the `TableScan` plan for a prettier display. 🤔
Yes, I saw something like that in the code: using `tmp_table` as the default alias. But I'm not sure if it is the right way, because it might cause problems when resolving column names?
It would be pretty interesting to try and refresh this code / idea now that @findepi has managed to extract the Catalog traits out of the core https://github.com/apache/datafusion/pull/11516
Sure, I'm working on refreshing this PR now (I finally finished other tasks). After making some changes, I encountered failures with Rust linking via cc 😢. I'm not sure what's going on, but I'm trying to solve it and may push a draft in a few days (maybe today?).
> After making some changes, I encountered failures with Rust linking via cc 😢.
Sometimes I have found that wiping out the target directory solves this problem (aka running `cargo clean` or the equivalent)
> Yes, I saw something like that in the code: using `tmp_table` as the default alias. But I'm not sure if it is the right way, because it might cause problems when resolving column names?
@holicc
After some experimentation, I found that it's not straightforward. I tried implementing a `TableProvider` with a custom `get_logical_plan` method to set a default alias for the table. However, I found that the internal plan is invoked during the analysis phase, which is too late to modify column names since all projections have already been planned.
The plan will look like this:
```text
> EXPLAIN SELECT sum(a) FROM '/Users/jax/git/datafusion/datafusion/core/tests/data/2.json'
+---------------+-------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                    |
+---------------+-------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[sum(/Users/jax/git/datafusion/datafusion/core/tests/data/2.json.a)]]                    |
|               |   SubqueryAlias: /Users/jax/git/datafusion/datafusion/core/tests/data/2.json                                            |
|               |     TableScan: ?url? projection=[a]                                                                                     |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(/Users/jax/git/datafusion/datafusion/core/tests/data/2.json.a)]            |
|               |   CoalescePartitionsExec                                                                                                |
|               |     AggregateExec: mode=Partial, gby=[], aggr=[sum(/Users/jax/git/datafusion/datafusion/core/tests/data/2.json.a)]      |
|               |       RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                              |
|               |         JsonExec: file_groups={1 group: [[Users/jax/git/datafusion/datafusion/core/tests/data/2.json]]}, projection=[a] |
|               |                                                                                                                         |
+---------------+-------------------------------------------------------------------------------------------------------------------------+
```
If we want to improve readability, we might need to create an `AnalyzerRule` for it. However, this is not easy due to the complexity of column resolution, as you mentioned. I think we could address this issue in a separate pull request if needed.
A simpler solution is to manually add an alias when querying:
```text
> EXPLAIN SELECT sum(a) FROM '/Users/jax/git/datafusion/datafusion/core/tests/data/2.json' as t
+---------------+-------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                    |
+---------------+-------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[sum(t.a)]]                                                                              |
|               |   SubqueryAlias: t                                                                                                      |
|               |     TableScan: /Users/jax/git/datafusion/datafusion/core/tests/data/2.json projection=[a]                               |
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[sum(t.a)]                                                                      |
|               |   CoalescePartitionsExec                                                                                                |
|               |     AggregateExec: mode=Partial, gby=[], aggr=[sum(t.a)]                                                                |
|               |       RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1                                              |
|               |         JsonExec: file_groups={1 group: [[Users/jax/git/datafusion/datafusion/core/tests/data/2.json]]}, projection=[a] |
|               |                                                                                                                         |
+---------------+-------------------------------------------------------------------------------------------------------------------------+
```
This is a straightforward way to produce a more readable plan without complicating the code.
cc @alamb
I also modified the PR description for the latest implementation.
Thanks @goldmedal -- I'll try and review this shortly
I will review this PR over the next day or two
I plan to leave this PR open until Monday so anyone else who is interested can take a look at it prior to merge.
👍
I left some suggestions on how to potentially improve the comments / documentation, but I think we can do that (I can do it) as a follow-on PR too
Thanks @alamb, I tried to address the suggestions here. As for the comment related to `SessionStateBuilder`, I prefer to do it in another PR.