[HUDI-8126] Support proto messages for spark kryo serializer excluding DynamicMessages
Change Logs
Persisting a proto RDD<Message> fails because the KryoSerializer in Spark has no handling for proto messages and errors out like this:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (vinish-ka-mbp executor driver): com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
dependencies (com.google.protobuf.Descriptors$DescriptorPool)
pool (com.google.protobuf.Descriptors$FileDescriptor)
file (com.google.protobuf.Descriptors$FieldDescriptor)
fields (com.google.protobuf.Descriptors$Descriptor)
containingType (com.google.protobuf.Descriptors$Descriptor)
descriptor (com.google.protobuf.MapEntry$Metadata)
metadata (com.google.protobuf.MapEntry)
defaultEntry (com.google.protobuf.MapField$ImmutableMessageConverter)
converter (com.google.protobuf.MapField)
mapMessage_ (org.apache.hudi.utilities.test.proto.Sample)
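The failure can be reproduced with a sketch like the one below. It assumes the compiled test proto class org.apache.hudi.utilities.test.proto.Sample referenced in the trace above and a local Spark session with Kryo enabled; the serialized persist forces Kryo to walk into the protobuf descriptor internals and hit the NPE. This is an illustrative reproducer, not code from this PR.

```scala
import org.apache.hudi.utilities.test.proto.Sample
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object ProtoKryoRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("proto-kryo-repro")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // An RDD of compiled proto messages.
    val rdd = spark.sparkContext.parallelize(Seq(Sample.newBuilder().build()))

    // Serialized storage forces Kryo to serialize the Message, which drags in
    // the Descriptors$* internals shown in the trace above and fails.
    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd.count()

    spark.stop()
  }
}
```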
Spark already uses the Scala Kryo serializers provided by https://github.com/twitter/chill; this PR pulls in the protobuf serializer from that project as well.
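For compiled messages, the wiring is essentially a default-serializer registration. The sketch below shows the idea using Spark's KryoRegistrator API and chill-protobuf's ProtobufSerializer; the registrator class name is illustrative, not the PR's exact code.

```scala
import com.esotericsoftware.kryo.Kryo
import com.google.protobuf.Message
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

// Illustrative registrator; in Hudi this responsibility sits in HoodieSparkKryoRegistrar.
class ProtoAwareKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Any class implementing com.google.protobuf.Message falls back to
    // chill-protobuf's serializer, which round-trips via toByteArray/parseFrom.
    kryo.addDefaultSerializer(classOf[Message], classOf[ProtobufSerializer])
  }
}
```

The registrator is then enabled through spark.kryo.registrator together with spark.serializer=org.apache.spark.serializer.KryoSerializer.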
Added another class, ProtobufDynamicMessageSerializer, to handle dynamic messages, because the default serializer available in the chill library relies on the parseFrom(byte[]..) method that compiled proto messages provide. DynamicMessage doesn't have this method, so its serialization has to go through descriptors.
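A descriptor-based Kryo serializer for DynamicMessage could look roughly like the sketch below. This is a simplified illustration (single-file descriptors, top-level message types, no dependency handling), not the exact implementation added in this PR.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.google.protobuf.{DescriptorProtos, Descriptors, DynamicMessage}

// Sketch only: writes the file descriptor proto plus the message type name so the
// schema can be rebuilt on read, then the message payload itself.
class DynamicMessageSerializerSketch extends Serializer[DynamicMessage] {

  override def write(kryo: Kryo, output: Output, message: DynamicMessage): Unit = {
    val descriptor = message.getDescriptorForType
    val fileProto = descriptor.getFile.toProto.toByteArray
    output.writeInt(fileProto.length)
    output.writeBytes(fileProto)
    output.writeString(descriptor.getName)
    val payload = message.toByteArray
    output.writeInt(payload.length)
    output.writeBytes(payload)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[DynamicMessage]): DynamicMessage = {
    val fileProto = DescriptorProtos.FileDescriptorProto.parseFrom(input.readBytes(input.readInt()))
    // Assumes the .proto file has no imports; dependent files would also need to be serialized.
    val fileDescriptor = Descriptors.FileDescriptor.buildFrom(fileProto, Array.empty[Descriptors.FileDescriptor])
    val descriptor = fileDescriptor.findMessageTypeByName(input.readString())
    DynamicMessage.parseFrom(descriptor, input.readBytes(input.readInt()))
  }
}
```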
Impact
No impact on existing functionality; HoodieSparkKryoRegistrar is extended to handle proto messages as well.
Risk level (write none, low, medium or high below)
Medium.
Documentation Update
None
Contributor's checklist
- [x] Read through contributor's guide
- [x] Change Logs and Impact were stated clearly
- [x] Adequate tests were added if applicable
- [x] CI passed