
[BUG]: Vector UDF doesn't work in databricks environment

Open grazy27 opened this issue 7 months ago • 11 comments

Vector UDF Incompatibility in Databricks Environment

Background

Dotnet.spark.worker, when parsing the binary stream, expects a single large Arrow batch of data. However, on Databricks Runtime 14.3 (as I understand it, on any Linux environment) the behavior differs: the driver's dataset is split into multiple batches of 10k rows each. This batched mode uses a different binary format (it carries additional arguments). Because of the differing binary representations, the worker misinterprets the incoming byte stream, reads incorrect data, and hangs with no response.

Reproduction

Create a Spark pipeline that:

  • Ingests a moderate-sized dataset.
  • Performs a GroupBy (ensure groups contain >10k rows).
  • Applies a vector UDF via GroupBy().Apply(...), operating on the grouped RecordBatch (a minimal sketch follows below).
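
For clarity, here is a minimal repro sketch. It is a hedged example, not the original pipeline: it assumes the GroupedMap Apply overload that takes a Func<RecordBatch, RecordBatch>, and the dataset, column names, and identity UDF body are illustrative only.

using Apache.Arrow;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace VectorUdfRepro
{
    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().AppName("VectorUdfRepro").GetOrCreate();

            // 1M rows split into 10 groups => ~100k rows per group, well above the
            // spark.sql.execution.arrow.maxRecordsPerBatch default of 10,000.
            var df = spark.Range(0, 1_000_000).WithColumn("grp", Col("id").Mod(10));

            // Grouped-map vector UDF; an identity pass-through is enough to reproduce the
            // hang, because the worker misreads the batched Arrow stream before the UDF runs.
            var result = df
                .GroupBy("grp")
                .Apply(
                    new StructType(new[]
                    {
                        new StructField("id", new LongType()),
                        new StructField("grp", new LongType())
                    }),
                    (RecordBatch batch) => batch);

            result.Show();
        }
    }
}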

Actual results

The UDF execution never completes. The .NET Spark worker attempts to read more bytes than are available, blocking indefinitely while waiting for data that is never sent.

Expected result

UDF should execute and return successfully.

TBD: Either a single consolidated RecordBatch or an IEnumerable<RecordBatch> should be passed into the UDF.

Investigation Summary

Two issues prevent vector UDFs from functioning correctly in the Databricks environment.

1. Vector UDFs operate on a collection of RecordBatch instances, not a single batch

The spark.sql.execution.arrow.maxRecordsPerBatch setting (introduced in Spark 2.3.0, default: 10,000) is central to this behavior. Batch-based processing is advantageous for .NET for Spark, as it avoids the 2GB Arrow buffer limit encountered with GroupBy().Apply(...).
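
Since this is an ordinary Spark SQL conf, it can be inspected or overridden per session. A minimal sketch, assuming a standard SparkSession (the value shown is just the documented default):

using Microsoft.Spark.Sql;

class ArrowBatchSizeExample
{
    static void Main()
    {
        var spark = SparkSession.Builder().GetOrCreate();

        // Controls how many rows the JVM packs into each Arrow RecordBatch sent to the worker.
        spark.Conf().Set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000");
    }
}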

I implemented a PoC to validate performance for a specific use case. It successfully resolves the issue with the following changes:

  • The vector UDF now accepts IEnumerable<RecordBatch> instead of a single RecordBatch (see the sketch after this list).
  • Additional parameters are passed but intentionally ignored. In the Python implementation, these enforce batch ordering; in .NET, honoring them would mean collecting all batches before reordering and sending them to the UDF, which is memory-demanding. In the observed cases, batches arrived sequentially, so this functionality is omitted from the PoC.
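
A hypothetical sketch of the resulting UDF shape in the PoC (names and the exact signature are illustrative; the real change lives in the linked PR):

using System.Collections.Generic;
using Apache.Arrow;

public static class BatchedGroupedMapUdf
{
    // Hypothetical PoC shape: the grouped-map UDF consumes the Arrow batches as they
    // arrive instead of one consolidated RecordBatch, avoiding the 2GB buffer limit.
    public static IEnumerable<RecordBatch> Apply(IEnumerable<RecordBatch> batches)
    {
        foreach (var batch in batches)
        {
            // Per-batch processing goes here; identity pass-through for illustration.
            yield return batch;
        }
    }
}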

2. In DBR 15.3+, all UDFs fail to execute

This issue arises from a change in Databricks’ fork of Spark. The CreatePythonFunction API introduces an additional optional parameter that is not accounted for in the current implementation. As a result, UDF execution fails with the following error:

[Error] [JvmBridge] JVM method execution failed: Static method 'createPythonFunction' failed for class 'org.apache.spark.sql.api.dotnet.SQLUtils' when called with 7 arguments ([Index=1, Type=Byte[], Value=System.Byte[]], [Index=2, Type=Hashtable, Value=Microsoft.Spark.Interop.Internal.Java.Util.Hashtable], [Index=3, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=4, Type=String, Value=Microsoft.Spark.Worker], [Index=5, Type=String, Value=2.1.1.0], [Index=6, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=7, Type=null, Value=null])
[2024-09-13T10:47:53.1569404Z] [machine] [Error] [JvmBridge] java.lang.NoSuchMethodError: org.apache.spark.api.python.SimplePythonFunction.<init>(Lscala/collection/Seq;Ljava/util/Map;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lorg/apache/spark/api/python/PythonAccumulatorV2;)V
	at org.apache.spark.sql.api.dotnet.SQLUtils$.createPythonFunction(SQLUtils.scala:35)
	at org.apache.spark.sql.api.dotnet.SQLUtils.createPythonFunction(SQLUtils.scala)


Notes:

  • CoGroupedMap UDFs are unaffected as they do not leverage batching (as of Spark 3.5).
  • Databricks Runtimes 15.3 and 16.3 don't work with dotnet.spark at all because of the second issue, so no testing was performed on them. Details are in the linked PR.
  • UseArrow can be disabled to fall back to the backward-compatible format, which should work with the current dotnet.spark implementation.

grazy27 avatar May 17 '25 16:05 grazy27

We may need to support a different format, so users can choose one depending on the runtime.

BTW, does Databricks Runtime 16.3 support PySpark? I think Spark.NET just mimics PySpark, so from Spark's point of view it looks like a PySpark job, except that the entry point is different.

wudanzy avatar May 17 '25 23:05 wudanzy

@wudanzy Hello, PySpark remains a top-priority language for Databricks and will continue to be supported and extended.

From Spark's perspective, it interacts with PySpark via the SimpleFunction, which launches a specified executable with given arguments, initiating socket communication, etc.

However, in DBR 15.3 and 16.3, this method is no longer accessible from the dotnet-spark JAR. It still works in DBR 14.3, even though both use Spark 3.5.

Even though it's still present in the latest master and Spark 3.5.x, there's an error on the newer runtimes.


This suggests Databricks may have modified or replaced the implementation, or changed its signature.

[Error] [JvmBridge] JVM method execution failed: Static method 'createPythonFunction' failed for class 'org.apache.spark.sql.api.dotnet.SQLUtils' when called with 7 arguments ([Index=1, Type=Byte[], Value=System.Byte[]], [Index=2, Type=Hashtable, Value=Microsoft.Spark.Interop.Internal.Java.Util.Hashtable], [Index=3, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=4, Type=String, Value=Microsoft.Spark.Worker], [Index=5, Type=String, Value=2.1.1.0], [Index=6, Type=ArrayList, Value=Microsoft.Spark.Interop.Internal.Java.Util.ArrayList], [Index=7, Type=null, Value=null])
[2024-09-13T10:47:53.1569404Z] [machine] [Error] [JvmBridge] java.lang.NoSuchMethodError: org.apache.spark.api.python.SimplePythonFunction.<init>(Lscala/collection/Seq;Ljava/util/Map;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;Ljava/util/List;Lorg/apache/spark/api/python/PythonAccumulatorV2;)V
	at org.apache.spark.sql.api.dotnet.SQLUtils$.createPythonFunction(SQLUtils.scala:35)
	at org.apache.spark.sql.api.dotnet.SQLUtils.createPythonFunction(SQLUtils.scala)

To confirm, we’d need to extract and inspect the Databricks-specific Spark JARs from the environment.

Given Databricks’ ongoing investment in PySpark, we should anticipate regular breaking changes. If this project is to continue actively, Microsoft and Databricks will likely need to align on source sharing to streamline development.

grazy27 avatar May 18 '25 10:05 grazy27

I am facing a similar issue with the 15.x and 16.x LTS releases as well (in case someone needs that info).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using static Microsoft.Spark.Sql.Functions;

namespace HelloSpark
{
    // Define the response model for the cocktail API
    public class CocktailResponse
    {
        public List<Drink> drinks { get; set; } = new List<Drink>();
    }

    public class Drink
    {
        public string strDrink { get; set; } = string.Empty;
        public string strDrinkAlternate { get; set; } = string.Empty;
        public string strTags { get; set; } = string.Empty;
        public string strVideo { get; set; } = string.Empty;
        public string strCategory { get; set; } = string.Empty;
        public string strIBA { get; set; } = string.Empty;
        public string strAlcoholic { get; set; } = string.Empty;
        public string strGlass { get; set; } = string.Empty;
        public string strInstructions { get; set; } = string.Empty;
        public string strDrinkThumb { get; set; } = string.Empty;
    }

    class Program
    {
        static void Main(string[] args)
        {
            var spark = SparkSession.Builder().AppName("HelloSpark").GetOrCreate();

            // Fetch JSON data from a web API
            var httpClient = new HttpClient();
            var response = httpClient.GetStringAsync("https://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita").Result;

            // Deserialize the JSON data into a C# object
            var cocktailData = JsonSerializer.Deserialize<CocktailResponse>(response);
            
            // Check if data was successfully deserialized and drinks list exists
            if (cocktailData?.drinks == null)
            {
                Console.WriteLine("No cocktail data found or failed to deserialize.");
                return;
            }
            
            // Create a DataFrame from the cocktail data using a simpler approach
            // First, let's create a temporary view from the data
            var drinkNames = cocktailData.drinks.Select(drink => drink.strDrink).ToArray();
            
            // Create a simple DataFrame using Spark SQL with VALUES clause
            var valuesList = string.Join(", ", drinkNames.Select(name => $"('{name.Replace("'", "''")}')"));
            var dataFrame = spark.Sql($"SELECT DrinkName FROM VALUES {valuesList} AS t(DrinkName)");

            // Display the DataFrame
            dataFrame.Show();

            Console.WriteLine("\n=== APPROACH 1: Using UDF (Runs on Executors) ===");
            // Register a UDF that prints and returns the value
            var printAndReturnUdf = Udf<string, string>((drinkName) =>
            {
                Console.WriteLine($"[Executor UDF] Processing drink: {drinkName}");
                return drinkName; // Return the same value
            });
            
            // Apply the UDF to the DataFrame
            var processedDF = dataFrame.Select(printAndReturnUdf(dataFrame["DrinkName"]).As("DrinkName"));
            
            // Trigger execution by calling an action
            var count = processedDF.Count(); // This forces execution of the UDF on executors
            Console.WriteLine($"[Driver] Processed {count} drinks on executors");

            Console.WriteLine("\n=== APPROACH 2: Using Another UDF (Runs on Executors) ===");
            // Another UDF example
            var logDrinkUdf = Udf<string, bool>((drinkName) =>
            {
                Console.WriteLine($"[Executor Transform] Logging drink: {drinkName}");
                return true; // Return a boolean to indicate processing
            });
            
            // Apply and force execution
            dataFrame.Select(dataFrame["DrinkName"], logDrinkUdf(dataFrame["DrinkName"]).As("processed")).Count();

            Console.WriteLine("\n=== APPROACH 3: Driver Collection (for comparison) ===");
            // Also collect data and iterate through drinks on driver for comparison
            var collectedRows = dataFrame.Collect();
            foreach (var row in collectedRows)
            {
                Console.WriteLine($"[Driver] Drink: {row.GetAs<string>("DrinkName")}");
            }
        }
    }
}

davinder-veeam avatar Sep 04 '25 09:09 davinder-veeam

similar issue: https://github.com/dotnet/spark/issues/1220

davinder-veeam avatar Sep 08 '25 05:09 davinder-veeam

Hi @grazy27, thanks for the investigation! Do you know how DBR 15.3 and 16.3 work with Python? If we know how it works in Databricks, we can mimic that.

Our team focuses on internal teams, and we don't have a channel with Databricks.

wudanzy avatar Sep 08 '25 06:09 wudanzy

@wudanzy, he mentioned it works with 14.3 LTS only; any higher version gives that createPythonFunction error.

davinder-veeam avatar Sep 08 '25 07:09 davinder-veeam

So will DBR 15.3 and 16.3 drop Python support?

wudanzy avatar Sep 08 '25 07:09 wudanzy

I don't think so, as dbruntime and dbutils are built with Python under the hood. It's just that they are changing the function definitions or replacing them with something hidden so their goals can be met. The rest I will leave to experienced developers, as I only focus on platform/DevOps and some tinkering.

davinder-veeam avatar Sep 08 '25 07:09 davinder-veeam

Hello @wudanzy, I had the opportunity to reach out to a Databricks staff engineer, and he confirmed there's an additional parameter in 'createPythonFunction' in Databricks's fork of Spark.

The argument is optional, but it still breaks our implementation.

This is the second issue related to the same bug. I've updated the description to reduce ambiguity.

grazy27 avatar Sep 09 '25 07:09 grazy27

Hi @grazy27, thanks for looking into that! Looking at the stack trace, it looks like the problem is in the constructor of SimplePythonFunction.

java.lang.NoSuchMethodError: 'void org.apache.spark.api.python.SimplePythonFunction.<init>(scala.collection.Seq, java.util.Map, java.util.List, java.lang.String, java.lang.String, java.util.List, org.apache.spark.api.python.PythonAccumulatorV2)'
	at org.apache.spark.sql.api.dotnet.SQLUtils$.createPythonFunction(SQLUtils.scala:35)
	...
	at Microsoft.Spark.Utils.UdfUtils.CreatePythonFunction(...)
	at Microsoft.Spark.Sql.Streaming.DataStreamWriter.Foreach(IForeachWriter writer)

wudanzy avatar Sep 14 '25 12:09 wudanzy

I don't currently have access to Databricks runtimes :(

wudanzy avatar Sep 14 '25 12:09 wudanzy