Ballista Python Issue(s)
First of all, I'm not an expert in Rust-Python (PyO3) integration, so apologies in advance if I've done or said something wrong.
The current implementation of (py)ballista has a limitation when it comes to DataFrame operations.
The following code will result in an error:
from pyballista import BallistaBuilder
from datafusion import SessionContext
from datafusion import functions as f
# %%
ctx: SessionContext = BallistaBuilder()\
.config("ballista.job.name", "example ballista")\
.config("ballista.shuffle.partitions", "16")\
.standalone()
df = ctx.sql("SELECT 1 as r").aggregate(
[f.col("r")], [f.count_star()]
)
df.show()
It will throw an exception similar to:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File /Users/user/git/arrow-ballista/python/examples/example.py:3
1 # %%
2 # Select 1 to verify its working
----> 3 df = ctx.sql("SELECT 1 as r").aggregate(
4 [f.col("r")], [f.count_star()]
5 )
6 df.show()
TypeError: argument 'group_by': 'Expr' object cannot be converted to 'Expr'
The previous implementation had the same problem; the same error is thrown (git checkout 2f223db21557c15080bf865ac692d276b8f0b770):
# %%
from pyballista import SessionContext
from datafusion import functions as f
ctx = SessionContext("localhost", 50050)
df = ctx.sql("SELECT 1 as r").aggregate(
[f.col("r")], [f.count_star()]
)
df.show()
A similar issue occurs if SessionConfig is used:
from ballista import Ballista, RuntimeConfig, SessionConfig
from datafusion import RuntimeConfig, SessionConfig, SessionContext
runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
SessionConfig()
.with_create_default_catalog_and_schema(True)
.with_default_catalog_and_schema("foo", "bar")
.with_target_partitions(8)
.with_information_schema(True)
.with_repartition_joins(False)
.with_repartition_aggregations(False)
.with_repartition_windows(False)
.with_parquet_pruning(False)
.set("datafusion.execution.parquet.pushdown_filters", "true")
)
# %%
# .with_runtime(runtime) will panic at this point, complaining that the
# `RuntimeConfig` object cannot be converted to `RuntimeConfig`
ctx: SessionContext = Ballista.builder\
    .with_runtime(runtime)\
    .with_config(config)\
    .standalone()
ctx.sql("SELECT 1").show()
The problem with RuntimeConfig and SessionConfig could be solved if they are re-exported from ballista:
from ballista import Ballista, RuntimeConfig, SessionConfig
from datafusion import SessionContext
but the first problem, with DataFrame, would still remain.
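To illustrate why re-exporting helps, here is a pure-Python sketch (the modules below are simulated stand-ins, not the real packages): when ballista re-exports datafusion's class instead of defining its own copy, both names refer to the same type object, so no conversion between types is ever needed.

```python
import types

# Simulated module; stand-in for the real datafusion package.
datafusion = types.ModuleType("datafusion")

class SessionConfig:
    """Stand-in for datafusion's SessionConfig."""

datafusion.SessionConfig = SessionConfig

# ballista re-exports datafusion's class instead of defining its own copy:
ballista = types.ModuleType("ballista")
ballista.SessionConfig = datafusion.SessionConfig

# Same type object on both sides, so the consumer sees exactly the class it expects.
print(ballista.SessionConfig is datafusion.SessionConfig)  # True
```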
My guess is that there is an FFI issue, since ballista and datafusion are different packages; I'm not sure exactly what the problem is, nor how to resolve it.
@timsaucer's comment https://github.com/apache/datafusion-ballista/issues/1091#issuecomment-2436167064 makes more sense to me now.
Possible Solution (I)
One obvious way would be to move Ballista context creation into datafusion-python. We would need one-line context creation:
let ctx = datafusion::prelude::SessionContext::remote("df://localhost:50050").await?;
As the Ballista context is a SessionContext, it would be trivial to integrate, and, I believe, it would avoid the previous issues.
We could provide only a "remote context" (no standalone), making it an optional feature which datafusion-python users could opt in to. This would somewhat limit the number of libraries ballista brings into datafusion-python (we could split core into core and client-core to further reduce dependencies).
This proposal would mean bringing an optional dependency into datafusion-python, and additional complexity into the (datafusion-python) release process.
(py)ballista would stay, it could expose scheduler and executor control as proposed in https://github.com/apache/datafusion-ballista/issues/1107
A big risk of this proposal is that ballista could block a datafusion-python release if it goes back into unmaintained mode.
Possible Solution (II)
Another possible solution is to re-export all classes from datafusion-python in ballista. I'm not sure how complex or practical this would be, nor whether datafusion-python applications would need some kind of rewriting to be able to run on ballista.
This would put additional responsibility on the ballista maintainers (there are not many of them).
Any Other Solution?
I'm not sure; open to suggestions.
Proposal
Short term proposal:
- Remove the test used to test the (py)ballista code on ballista commits: https://github.com/apache/datafusion-ballista/blob/81cfa632f94ef794cc9f35c81676cf9a010c1dbe/.github/workflows/rust.yml#L121-L122
- Update datafusion to the latest version: https://github.com/apache/datafusion-ballista/pull/1125
- Focus on the ballista (Rust) release: https://github.com/apache/datafusion-ballista/issues/974
We should release (py)ballista once we figure out the best approach to fix it.
I’ve been meaning to dive into this, and also into some work happening on datafusion-ray that may run into similar problems. One thing the datafusion-python package does is add wrappers around its internal classes. To hit the first bug, ballista is probably exposing the internal SessionContext class, not the wrapper class, so the expressions being created are not the wrapper ones the functions are looking for.
I don’t have my computer this weekend so I can’t test to verify, but you may get unblocked if you do ctx = SessionContext(ctx)
I did write up an issue to improve these confusing errors. https://github.com/apache/datafusion-python/issues/853
But even if that unblocks you, I worry it still doesn’t resolve the core issue of trying to share that session context from one Python package to another.
Draft patch to illustrate "Possible Solution (I)", for datafusion-python (v42) which would solve (py)ballista issues:
diff --git a/Cargo.lock b/Cargo.lock
index 815323b..a00bdc5 100644
diff --git a/Cargo.toml b/Cargo.toml
index df72cd4..cf3cb1c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ async-trait = "0.1"
futures = "0.3"
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
url = "2"
+ballista = { path = "../arrow-ballista/ballista/client", default-features = false }
[build-dependencies]
prost-types = "0.13" # keep in line with `datafusion-substrait`
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 957d7e3..ca6094a 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -423,7 +423,7 @@ class SessionContext:
"""
def __init__(
- self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None
+ self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None, url: str | None = None
) -> None:
"""Main interface for executing queries with DataFusion.
@@ -448,7 +448,7 @@ class SessionContext:
config = config.config_internal if config is not None else None
runtime = runtime.config_internal if runtime is not None else None
- self.ctx = SessionContextInternal(config, runtime)
+ self.ctx = SessionContextInternal(config, runtime, url)
def register_object_store(
self, schema: str, store: Any, host: str | None = None
diff --git a/src/context.rs b/src/context.rs
index f445874..a40bc47 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
use arrow::array::RecordBatchReader;
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::pyarrow::FromPyArrow;
+use ballista::prelude::SessionContextExt;
use datafusion::execution::session_state::SessionStateBuilder;
use object_store::ObjectStore;
use url::Url;
@@ -271,11 +272,13 @@ pub struct PySessionContext {
#[pymethods]
impl PySessionContext {
- #[pyo3(signature = (config=None, runtime=None))]
+ #[pyo3(signature = (config=None, runtime=None, ballista_url=None))]
#[new]
pub fn new(
config: Option<PySessionConfig>,
runtime: Option<PyRuntimeConfig>,
+ ballista_url: Option<String>,
+ py: Python,
) -> PyResult<Self> {
let config = if let Some(c) = config {
c.config
@@ -293,9 +296,16 @@ impl PySessionContext {
.with_runtime_env(runtime)
.with_default_features()
.build();
- Ok(PySessionContext {
- ctx: SessionContext::new_with_state(session_state),
- })
+
+ match ballista_url {
+ Some(url) => Ok(PySessionContext {
+ ctx: wait_for_future(py, SessionContext::remote_with_state(&url, session_state))
+ .map_err(DataFusionError::from)?,
+ }),
+ None => Ok(PySessionContext {
+ ctx: SessionContext::new_with_state(session_state),
+ }),
+ }
}
/// Register an object store with the given name
more details at https://github.com/apache/datafusion-python/compare/main...milenkovicm:datafusion-python:feat_add_ballista
If we go in this direction, we would need to make ballista an optional feature.
I don’t have my computer this weekend so I can’t test to verify but you may get unblocked if you do
ctx = SessionContext(ctx)
I finally got some time to try this, but unfortunately no luck; there is no such function.
I tried a variation of the proposal, wrapping the DataFrame, but got the same error:
from ballista import BallistaBuilder
# from datafusion.context import SessionContext
from datafusion import functions as f
from datafusion.dataframe import DataFrame
ctx = BallistaBuilder()\
.standalone()
df = ctx.sql("SELECT 1 as r")
df0 = DataFrame(df)
df0.aggregate(
[f.col("r")], [f.count_star()]
)
df0.show()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[4], line 14
     11 df = ctx.sql("SELECT 1 as r")
     13 df0 = DataFrame(df)
---> 14 df0.aggregate(
     15     [f.col("r")], [f.count_star()]
     16 )
     17 df0.show()

File ~/git/arrow-ballista/python/venv/lib/python3.12/site-packages/datafusion/dataframe.py:197, in DataFrame.aggregate(self, group_by, aggs)
    195 group_by = [e.expr for e in group_by]
    196 aggs = [e.expr for e in aggs]
--> 197 return DataFrame(self.df.aggregate(group_by, aggs))
Update:
I have also tried:
ctx = SessionContext()
ctx.ctx = BallistaBuilder()\
.standalone()
The same conversion issue as before.
After spending some time on this and reading https://github.com/PyO3/pyo3/issues/1444, there is no simple solution to the problem.
Summary
After some investigation and reading https://github.com/PyO3/pyo3/issues/1444, it looks non-trivial to share (pyo3) structures between multiple crates; there may be some hacks, but it's a long shot.
So the options mentioned in #1142 still stand:
Starting from option 2 - re-export all the (py)datafusion structures and functions as part of (py)ballista. I can't comment on the scale of effort, but if we go with it we could end up in the same situation ballista was in before, constantly lagging behind (py)datafusion. Thus I'd argue that this approach would be dead on arrival due to the lack of maintainers and the overall duplicated work.
Option 1 - creating a ballista-specific context in (py)datafusion. IMHO, this approach makes the most sense from a technical perspective; we would just need to expose optional (py)datafusion ballista integration. This would mean a bit of extra work for the (py)datafusion team, and ballista would be baggage which, in the long run, may go into "unmaintained" mode.
In the short term, I would suggest not releasing the (py)ballista bindings until we make a decision on the approach. Also, if we decide to go with "Option 1", we could use the (py)ballista project for scheduler/executor Python bindings.
Open to any suggestions.
One more option to throw in. Could we reduce the scope for (py)Ballista for now to just support SQL and not the DataFrame API?
We would just need the ability to send SQL to the server (perhaps via FlightSQL) and then fetch record batches.
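A sketch of what that reduced, SQL-only scope could look like from Python, assuming the server exposes an Arrow FlightSQL endpoint (the endpoint URI, port, and the use of the third-party ADBC FlightSQL driver are all assumptions here, not something Ballista ships today):

```python
def run_sql(uri: str, sql: str):
    """Send a SQL string to a FlightSQL endpoint and fetch the result as an Arrow table."""
    # Third-party ADBC FlightSQL driver: pip install adbc-driver-flightsql
    import adbc_driver_flightsql.dbapi as flightsql

    with flightsql.connect(uri) as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            return cur.fetch_arrow_table()

# Usage (requires a running FlightSQL-capable server; not part of this sketch):
# table = run_sql("grpc://localhost:50050", "SELECT 1 AS r")
```

No context or DataFrame machinery is needed on the client side in this model; the server does all planning and execution, and the client only ships SQL text and receives record batches.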
We don't even need FlightSQL; the protocol supports sending a SQL statement:
https://github.com/apache/datafusion-ballista/blob/cc2ddcb8bac7bb15c4f8a5d300fcec2f5b3eb1b3/ballista/core/proto/ballista.proto#L523
So we would not need any context on the (py)ballista side, just a gRPC client.
Personally, I find (py)datafusion running on ballista a killer feature :) a great way to avoid GIL limitations.
One more option to throw in. Could we reduce the scope for (py)Ballista for now to just support SQL and not the DataFrame API?
We would just need the ability to send SQL to the server (perhaps via FlightSQL) and then fetch record batches.
@andygrove may I ask what kind of scenarios you'd like to support with "option 3"?
For whoever is still interested in Ballista Python support: a new, possibly promising POC, #1338, adds ballista support on top of datafusion-python.