[BUG]: Vector UDF doesn't work in databricks environment
Vector UDF Incompatibility in Databricks Environment
Background
Dotnet.spark.worker, when parses binary stream, expects for a single large Arrow batch or data. However, for Databricks Runtime 14.3 (As i understand, for any linux env), behavior differs, as driver dataset divided by multiple batches, 10k rows in each. This format uses different binary format(It has more arguments) Due to differing binary representations, it misinterprets incoming byte streams—reading incorrect data, resulting in a hang with no response.
Reproduction
Create a Spark pipeline that:
- Ingests a moderate-sized dataset.
- Performs a GroupBy (ensure groups contain >10k rows).
- Applies a vector UDF via GroupBy().Apply(...), operating on the grouped RecordBatch.
Actual results
The UDF execution never completes. The .NET Spark worker attempts to read more bytes than are available, blocking indefinitely while waiting for data that is never sent.
Expected result
UDF should execute and return successfully.
TBD: Either a single consolidated RecordBatch or an IEnumerable<RecordBatch> should be passed into the UDF.
Investigation Summary
Two issues prevent vector UDFs from functioning correctly in the Databricks environment.
1. Vector UDFs operate on a collection of RecordBatch instances, not a single batch
The spark.sql.execution.arrow.maxRecordsPerBatch setting (introduced in Spark 2.3.0, default: 10,000) is central to this behavior. Batch-based processing is advantageous for .NET for Spark, as it avoids the 2GB Arrow buffer limit encountered with GroupBy().Apply(...).
I implemented a PoC to validate performance for a specific use case. It successfully resolves the issue with the following changes:
- The vector UDF now accepts IEnumerable<RecordBatch> instead of a single RecordBatch.
- Additional parameters are passed but intentionally ignored. In the Python implementation, these enforce batch ordering; however, in .NET it mean we need to collect all batches before reordering and sending them to UDF, which is a memory-demanding task. In observed cases, batches arrived sequentially, so this functionality is omitted from PoC.
2. In DBR 15.3+, all UDFs fail to execute
This issue arises from a change in Databricks’ fork of Spark. The CreatePythonFunction API introduces an additional optional parameter that is not accounted for in the current implementation. As a result, UDF execution fails with the following error:
[Error] [JvmBridge] JVM method execution failed: Static method 'createPythonFunction' failed for class 'org.apache.spark.sql.api.dotnet.SQLUtils' when called with 7 arguments ([Index=1, Type=Byte[], Value=System.Byte[]], [Index=2, Type=Hashtable, Value=Microsoft.Spark.Interop.Internal.Java.Util.Hashtable], [Index=3, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=4, Type=String, Value=Microsoft.Spark.Worker], [Index=5, Type=String, Value=2.1.1.0], [Index=6, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=7, Type=null, Value=null])
[2024-09-13T10:47:53.1569404Z] [machine] [Error] [JvmBridge] java.lang.NoSuchMethodError: org.apache.spark.api.python.SimplePythonFunction.<init>(Lscala/collection/Seq;Ljava/util/Map;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lorg/apache/spark/api/python/PythonAccumulatorV2;)V
at org.apache.spark.sql.api.dotnet.SQLUtils$.createPythonFunction(SQLUtils.scala:35)
at org.apache.spark.sql.api.dotnet.SQLUtils.createPythonFunction(SQLUtils.scala)
References:
-
PoC Commit, see SqlCommandExecutor::ExecuteArrowGroupedMapCommand
Notes:
- CoGroupedMap UDFs are unaffected as they do not leverage batching (as of Spark 3.5).
- Databricks runtime 15.3, 16.3 doesn't work with dotnet.spark entirely due to the second issue, so no testing is performed on it. Details in the linked PR
UseArrowcan be disabled to enable backward-compatible format, that should work with current dotnet.spark implementation
We may need to allow different format, so user can make that depending on the runtime.
BTW, does Databricks runtime 16.3 support pyspark? I think spark.NET just mimic pyspark, so from spark's view point it looks like a pyspark job, except the entry point is different.
@wudanzy Hello, PySpark remains a top-priority language for Databricks and will continue to be supported and extended.
From Spark's perspective, it interacts with PySpark via the SimpleFunction, which launches a specified executable with given arguments, initiating socket communication, etc.
However, in DBR 15.3, 16.3, this method is no longer accessible via the dotnet-spark JAR. It still works in DBR 14.3, even though both use Spark 3.5.
Even though it's still in the latest master and spark 3.5.x, there's an error on newer runtime.
This suggests Databricks may have modified or replaced the implementation, or changed its signature.
[Error] [JvmBridge] JVM method execution failed: Static method 'createPythonFunction' failed for class 'org.apache.spark.sql.api.dotnet.SQLUtils' when called with 7 arguments ([Index=1, Type=Byte[], Value=System.Byte[]], [Index=2, Type=Hashtable, Value=Microsoft.Spark.Interop.Internal.Java.Util.Hashtable], [Index=3, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=4, Type=String, Value=Microsoft.Spark.Worker], [Index=5, Type=String, Value=2.1.1.0], [Index=6, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=7, Type=null, Value=null])
[2024-09-13T10:47:53.1569404Z] [machine] [Error] [JvmBridge] java.lang.NoSuchMethodError: org.apache.spark.api.python.SimplePythonFunction.<init>(Lscala/collection/Seq;Ljava/util/Map;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lorg/apache/spark/api/python/PythonAccumulatorV2;)V
at org.apache.spark.sql.api.dotnet.SQLUtils$.createPythonFunction(SQLUtils.scala:35)
at org.apache.spark.sql.api.dotnet.SQLUtils.createPythonFunction(SQLUtils.scala)
To confirm, we’d need to extract and inspect the Databricks-specific Spark JARs from the environment.
Given Databricks’ ongoing investment in PySpark, we should anticipate regular breaking changes. If this project is to continue actively, Microsoft and Databricks will likely need to align on source sharing to streamline development.
I am facing similar issue with 15.x and 16.x LTS releases as well. (incase someone needed that info)
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using static Microsoft.Spark.Sql.Functions;
using System.Net.Http;
using System.Text.Json;
using System.Collections.Generic;
using System.Linq;
namespace HelloSpark
{
// Define the response model for the cocktail API
public class CocktailResponse
{
public List<Drink> drinks { get; set; } = new List<Drink>();
}
public class Drink
{
public string strDrink { get; set; } = string.Empty;
public string strDrinkAlternate { get; set; } = string.Empty;
public string strTags { get; set; } = string.Empty;
public string strVideo { get; set; } = string.Empty;
public string strCategory { get; set; } = string.Empty;
public string strIBA { get; set; } = string.Empty;
public string strAlcoholic { get; set; } = string.Empty;
public string strGlass { get; set; } = string.Empty;
public string strInstructions { get; set; } = string.Empty;
public string strDrinkThumb { get; set; } = string.Empty;
}
class Program
{
static void Main(string[] args)
{
var spark = SparkSession.Builder().AppName("HelloSpark").GetOrCreate();
// Fetch JSON data from a web API
var httpClient = new HttpClient();
var response = httpClient.GetStringAsync("https://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita").Result;
// Deserialize the JSON data into a C# object
var cocktailData = JsonSerializer.Deserialize<CocktailResponse>(response);
// Check if data was successfully deserialized and drinks list exists
if (cocktailData?.drinks == null)
{
Console.WriteLine("No cocktail data found or failed to deserialize.");
return;
}
// Create a DataFrame from the cocktail data using a simpler approach
// First, let's create a temporary view from the data
var drinkNames = cocktailData.drinks.Select(drink => drink.strDrink).ToArray();
// Create a simple DataFrame using Spark SQL with VALUES clause
var valuesList = string.Join(", ", drinkNames.Select(name => $"('{name.Replace("'", "''")}')"));
var dataFrame = spark.Sql($"SELECT DrinkName FROM VALUES {valuesList} AS t(DrinkName)");
// Display the DataFrame
dataFrame.Show();
Console.WriteLine("\n=== APPROACH 1: Using UDF (Runs on Executors) ===");
// Register a UDF that prints and returns the value
var printAndReturnUdf = Udf<string, string>((drinkName) =>
{
Console.WriteLine($"[Executor UDF] Processing drink: {drinkName}");
return drinkName; // Return the same value
});
// Apply the UDF to the DataFrame
var processedDF = dataFrame.Select(printAndReturnUdf(dataFrame["DrinkName"]).As("DrinkName"));
// Trigger execution by calling an action
var count = processedDF.Count(); // This forces execution of the UDF on executors
Console.WriteLine($"[Driver] Processed {count} drinks on executors");
Console.WriteLine("\n=== APPROACH 2: Using Another UDF (Runs on Executors) ===");
// Another UDF example
var logDrinkUdf = Udf<string, bool>((drinkName) =>
{
Console.WriteLine($"[Executor Transform] Logging drink: {drinkName}");
return true; // Return a boolean to indicate processing
});
// Apply and force execution
dataFrame.Select(dataFrame["DrinkName"], logDrinkUdf(dataFrame["DrinkName"]).As("processed")).Count();
Console.WriteLine("\n=== APPROACH 3: Driver Collection (for comparison) ===");
// Also collect data and iterate through drinks on driver for comparison
var collectedRows = dataFrame.Collect();
foreach (var row in collectedRows)
{
Console.WriteLine($"[Driver] Drink: {row.GetAs<string>("DrinkName")}");
}
}
}
}
similar issue: https://github.com/dotnet/spark/issues/1220
Hi @grazy27, thanks for the investigation! Do you know how does DBR 15.3, 16.3 work with python? If we know how it works in Databracks, we can mimic that.
Our team focuses on internal teams, and we don't have a channel with Databracks.
@wudanzy , he mentioned, it works with 14.3 LTS only, any higher version gives that createPythonFunction error.
So DBR 15.3 and 16.3 will drop the python support?
I don't think, as dbruntime and dbutils is created with python under the hood. It just that they are changing the function definitions or replacing them with something hidden so there goals can be met. rest I will leave to experience developers as I only focus on platform or devops and some tinkering.
Hello, @wudanzy , I had opportunity to reach out to Databricks staff engineer, and he confirms there's an additional parameter in 'createPythonFunction' in Databricks's fork of Spark.
Argument is optional, but still breaks our implementation
This is the second issue related to the same bug. I've updated the description to reduce ambiguity
Hi @grazy27, thanks for looking for that! Looking at the strack trace, looks like the problem is in the constructor of SimplePythonFunction.
java.lang.NoSuchMethodError: 'void org.apache.spark.api.python.SimplePythonFunction.
I don't currently have access to Databricks runtimes :(