Refactor protobuf ser/de traits to allow decorator pattern
Refactor PhysicalExtensionCodec with &dyn dispatch (Alternative to #19234)
Which issue does this PR close?
Closes #18477
Related to #19234 - this PR provides an alternative implementation approach.
Rationale for this change
DataFusion's protobuf serialization system currently has limited interception points. The existing PhysicalExtensionCodec trait only provides hooks for custom extensions (try_decode/try_encode) and unknown expressions (try_decode_expr/try_encode_expr), but users cannot intercept serialization of all plan and expression nodes.
This limitation prevents important use cases:
- Caching: Reusing previously deserialized expressions to avoid redundant parsing
- Transformation: Modifying nodes during serialization/deserialization
- Metadata injection: Preserving custom state not captured in the protobuf schema
- Decorator patterns: Wrapping standard serialization with custom pre/post-processing
PR #19234 addresses this by switching from `&dyn PhysicalExtensionCodec` to generics (`&C where C: PhysicalExtensionCodec + ?Sized`). However, concerns about FFI compatibility were raised in https://github.com/apache/datafusion/pull/19234#issuecomment-2869946159, since distributed systems and FFI boundaries often require dynamic dispatch.
What changes are included in this PR?
This PR takes an alternative approach: keep &dyn dispatch but add 4 new required methods that intercept every plan/expression node during serialization:
```rust
pub trait PhysicalExtensionCodec: Debug + Send + Sync {
    // ... existing methods unchanged ...

    fn deserialize_physical_plan(
        &self,
        proto: &PhysicalPlanNode,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>) -> Result<PhysicalPlanNode>;

    fn deserialize_physical_expr(
        &self,
        proto: &PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    fn serialize_physical_expr(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode>;
}
```
Key implementation details:
- All recursive calls in `from_proto.rs` and `to_proto.rs` now go through the codec methods
- Public `default_*` helper functions allow implementations to delegate to the standard behavior
- No default implementations: users must explicitly implement all four methods (documented examples are provided)
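The caching use case this interception enables can be sketched with a toy model. Everything here is hypothetical simplification: a `Codec` trait with string "plans" stands in for `PhysicalExtensionCodec`, `Arc<dyn ExecutionPlan>`, and the protobuf types; only the shape of the decorator pattern (check a cache, delegate to a `default_*`-style helper, store the result) is what the PR actually enables.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

// Toy stand-in for PhysicalExtensionCodec: a "plan" and its serialized
// form are both plain strings.
trait Codec {
    fn serialize_plan(&self, plan: &str) -> String;
    fn deserialize_plan(&self, proto: &str) -> String;
}

// Stand-ins for the default_* helper functions: the standard behavior,
// taking `&dyn Codec` so the recursion stays dynamically dispatched.
fn default_serialize_plan(plan: &str, _codec: &dyn Codec) -> String {
    format!("proto:{plan}")
}

fn default_deserialize_plan(proto: &str, _codec: &dyn Codec) -> String {
    proto.trim_start_matches("proto:").to_string()
}

// A decorator codec: check the cache, delegate to the default, then
// cache the result afterwards.
#[derive(Default)]
struct CachingCodec {
    cache: RefCell<HashMap<String, String>>,
    misses: RefCell<usize>,
}

impl Codec for CachingCodec {
    fn serialize_plan(&self, plan: &str) -> String {
        // No interception needed here: plain delegation.
        default_serialize_plan(plan, self)
    }

    fn deserialize_plan(&self, proto: &str) -> String {
        if let Some(hit) = self.cache.borrow().get(proto) {
            return hit.clone();
        }
        *self.misses.borrow_mut() += 1;
        let plan = default_deserialize_plan(proto, self);
        self.cache.borrow_mut().insert(proto.to_string(), plan.clone());
        plan
    }
}

fn main() {
    let codec = CachingCodec::default();
    let proto = codec.serialize_plan("scan");
    assert_eq!(codec.deserialize_plan(&proto), "scan");
    assert_eq!(codec.deserialize_plan(&proto), "scan"); // served from cache
    assert_eq!(*codec.misses.borrow(), 1);
}
```

The key point is that the interception method wraps the default helper rather than replacing it, which is why the helpers must be public.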
Comparison with PR #19234
| Aspect | PR #19234 | This PR |
|---|---|---|
| Dispatch | Generic &C where C: PhysicalExtensionCodec + ?Sized | Keep &dyn PhysicalExtensionCodec |
| Default implementations | Yes - methods have defaults calling helpers | No - all methods required |
| FFI compatibility | Requires concrete types at boundaries | Fully compatible with dynamic dispatch |
| Migration effort | Minimal (defaults handle it) | Explicit (must add 4 methods, but with simple delegation) |
| Monomorphization | Yes - separate code paths per codec type | No - single code path via vtable |
| Compile time impact | Potentially increased | None |
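The dispatch rows of the table can be illustrated with a minimal sketch (toy `Codec` trait, not DataFusion's API): the generic version is monomorphized into a separate copy per concrete codec type, while the `&dyn` version compiles to a single function that dispatches through a vtable and can be called where the concrete type is not known at compile time.

```rust
trait Codec {
    fn encode(&self, v: i32) -> i32;
}

struct PlusOne;
impl Codec for PlusOne {
    fn encode(&self, v: i32) -> i32 {
        v + 1
    }
}

// PR #19234 style: generic dispatch. The compiler emits a separate,
// statically dispatched copy of this function for each codec type.
fn encode_generic<C: Codec + ?Sized>(codec: &C, v: i32) -> i32 {
    codec.encode(v)
}

// This PR's style: dynamic dispatch. One compiled code path; the call
// goes through the trait object's vtable at runtime.
fn encode_dyn(codec: &dyn Codec, v: i32) -> i32 {
    codec.encode(v)
}

fn main() {
    assert_eq!(encode_generic(&PlusOne, 1), 2);
    assert_eq!(encode_dyn(&PlusOne, 1), 2);
}
```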
Pros of this approach:
- Maintains FFI compatibility for distributed systems using dynamic codec resolution
- No monomorphization overhead - single code path for all codec implementations
- Explicit is better than implicit - users acknowledge the new API surface
- Preserves existing &dyn patterns used throughout the codebase
Cons of this approach:
- Breaking change requires updating all PhysicalExtensionCodec implementations
- Users must add 4 method implementations (though simple delegation to default_* helpers works)
Files changed
- datafusion/proto/src/physical_plan/mod.rs - Added 4 new trait methods + helper functions
- datafusion/proto/src/physical_plan/from_proto.rs - Recursive calls go through codec
- datafusion/proto/src/physical_plan/to_proto.rs - Recursive calls go through codec
- datafusion/ffi/src/proto/physical_extension_codec.rs - Updated FFI codec implementations
- datafusion-examples/examples/proto/composed_extension_codec.rs - Updated example codecs
- datafusion/proto/tests/cases/roundtrip_physical_plan.rs - Updated test codecs
- docs/source/library-user-guide/upgrading.md - Added migration guide
Migration example
For existing implementations that don't need custom interception:
```rust
use datafusion_proto::physical_plan::{
    default_deserialize_physical_plan, default_serialize_physical_plan,
    default_deserialize_physical_expr, default_serialize_physical_expr,
};

impl PhysicalExtensionCodec for MyCodec {
    // ... existing methods ...

    fn deserialize_physical_plan(
        &self,
        proto: &PhysicalPlanNode,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        default_deserialize_physical_plan(proto, ctx, self)
    }

    fn serialize_physical_plan(&self, plan: Arc<dyn ExecutionPlan>) -> Result<PhysicalPlanNode> {
        default_serialize_physical_plan(plan, self)
    }

    fn deserialize_physical_expr(
        &self,
        proto: &PhysicalExprNode,
        ctx: &TaskContext,
        input_schema: &Schema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        default_deserialize_physical_expr(proto, ctx, input_schema, self)
    }

    fn serialize_physical_expr(&self, expr: &Arc<dyn PhysicalExpr>) -> Result<PhysicalExprNode> {
        default_serialize_physical_expr(expr, self)
    }
}
```
I find the requirement to implement those four methods a bit confusing, and the increased API complexity unnecessary ("we have defaults for you, but we force you to implement them yourself"). No perfect solutions, apparently.
Anyway, let's give @timsaucer a chance to have a look.
> I find the requirement to implement those four methods a bit confusing, and the increased API complexity unnecessary ("we have defaults for you, but we force you to implement them yourself"). No perfect solutions, apparently.
Yes agreed it's unfortunate but I don't see any way around it. It's just a limitation of Rust.
@adriangb do you need before and after interception points for both encode and decode, or could you work with `before_encode_*` and `after_decode_*` only? Not sure if that would do what you intend to do and remove the need for recursive calls in the extension codec.
I think it's useful to be able to control whether the default serializer is called or not. For example, in the case of caching Arc'ed values you want to check the cache, delegate to the default, and then cache the result afterwards.
I also don't think that would help with the recursion: in a default implementation you'd still have to pass `&self` to a function taking `&dyn Trait`, and that coercion doesn't compile when `Self` is unsized.
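The coercion limitation being discussed can be made concrete with a toy example (hypothetical names, not DataFusion's API). Inside a trait body, `Self` may be unsized, and `&Self` only coerces to `&dyn Trait` when `Self: Sized`, so a default method that recurses through a `&dyn`-taking helper must carry a `where Self: Sized` bound, and the method is then not callable on `dyn Codec` itself.

```rust
trait Codec {
    fn encode(&self, v: i32) -> i32;

    // A default body that recurses through a free function taking
    // `&dyn Codec` only compiles with the `Self: Sized` bound, because
    // coercing `&Self` to `&dyn Codec` requires a sized concrete type.
    // The bound also removes this method from the trait's vtable.
    fn encode_twice(&self, v: i32) -> i32
    where
        Self: Sized,
    {
        helper(helper(v, self), self)
    }
}

fn helper(v: i32, codec: &dyn Codec) -> i32 {
    codec.encode(v)
}

struct PlusOne;
impl Codec for PlusOne {
    fn encode(&self, v: i32) -> i32 {
        v + 1
    }
}

fn main() {
    assert_eq!(PlusOne.encode_twice(0), 2);
    // let obj: &dyn Codec = &PlusOne;
    // obj.encode_twice(0); // error: the `Self: Sized` bound is not satisfied
}
```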
Would it help if we provided a macro to implement the defaults?
I agree with you that it's a hard problem to crack. I'm not sure about macros, maybe. If we need to explain how to use the API, maybe we are overcomplicating it.
Would `[before|after]_[encode|decode]_[plan|expr]` work, where `before` can return either a proceed or a stop result, which would then trigger the default method's recursion? They could have default implementations, and users who do not need such functionality would never need to implement them. We would have about eight new methods, which increases API complexity, but not necessarily by too much, as they would all have default implementations.
Or it might help if `[encode|decode]_[plan|expr]` returned some kind of "aspect" instance which would drive the recursion, rather than the codec itself.
I'm just throwing out ideas, trying to help; I'll have a look tomorrow.
Maybe something similar to the TreeNode API would work?
Just a rough idea; this goes into the encoder:
```rust
fn before_encode_expr(
    &self,
    _node: &Arc<dyn PhysicalExpr>,
) -> Result<Option<protobuf::PhysicalExprNode>> {
    Ok(None)
}
```
I don't like the use of `protobuf::PhysicalExprNode` there, as the method is not similar to the others in the trait.
This part goes into `to_proto`:

```rust
pub fn serialize_physical_expr(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
) -> Result<protobuf::PhysicalExprNode> {
    match codec.before_encode_expr(value)? {
        Some(p) => Ok(p),
        // Fall through to the existing serialization body; calling
        // `serialize_physical_expr` itself here would recurse forever.
        // `serialize_physical_expr_inner` is a hypothetical name for
        // that existing body.
        None => serialize_physical_expr_inner(value, codec),
    }
}
```
Maybe a TreeNode-like approach could make sense as well.
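A TreeNode-style variant might look like the toy sketch below (a hypothetical `Hook` enum, loosely modeled on DataFusion's `TreeNodeRecursion` control flow; none of these names are the real API). The hook either short-circuits with a ready-made result or lets the default recursion proceed, and it has a default implementation, so codecs that don't need interception never touch it.

```rust
// Control value returned by the hook, in the spirit of TreeNodeRecursion.
enum Hook<T> {
    // Use this value and skip the default serialization for the node.
    Stop(T),
    // Fall through to the default recursive serialization.
    Continue,
}

trait Encoder {
    // Default implementation: never intercept.
    fn before_encode(&self, _node: &str) -> Hook<String> {
        Hook::Continue
    }
}

// Stand-in for the to_proto entry point: consult the hook first.
fn serialize(node: &str, enc: &dyn Encoder) -> String {
    match enc.before_encode(node) {
        Hook::Stop(ready) => ready,
        Hook::Continue => format!("proto:{node}"), // stands in for the default path
    }
}

// A codec that intercepts one specific node, e.g. to serve it from a cache.
struct Intercepting;
impl Encoder for Intercepting {
    fn before_encode(&self, node: &str) -> Hook<String> {
        if node == "cached" {
            Hook::Stop("proto:from-cache".to_string())
        } else {
            Hook::Continue
        }
    }
}

// A codec that relies entirely on the default implementation.
struct Plain;
impl Encoder for Plain {}

fn main() {
    assert_eq!(serialize("scan", &Plain), "proto:scan");
    assert_eq!(serialize("cached", &Intercepting), "proto:from-cache");
    assert_eq!(serialize("scan", &Intercepting), "proto:scan");
}
```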
This feels like it's more complicated than it needs to be. But I must also be missing something, because I still don't see which needs my original proposal in https://github.com/apache/datafusion/pull/18813 didn't meet. I think you have some use cases and context in mind, @adriangb, that I clearly don't understand.
Regarding this PR, do you also need to cover the other functions in the codec? We now have two methods we need for physical plans: `try_decode` and `deserialize_physical_plan`. The first seems very codec-y to me: given a bunch of bytes, produce a plan. It doesn't even necessarily need to use protobuf (that's up to the implementer). The second seems to be more about hooking into the process of using the codecs plus non-codecs.
I think the problem we're running into with not having an elegant way of hooking these together is demonstrating there's a friction point between what the codec is for and what the protobuf processing part is for. I'm sure I'm biased towards my initial implementation approach, but I still feel that's a cleaner way of doing it.
@timsaucer thank you for taking a look at this. I have my two key use cases as examples in this PR. When I first looked at your change it seemed like it would not be able to implement these examples, but I can give it another try to see if they can be made to work with your proposal.