scalding
scalding copied to clipboard
Make projected parquet collection schema forward compatible with the given file schema
- Problem: projection schema is created by ThriftSchemaConvertVisitor which suffix fields with
_tuple
https://github.com/twitter/scalding/blob/59d932731f7396eaaf6024624d1ce6660534ca77/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ScroogeReadSupport.java#L98-L101 - When reading parquet file with standard format 3-level or even legacy ones 2-level
array
, projection fails because it has name_tuple
which is incompatible. For example, given file schemas to representList<String> country_codes
message compliant_schema {
required group country_codes (LIST) {
repeated binary array (UTF8);
}
required int32 to_be_removed;
}
will fail with the generated projection
message thrift_generated {
optional group country_codes (LIST) {
repeated binary country_codes_tuple (UTF8);
}
}
-
This PR introduces
ParquetListFormatForwardCompatibility
to convert thrift-generated schema to compliant ones. The difficulties here are:- we can not simply just traverse the two graphs and swap node one-for-one. The node has to be from the projection, because it has subset of fields and different optional/required info.
- there are at least 4 variations of compliant parquet types, and we want to convert from non-compliant to compliant, and among the compliant.
-
The strategy here is to implement different rules for each format. The rule tell us how to decompose the list schema, and to reconstruct it again.
- For example, we match projected schema with Rule 1, and match file schema with Rule 2.
- Rule 1 will decompose the projected schema, and Rule 2 will take that information to construct in light of the file schema.
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.
:white_check_mark: mickjermsurawong-stripe
:x: joshrosen-stripe
You have signed the CLA already but the status is still pending? Let us recheck it.
cc @moulimukherjee
Hi @johnynek, We ran into a problem where Scalding job fails reading parequet files produced by another Spark job. The problem we discovered is due to projected schema which is generated from Thrift class has different list format than our Spark job.
Just would like to ask for a quick look whether this is the right strategy, and if this is something that we would like to push it to the public one as well.
Also, you need to bump the .travis.yml
jdk setting to openjdk8
since oraclejdk8
I think no longer works on travis.
Thanks @johnynek. Ah I haven't done any tests internally on actual data. Just purely unit tests now. Will follow up with that.
I can port all these to Scala, and will make the resolver stateless; yup the object instantiation isn't neccessary.
hi @isnotinvain or @julienledem, I've addressed the first round of general feedback, and added support for map schema compat as well. Would appreciate your review here.
hi @isnotinvain thank you for the note.
Will go through comments by my colleagues and improve docs as you suggested.
The test class is in ParquetCollectionFormatForwardCompatibilityTests.scala
not sure if that's what you are looking for, but that demonstrates the format conversion here.
@joshrosen-stripe, @tchow-stripe Thanks for the really helpful review. I've simplified many parts of the code, and addressed the PR feedback. Pending issues are figuring out to do end-to-end testing here. @isnotinvain this is ready for another round of review.
Thanks @xton, addressed the feedback. @isnotinvain could you help to take a look on this please?
hi @isnotinvain, would like to follow-up on this if you have more feedback here please.
Hi @isnotinvain, another bump on this please.
Updates from our side: we actually have implemented this in our internally (via subclassing of ParquetScroogeScheme
), and it does cover production use cases of read Spark job schema.
- list having 3 level, and 2-level in legacy mode, compared to
_tuple
suffixed via generated schema from thrift class. - map having repeated group named
key_value
instead ofmap
This will also cover more conversion types as outlined inParquetMapFormatter
andParquetListFormatter
Without this patch, scalding job reading Spark job output can either fail hard or falsely read empty collection.