Refactor protobuf ser/de traits

Open adriangb opened this issue 1 month ago • 18 comments

Relevant issues

Closes #18477

Motivation

The current protobuf serialization/deserialization system in DataFusion has limitations when users need to customize or intercept the serialization process. Specifically:

  1. Limited interception points: The existing PhysicalExtensionCodec trait only provides try_encode/try_decode for custom extension nodes and try_encode_expr/try_decode_expr for custom expressions. There is no way to intercept the serialization/deserialization of every node in a plan tree.

  2. No pre/post-processing hooks: Users cannot run custom logic before or after the default serialization/deserialization logic. This is critical for use cases like:

  • Caching deserialized expressions to avoid redundant work
  • Preserving custom metadata that isn't part of the protobuf schema (e.g., PhysicalExprAdapter factories)
  • Wrapping standard nodes with additional context during serialization
  • Injecting custom state during deserialization
  3. Inflexible for advanced use cases: Distributed query systems, FFI boundaries, and custom execution engines often need fine-grained control over the serialization process that the current trait object-based design doesn't provide.

This was discussed in detail in #18477, where several use cases were identified that would benefit from more flexible serialization hooks.

Changes Made

This PR refactors the PhysicalExtensionCodec trait and related serialization infrastructure to enable interception at every node in the plan tree:

1. Generics Instead of Trait Objects

Changed all codec parameters from &dyn PhysicalExtensionCodec to generic &C where C: PhysicalExtensionCodec + ?Sized. This is primarily because of limitations on recursive calls with unsized trait objects; happy to hear alternatives though. It should be mostly backwards compatible and may have a performance benefit (I imagine most users only have 1-2 concrete implementations of the codec traits).

Updated traits and functions:

  • AsExecutionPlan::try_into_physical_plan and try_from_physical_plan
  • All helper functions in from_proto.rs and to_proto.rs (e.g., parse_physical_expr, serialize_physical_expr, parse_physical_sort_expr, etc.)

2. New Interception Methods on PhysicalExtensionCodec

Added four new methods with default implementations that route through the codec:

fn deserialize_physical_plan(&self, proto: &PhysicalPlanNode, ctx: &TaskContext)
   -> Result<Arc<dyn ExecutionPlan>>

fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>)
   -> Result<PhysicalPlanNode>

fn deserialize_physical_expr(&self, proto: &PhysicalExprNode, ctx: &TaskContext,
   input_schema: &Schema) -> Result<Arc<dyn PhysicalExpr>>

fn serialize_physical_expr(&self, expr: Arc<dyn PhysicalExpr>)
   -> Result<PhysicalExprNode>

These methods:

  • Have default implementations that call the corresponding default_* functions
  • Enable users to intercept every plan and expression node during serialization/deserialization
  • Support the decorator pattern: run custom logic, call the default implementation, then post-process
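The decorator pattern described above can be sketched with toy types. This is a minimal illustration, not the PR's code: ProtoNode, Plan, Codec, CountingCodec and default_serialize_plan are hypothetical stand-ins for PhysicalPlanNode, Arc<dyn ExecutionPlan>, PhysicalExtensionCodec and the default_* functions, and the real signatures differ.

```rust
use std::cell::Cell;

// Hypothetical stand-in for PhysicalPlanNode (not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
struct ProtoNode {
    name: String,
    children: Vec<ProtoNode>,
}

// Hypothetical stand-in for Arc<dyn ExecutionPlan>.
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    name: String,
    children: Vec<Plan>,
}

trait Codec {
    // Interception hook. The default body routes to the shared logic.
    fn serialize_plan(&self, plan: &Plan) -> Result<ProtoNode, String> {
        default_serialize_plan(plan, self)
    }
}

// Shared logic. It recurses through the codec, so every child node
// passes through the hook again.
fn default_serialize_plan<C: Codec + ?Sized>(
    plan: &Plan,
    codec: &C,
) -> Result<ProtoNode, String> {
    let children = plan
        .children
        .iter()
        .map(|child| codec.serialize_plan(child))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(ProtoNode { name: plan.name.clone(), children })
}

// A codec that relies entirely on the default hook.
struct DefaultCodec;
impl Codec for DefaultCodec {}

// A decorating codec: pre-process, call the default, post-process.
struct CountingCodec {
    seen: Cell<usize>,
}

impl Codec for CountingCodec {
    fn serialize_plan(&self, plan: &Plan) -> Result<ProtoNode, String> {
        self.seen.set(self.seen.get() + 1); // pre-processing
        let mut node = default_serialize_plan(plan, self)?; // default logic
        node.name = format!("wrapped:{}", node.name); // post-processing
        Ok(node)
    }
}

// A two-node plan: a filter on top of a scan.
fn sample_plan() -> Plan {
    Plan {
        name: "filter".to_string(),
        children: vec![Plan { name: "scan".to_string(), children: vec![] }],
    }
}

fn main() {
    let codec = CountingCodec { seen: Cell::new(0) };
    let node = codec.serialize_plan(&sample_plan()).unwrap();
    println!("{node:?} after visiting {} nodes", codec.seen.get());

    let plain = DefaultCodec.serialize_plan(&sample_plan()).unwrap();
    println!("{plain:?}");
}
```

Because the shared function calls back into the codec for each child, the override sees both the filter and the scan, not just the root.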

3. New Public default_* Functions

Extracted the actual serialization/deserialization logic into public functions:

  • default_deserialize_physical_plan
  • default_serialize_physical_plan
  • default_parse_physical_expr (renamed from parse_physical_expr)

These functions:

  • Contain the core serialization logic
  • Can be called from custom codec implementations
  • Enable the decorator pattern without code duplication

4. Comprehensive Example

Added adapter_serialization.rs example demonstrating:

  • How to intercept serialization to detect plans with custom PhysicalExprAdapter factories
  • Wrapping standard plans as extension nodes with nested JSON metadata
  • Restoring custom state during deserialization
  • Using the decorator pattern with the new interception methods

The example showcases a real-world use case: preserving FileScanConfig::expr_adapter_factory across serialization boundaries, which is not serialized by default but is critical for maintaining filter pushdown behavior.

Public API Breaking Changes

See the "Upgrading Guide" section in docs/source/library-user-guide/upgrading.md for detailed migration instructions. Summary of breaking changes:

1. AsExecutionPlan trait methods now use generics

Before:

fn try_into_physical_plan(
   &self,
   ctx: &TaskContext,
   extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>>

After:

fn try_into_physical_plan<C: PhysicalExtensionCodec + ?Sized>(
   &self,
   ctx: &TaskContext,
   extension_codec: &C,
) -> Result<Arc<dyn ExecutionPlan>>

Who is affected: Anyone implementing AsExecutionPlan on custom types

Migration: Add generic parameter <C: PhysicalExtensionCodec + ?Sized> to method signatures

2. Helper functions now use generics

Functions like parse_physical_expr, parse_physical_sort_expr, serialize_physical_expr, etc. now take generic &C instead of &dyn PhysicalExtensionCodec.

Who is affected: Code calling these helper functions directly

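Migration: Most code will continue to work without changes, since a &dyn PhysicalExtensionCodec argument is accepted by a parameter &C where C: PhysicalExtensionCodec + ?Sized (with C inferred as dyn PhysicalExtensionCodec). In rare cases where type inference fails, you may need to add type annotations.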
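A small sketch of why existing call sites should keep compiling, using a hypothetical Codec trait and describe helper rather than the real DataFusion functions: a generic parameter bounded by ?Sized accepts both concrete codecs and trait objects, and a turbofish can supply the type if inference ever needs help.

```rust
// Hypothetical trait standing in for PhysicalExtensionCodec.
trait Codec {
    fn name(&self) -> String;
}

struct MyCodec;

impl Codec for MyCodec {
    fn name(&self) -> String {
        "my-codec".to_string()
    }
}

// Helper in the post-change style: generic over the codec, with ?Sized
// so that C may be the unsized type `dyn Codec`.
fn describe<C: Codec + ?Sized>(codec: &C) -> String {
    format!("using {}", codec.name())
}

fn main() {
    let concrete = MyCodec;
    let erased: &dyn Codec = &concrete;
    let boxed: Box<dyn Codec> = Box::new(MyCodec);

    // C is inferred as MyCodec.
    println!("{}", describe(&concrete));
    // C is inferred as dyn Codec; an old-style call site is unchanged.
    println!("{}", describe(erased));
    // An explicit annotation, for cases where inference needs a hint.
    println!("{}", describe::<dyn Codec>(&*boxed));
}
```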

3. New methods on PhysicalExtensionCodec

Four new methods added with default implementations:

  • deserialize_physical_plan
  • serialize_physical_plan
  • deserialize_physical_expr
  • serialize_physical_expr

Who is affected: No one for basic usage, since these have default implementations

Opportunity: Users can now override these methods to intercept serialization at every node

Extending PhysicalExtensionCodec vs. new traits

As proposed by @milenkovicm in the weekly meeting where we discussed this, I re-used PhysicalExtensionCodec instead of adding new traits. This resulted in considerably less code churn.

Use of AI

Please note that after the initial design work, a lot of this code was automated by AI. That was very nice, because there was a lot of mechanical updating of dyn trait objects to generics.

adriangb avatar Dec 09 '25 12:12 adriangb

cc @timsaucer

adriangb avatar Dec 09 '25 16:12 adriangb

Will have a better look later; on a quick look I think it makes sense. One thing I'm not sure about is the use of generics. It might be better to leave it as &dyn (especially with Tim's FFI work). I don't think this is on the critical path, so it should not impact performance (a lot).

Would it be possible to keep it as dyn @adriangb ?

milenkovicm avatar Dec 09 '25 16:12 milenkovicm

  • https://github.com/milenkovicm/datafusion-ballista/blob/26fcbf66233de2c6d12545277242e16b4ede386a/ballista/scheduler/src/config.rs#L217-L218 I'm not sure if it could be changed.

also with @timsaucer FFI work i think it would make sense to keep it as dyn

milenkovicm avatar Dec 09 '25 16:12 milenkovicm

Will have a better look later; on a quick look I think it makes sense. One thing I'm not sure about is the use of generics. It might be better to leave it as &dyn (especially with Tim's FFI work). I don't think this is on the critical path, so it should not impact performance (a lot).

Would it be possible to keep it as dyn @adriangb ?

Under the current design it's not, here's an example demonstrating the problem: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=636bc71d4c8c5e8172e41bddef3b1160.

Essentially, with dynamic dispatch the trait cannot participate in the code flow: it can only act as a "fallback", as it does now, and cannot call back into the tree recursion. I haven't found a way around that (other than using generics as I do here).

adriangb avatar Dec 09 '25 17:12 adriangb

I think we can keep dyn if we don't provide default implementations. But then this becomes a bit bigger of a breaking change for users since they are forced to implement all of the methods (even if they are just delegating to a default implementation and it's just 1LOC per method).
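A minimal sketch of that dyn-only variant, with toy types rather than DataFusion's (Plan, ProtoNode, Codec and default_serialize_plan are hypothetical names). It assumes the limitation is the usual Rust one: inside a trait's default method, Self may be unsized, so &Self cannot be coerced to &dyn Codec, whereas a concrete implementer can perform that coercion.

```rust
// Hypothetical stand-ins for the plan and its protobuf form.
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    name: String,
    children: Vec<Plan>,
}

#[derive(Debug, Clone, PartialEq)]
struct ProtoNode {
    name: String,
    children: Vec<ProtoNode>,
}

trait Codec {
    // Required method with no default body. A default body could not pass
    // `self` to a function taking `&dyn Codec`, because `Self` is not
    // known to be sized inside the trait definition.
    fn serialize_plan(&self, plan: &Plan) -> Result<ProtoNode, String>;
}

// Shared logic takes a trait object and recurses through it.
fn default_serialize_plan(plan: &Plan, codec: &dyn Codec) -> Result<ProtoNode, String> {
    let children = plan
        .children
        .iter()
        .map(|child| codec.serialize_plan(child))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(ProtoNode { name: plan.name.clone(), children })
}

// The one-line delegation every implementer would have to write.
struct PassThroughCodec;

impl Codec for PassThroughCodec {
    fn serialize_plan(&self, plan: &Plan) -> Result<ProtoNode, String> {
        // `self` is `&PassThroughCodec`, a sized type, so it coerces to `&dyn Codec`.
        default_serialize_plan(plan, self)
    }
}

// An implementer that post-processes every node.
struct UppercaseCodec;

impl Codec for UppercaseCodec {
    fn serialize_plan(&self, plan: &Plan) -> Result<ProtoNode, String> {
        let mut node = default_serialize_plan(plan, self)?;
        node.name = node.name.to_uppercase();
        Ok(node)
    }
}

// A two-node plan: a filter on top of a scan.
fn sample_plan() -> Plan {
    Plan {
        name: "filter".to_string(),
        children: vec![Plan { name: "scan".to_string(), children: vec![] }],
    }
}

fn main() {
    // Both codecs are used purely through trait objects.
    let codecs: Vec<Box<dyn Codec>> = vec![Box::new(PassThroughCodec), Box::new(UppercaseCodec)];
    for codec in &codecs {
        let node = codec.serialize_plan(&sample_plan()).unwrap();
        println!("{node:?}");
    }
}
```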

adriangb avatar Dec 09 '25 17:12 adriangb

#19079 I'd say it requires dyn if we want to support FFI implementations

milenkovicm avatar Dec 09 '25 18:12 milenkovicm

would it be possible to split the interface? keep existing one as is and wrap it up with "CorePhysicalCodec" which would have those few methods you need ?

milenkovicm avatar Dec 09 '25 18:12 milenkovicm

If we need dyn for FFI I agree the current approach is going to be non-viable.

adriangb avatar Dec 09 '25 18:12 adriangb

would it be possible to split the interface? keep existing one as is and wrap it up with "CorePhysicalCodec" which would have those few methods you need ?

I'm not sure I follow, could you elaborate how that would work/what the advantage will be?

My thinking is if we're going to keep dyn we should just add the methods and force implementers to delegate to the defaults manually.

adriangb avatar Dec 09 '25 19:12 adriangb

I'm on travel all week, but I'll try to take a look as soon as I can.

timsaucer avatar Dec 09 '25 19:12 timsaucer

I'm not sure, you stress me out with hard questions 😀

EDIT: apologies, I missed a few things

milenkovicm avatar Dec 09 '25 20:12 milenkovicm

Would intercept_[encode|decode]_[plan|expr] make trait name a bit more self explanatory?

milenkovicm avatar Dec 10 '25 09:12 milenkovicm

maybe @lewiszlw would be interested in the change

milenkovicm avatar Dec 10 '25 09:12 milenkovicm

Thanks for mentioning me @milenkovicm. The refactor looks clearer at first glance. Looks like PhysicalExtensionCodec should be called PhysicalPlanCodec.

lewiszlw avatar Dec 10 '25 09:12 lewiszlw

When I was testing this with the FFI work previously, I found that switching from &dyn PhysicalExtensionCodec to using generics on the functions was a blocker for the FFI work. However, I moved away from using the physical codec in my work and I only use the logical codec. As long as we don't make the same changes in the logical codec, it shouldn't be a blocker for our existing work. I could see a potential problem for someone who wants to use the FFI_PhysicalExtensionCodec, because it doesn't know the concrete type.

It feels not ideal to then have two different code structures for the logical and physical codecs.

Do you think there's a real performance enhancement to gain by using generics over keeping it with passing &dyn? Is it really about how many methods people need to implement?

timsaucer avatar Dec 10 '25 09:12 timsaucer

The PhysicalExtensionCodec remains unchanged except for added methods for which no user action is required. It follows the same approach as the LogicalExtensionCodec. The public-facing methods still accept &dyn PhysicalExtensionCodec. I'm not sure if there will be a huge performance implication from the change, or if that even matters.

Maybe as a follow-up we should consider the value of exposing AsExecutionPlan as a public interface, or we just keep the from|to_bytes methods.

I need to verify one other bit, whether it is compatible with Ballista (which I believe it is), but overall I do believe it's a solid proposal.

milenkovicm avatar Dec 10 '25 12:12 milenkovicm

Sorry, I should be more clear. We use serialize_physical_sort_exprs which looks like it is included in the change here. It isn't a problem in the current main because that only has DefaultPhysicalExtensionCodec. If we make similar changes on the logical side as are proposed here or the physical side it will be a problem since we do not have concrete types as soon as we switch over to our FFI_LogicalExtensionCodec.

Sorry for the brevity, trying to touch base while at a work retreat.

timsaucer avatar Dec 10 '25 12:12 timsaucer

Here's the alternative approach: https://github.com/apache/datafusion/pull/19267

adriangb avatar Dec 10 '25 21:12 adriangb