Ballista Python Issue(s)

Open milenkovicm opened this issue 1 year ago • 11 comments

First of all, I'm not an expert in Rust-Python (pyo3) integration, so my apologies if I've done or said something stupid.

The current implementation of (py)ballista has a limitation when it comes to DataFrame operations.

The following code results in an error:

from pyballista import BallistaBuilder
from datafusion import SessionContext
from datafusion import functions as f

# %%
ctx: SessionContext = BallistaBuilder()\
    .config("ballista.job.name", "example ballista")\
    .config("ballista.shuffle.partitions", "16")\
    .standalone()
    
df = ctx.sql("SELECT 1 as r").aggregate(
    [f.col("r")], [f.count_star()]
)
df.show()

It throws an exception similar to:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /Users/user/git/arrow-ballista/python/examples/example.py:3
      1 # %% 
      2 # Select 1 to verify its working
----> 3 df = ctx.sql("SELECT 1 as r").aggregate(
      4     [f.col("r")], [f.count_star()]
      5 )
      6 df.show()

TypeError: argument 'group_by': 'Expr' object cannot be converted to 'Expr'

Actually, the previous implementation had the same problem; the same error is thrown (git checkout 2f223db21557c15080bf865ac692d276b8f0b770):

# %%
from pyballista import SessionContext
from datafusion import functions as f

ctx = SessionContext("localhost", 50050)

df = ctx.sql("SELECT 1 as r").aggregate(
    [f.col("r")], [f.count_star()]
)
df.show()

A similar issue occurs if SessionConfig is used:

from ballista import Ballista
from datafusion import RuntimeConfig, SessionConfig, SessionContext

runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
    SessionConfig()
    .with_create_default_catalog_and_schema(True)
    .with_default_catalog_and_schema("foo", "bar")
    .with_target_partitions(8)
    .with_information_schema(True)
    .with_repartition_joins(False)
    .with_repartition_aggregations(False)
    .with_repartition_windows(False)
    .with_parquet_pruning(False)
    .set("datafusion.execution.parquet.pushdown_filters", "true")
)

# %%
ctx: SessionContext = (
    Ballista.builder
    # the next call panics, complaining that `RuntimeConfig` object cannot be converted to `RuntimeConfig`
    .with_runtime(runtime)
    .with_config(config)
    .standalone()
)

ctx.sql("SELECT 1").show()

The problem with RuntimeConfig and SessionConfig could be solved if they were re-exported from ballista:

from ballista import Ballista, RuntimeConfig, SessionConfig
from datafusion import SessionContext

But the first problem, with DataFrame, would still remain.

My guess is that there is an FFI issue because ballista and datafusion are different packages. I'm not sure what exactly the problem is or how to resolve it.
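
A pure-Python analogy of this guess, not the actual pyo3 mechanism: if each compiled extension module registers its own `Expr` class, the two classes share a name but are distinct types, so a type check against one rejects instances of the other. The module names and the `aggregate` helper below are hypothetical and exist only to reproduce the shape of the error message.

```python
def make_module_types(module_name):
    """Build a fresh `Expr` class, as each compiled extension module might."""

    class Expr:
        def __init__(self, name):
            self.name = name

    # Hypothetical module label; only used to tell the two classes apart.
    Expr.__module__ = module_name
    return Expr


# Two classes with the same name, created independently.
DatafusionExpr = make_module_types("datafusion._internal")
BallistaExpr = make_module_types("ballista._internal")


def aggregate(group_by, expected_type):
    """Hypothetical stand-in for a method that type-checks its arguments."""
    for e in group_by:
        if not isinstance(e, expected_type):
            raise TypeError(
                f"argument 'group_by': '{type(e).__name__}' object "
                f"cannot be converted to '{expected_type.__name__}'"
            )
    return len(group_by)


try:
    aggregate([DatafusionExpr("r")], BallistaExpr)
except TypeError as exc:
    message = str(exc)
```

Here `message` names `Expr` on both sides, because only the class names are reported while the check compares class identity.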

@timsaucer's comment https://github.com/apache/datafusion-ballista/issues/1091#issuecomment-2436167064 makes more sense to me now.

Possible Solution (I)

One obvious way would be to move ballista context creation to datafusion-python. Context creation needs only one line:

let ctx = datafusion::prelude::SessionContext::remote("df://localhost:50050").await?;

As the ballista context is a SessionContext, it would be trivial to integrate and, I believe, it would avoid the previous issues.

We could provide only a "remote context" (no standalone), making it an optional feature that datafusion-python users could opt in to. This would somewhat limit the number of libraries ballista would bring into datafusion-python (we could split core into core and client-core to further reduce dependencies).

This proposal would mean bringing an optional dependency into datafusion-python, and additional complexity in the (datafusion-python) release process.

(py)ballista would stay; it could expose scheduler and executor control, as proposed in https://github.com/apache/datafusion-ballista/issues/1107

A big risk of this proposal is that ballista could block datafusion-python releases if it goes back to unmaintained mode.

Possible Solution (II)

Another possible solution is to re-export all classes from datafusion-python in ballista. I'm not sure how complex or practical this would be, or whether datafusion-python applications would need some kind of rewriting to run on ballista.

This would put additional responsibility on the ballista maintainers (there are not many of them).

Any Other Solution?

I'm not sure; I'm open to suggestions.

Proposal

Short term proposal:

We should hold off releasing (py)ballista until we figure out the best approach to fix it.

milenkovicm avatar Nov 29 '24 09:11 milenkovicm

I’ve been meaning to dive into this and also some work happening on datafusion-ray that may encounter similar problems. One thing the datafusion-python package is doing is adding wrappers around its internal classes. For the first bug, you probably have ballista exposing a SessionContext internal class, not the wrapper class. So the expressions getting created are not the wrapper ones that the functions are looking for.
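
A minimal sketch of the wrapper pattern described here, assuming a simplified layout: the class names `ExprInternal` and `DataFrameInternal` are hypothetical stand-ins for the compiled classes, while the `.expr` and `.df` attributes follow the unwrapping that `DataFrame.aggregate` performs in datafusion-python. It only illustrates the hypothesis that mixing wrapper and internal objects fails.

```python
class ExprInternal:
    """Stand-in for the compiled expression class (hypothetical name)."""

    def __init__(self, name: str) -> None:
        self.name = name


class Expr:
    """Python wrapper that holds the internal expression in `.expr`."""

    def __init__(self, expr: ExprInternal) -> None:
        self.expr = expr


class DataFrameInternal:
    """Stand-in for the compiled DataFrame; accepts internal expressions only."""

    def aggregate(self, group_by, aggs):
        for e in [*group_by, *aggs]:
            if not isinstance(e, ExprInternal):
                raise TypeError(f"expected ExprInternal, got {type(e).__name__}")
        return DataFrameInternal()


class DataFrame:
    """Python wrapper that unwraps its arguments before delegating."""

    def __init__(self, df: DataFrameInternal) -> None:
        self.df = df

    def aggregate(self, group_by, aggs):
        group_by = [e.expr for e in group_by]
        aggs = [e.expr for e in aggs]
        return DataFrame(self.df.aggregate(group_by, aggs))


def col(name: str) -> Expr:
    """Mimics a function that returns wrapper expressions."""
    return Expr(ExprInternal(name))


# Works: the wrapper DataFrame unwraps the wrapper expressions.
wrapped = DataFrame(DataFrameInternal()).aggregate([col("r")], [col("n")])

# Fails: an internal DataFrame receives wrapper expressions directly.
try:
    DataFrameInternal().aggregate([col("r")], [col("n")])
    failed = False
except TypeError:
    failed = True
```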

I don’t have my computer this weekend so I can’t test to verify but you may get unblocked if you do ctx = SessionContext(ctx)

I did write up an issue to improve these confusing errors. https://github.com/apache/datafusion-python/issues/853

timsaucer avatar Nov 29 '24 12:11 timsaucer

But even if that unblocks you I worry it still doesn’t resolve the core issue of trying to share that session context from one python package to another.

timsaucer avatar Nov 29 '24 12:11 timsaucer

A draft patch for datafusion-python (v42) illustrating "Possible Solution (I)", which would solve the (py)ballista issues:

diff --git a/Cargo.lock b/Cargo.lock
index 815323b..a00bdc5 100644
diff --git a/Cargo.toml b/Cargo.toml
index df72cd4..cf3cb1c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ async-trait = "0.1"
 futures = "0.3"
 object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
+ballista = { path = "../arrow-ballista/ballista/client", default-features = false }
 
 [build-dependencies]
 prost-types = "0.13" # keep in line with `datafusion-substrait`
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 957d7e3..ca6094a 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -423,7 +423,7 @@ class SessionContext:
     """
 
     def __init__(
-        self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None
+        self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None, url: str | None = None
     ) -> None:
         """Main interface for executing queries with DataFusion.
 
@@ -448,7 +448,7 @@ class SessionContext:
         config = config.config_internal if config is not None else None
         runtime = runtime.config_internal if runtime is not None else None
 
-        self.ctx = SessionContextInternal(config, runtime)
+        self.ctx = SessionContextInternal(config, runtime, url)
 
     def register_object_store(
         self, schema: str, store: Any, host: str | None = None
diff --git a/src/context.rs b/src/context.rs
index f445874..a40bc47 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use arrow::array::RecordBatchReader;
 use arrow::ffi_stream::ArrowArrayStreamReader;
 use arrow::pyarrow::FromPyArrow;
+use ballista::prelude::SessionContextExt;
 use datafusion::execution::session_state::SessionStateBuilder;
 use object_store::ObjectStore;
 use url::Url;
@@ -271,11 +272,13 @@ pub struct PySessionContext {
 
 #[pymethods]
 impl PySessionContext {
-    #[pyo3(signature = (config=None, runtime=None))]
+    #[pyo3(signature = (config=None, runtime=None, ballista_url=None))]
     #[new]
     pub fn new(
         config: Option<PySessionConfig>,
         runtime: Option<PyRuntimeConfig>,
+        ballista_url: Option<String>,
+        py: Python,
     ) -> PyResult<Self> {
         let config = if let Some(c) = config {
             c.config
@@ -293,9 +296,16 @@ impl PySessionContext {
             .with_runtime_env(runtime)
             .with_default_features()
             .build();
-        Ok(PySessionContext {
-            ctx: SessionContext::new_with_state(session_state),
-        })
+
+        match ballista_url {
+            Some(url) => Ok(PySessionContext {
+                ctx: wait_for_future(py, SessionContext::remote_with_state(&url, session_state))
+                    .map_err(DataFusionError::from)?,
+            }),
+            None => Ok(PySessionContext {
+                ctx: SessionContext::new_with_state(session_state),
+            }),
+        }
     }
 
     /// Register an object store with the given name

More details at https://github.com/apache/datafusion-python/compare/main...milenkovicm:datafusion-python:feat_add_ballista

If we go in this direction, we would need to make ballista an optional feature.
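
A rough Python sketch of how such an opt-in could behave from the user's side, mirroring the `match ballista_url` branch in the patch above. Everything here is hypothetical: `ContextSketch`, `make_context`, and the `feature_enabled` flag stand in for a build that may or may not include the optional ballista integration, and are not part of datafusion-python.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ContextSketch:
    """Hypothetical stand-in for a session context."""

    mode: str  # "local" or "remote"
    url: Optional[str] = None


def make_context(url: Optional[str] = None, feature_enabled: bool = False) -> ContextSketch:
    """Return a local context by default, or a remote one when a URL is given.

    `feature_enabled` models whether the optional integration was built in.
    """
    if url is None:
        return ContextSketch(mode="local")
    if not feature_enabled:
        raise RuntimeError("this build does not include the optional ballista integration")
    return ContextSketch(mode="remote", url=url)


local_ctx = make_context()
remote_ctx = make_context("df://localhost:50050", feature_enabled=True)
```

Users who never pass a URL are unaffected, which is the point of keeping the integration optional.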

milenkovicm avatar Nov 29 '24 21:11 milenkovicm

I don’t have my computer this weekend so I can’t test to verify but you may get unblocked if you do ctx = SessionContext(ctx)

I finally got some time to try this, but unfortunately no luck: there is no such function.

I tried a variation of the proposal, wrapping the DataFrame, but got the same error:

from ballista import BallistaBuilder
# from datafusion.context import SessionContext
from datafusion import functions as f
from datafusion.dataframe import DataFrame


ctx = BallistaBuilder()\
    .standalone()

df = ctx.sql("SELECT 1 as r")

df0 = DataFrame(df)
df0.aggregate(
    [f.col("r")], [f.count_star()]
)
df0.show()

TypeError                                 Traceback (most recent call last)
Cell In[4], line 14
     11 df = ctx.sql("SELECT 1 as r")
     13 df0 = DataFrame(df)
---> 14 df0.aggregate(
     15     [f.col("r")], [f.count_star()]
     16 )
     17 df0.show()

File ~/git/arrow-ballista/python/venv/lib/python3.12/site-packages/datafusion/dataframe.py:197, in DataFrame.aggregate(self, group_by, aggs)
    195 group_by = [e.expr for e in group_by]
    196 aggs = [e.expr for e in aggs]
--> 197 return DataFrame(self.df.aggregate(group_by, aggs))

Update:

I have also tried:

ctx = SessionContext()
ctx.ctx = BallistaBuilder()\
    .standalone()

The same function conversion issue as before.

milenkovicm avatar Dec 04 '24 10:12 milenkovicm

After spending some time on this and reading https://github.com/PyO3/pyo3/issues/1444, it appears there is no simple solution to the problem.

milenkovicm avatar Dec 04 '24 16:12 milenkovicm

Summary

After some investigation and reading https://github.com/PyO3/pyo3/issues/1444, it does not look trivial to share (pyo3) structures between multiple crates. There might be some hacks, but it's a long shot.

So the options mentioned in #1142 still stand:

Starting with option 2: re-export all the (py)datafusion structures and functions as part of (py)ballista. I can't comment on the scale of the effort, but if we go with it we could end up in the same situation ballista was in, constantly lagging behind (py)datafusion. Thus I'd argue that this approach would be dead on arrival due to the lack of maintainers and the overall duplicated work.

Option 1: create a ballista-specific context in (py)datafusion. IMHO, this approach makes the most sense from a technical perspective. We would just need to expose an optional (py)datafusion ballista integration. This would mean a bit of extra work for the (py)datafusion team. Ballista would be baggage, which in the long run may go into "unmaintained" mode.

In the short term, I would suggest not releasing the (py)ballista bindings until we decide on an approach. Also, if we decide to go with "Option 1", we could use the (py)ballista project for scheduler/executor Python bindings.

Open to any suggestions.

milenkovicm avatar Dec 05 '24 16:12 milenkovicm

One more option to throw in. Could we reduce the scope for (py)Ballista for now to just support SQL and not the DataFrame API?

We would just need the ability to send SQL to the server (perhaps via FlightSQL) and then fetch record batches.

andygrove avatar Dec 05 '24 16:12 andygrove

We don't even need FlightSQL; the protocol supports sending a SQL statement:

https://github.com/apache/datafusion-ballista/blob/cc2ddcb8bac7bb15c4f8a5d300fcec2f5b3eb1b3/ballista/core/proto/ballista.proto#L523

So we would not need any context on the (py)ballista side, just a gRPC client.
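
A sketch of what such a context-free client could look like, under loud assumptions: `SqlOnlyClient`, `InMemoryScheduler`, and the `submit`/`fetch` callables are hypothetical and are not the ballista gRPC API. The fake scheduler replaces a real gRPC stub so the flow (send SQL, get a job id, fetch batches) can run on its own.

```python
from typing import Callable, Dict, Iterator, List

Row = Dict[str, int]
Batch = List[Row]


class SqlOnlyClient:
    """Hypothetical client: holds no session context, only a transport."""

    def __init__(
        self,
        submit: Callable[[str], str],
        fetch: Callable[[str], Iterator[Batch]],
    ) -> None:
        self._submit = submit
        self._fetch = fetch

    def sql(self, query: str) -> Iterator[Batch]:
        # Send the SQL text as-is, then stream back the result batches.
        job_id = self._submit(query)
        yield from self._fetch(job_id)


class InMemoryScheduler:
    """Fake transport used here in place of a real gRPC stub."""

    def __init__(self) -> None:
        self.jobs: Dict[str, str] = {}

    def submit(self, query: str) -> str:
        job_id = f"job-{len(self.jobs) + 1}"
        self.jobs[job_id] = query
        return job_id

    def fetch(self, job_id: str) -> Iterator[Batch]:
        # A real scheduler would execute the stored query;
        # this fake returns one fixed batch for any known job.
        if job_id not in self.jobs:
            raise KeyError(job_id)
        yield [{"r": 1}]


scheduler = InMemoryScheduler()
client = SqlOnlyClient(scheduler.submit, scheduler.fetch)
batches = list(client.sql("SELECT 1 as r"))
```

Because the client only passes strings and batches through, it never touches `Expr` or `DataFrame` objects, which sidesteps the conversion error at the cost of the DataFrame API.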

milenkovicm avatar Dec 05 '24 16:12 milenkovicm

Personally, I find (py)datafusion running on ballista a killer feature :) and a great way to avoid GIL limitations.

milenkovicm avatar Dec 05 '24 16:12 milenkovicm

One more option to throw in. Could we reduce the scope for (py)Ballista for now to just support SQL and not the DataFrame API?

We would just need the ability to send SQL to the server (perhaps via FlightSQL) and then fetch record batches.

@andygrove may I ask what kind of scenarios you'd like to support with "option 3"?

milenkovicm avatar Dec 06 '24 15:12 milenkovicm

For whoever is still interested in ballista Python support, there is a new and possibly promising POC, #1338, that adds ballista support on top of datafusion-python.

milenkovicm avatar Oct 30 '25 19:10 milenkovicm