
Make projected parquet collection schema forward compatible with the given file schema

mickjermsurawong-stripe opened this issue • 11 comments

  • Problem: the projection schema is created by ThriftSchemaConvertVisitor, which suffixes list fields with _tuple https://github.com/twitter/scalding/blob/59d932731f7396eaaf6024624d1ce6660534ca77/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ScroogeReadSupport.java#L98-L101
  • When reading a parquet file in the standard 3-level list format, or even the legacy 2-level array format, projection fails because the repeated field carries the _tuple name, which is incompatible. For example, given a file schema representing List<String> country_codes
message compliant_schema {
  required group country_codes (LIST) {
    repeated binary array (UTF8);
  }
  required int32 to_be_removed;
}

will fail with the generated projection

message thrift_generated {
  optional group country_codes (LIST) {
    repeated binary country_codes_tuple (UTF8);
  }
}
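If I follow the intent correctly, the compatibility layer would rewrite that projection to match the file's 2-level format, keeping the projection's optionality and element type while adopting the file's repeated-field name (a sketch of the expected result; the message name is illustrative):

```
message converted_projection {
  optional group country_codes (LIST) {
    repeated binary array (UTF8);
  }
}
```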
  • This PR introduces ParquetListFormatForwardCompatibility to convert the thrift-generated schema to a compliant one. The difficulties here are:

    • We cannot simply traverse the two schemas and swap nodes one-for-one. The node has to come from the projection, because it contains only a subset of the fields and carries different optional/required information.
    • There are at least 4 variations of compliant parquet list types, and we want to convert both from non-compliant to compliant and among the compliant variants.
  • The strategy here is to implement a different rule for each format. Each rule tells us how to decompose a list schema and how to reconstruct it again.

    • For example, we match the projected schema with Rule 1 and the file schema with Rule 2.
    • Rule 1 decomposes the projected schema, and Rule 2 takes that information to reconstruct it in light of the file schema.
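The decompose/reconstruct strategy above can be sketched with a toy model. This is a minimal sketch, not the PR's actual API: ListShape, forwardCompatible, and the field names are illustrative stand-ins.

```java
// Toy sketch of the rule strategy: "decompose" the projected list schema into
// the parts that matter, then "reconstruct" it in the file schema's format.
// All names here (ListShape, forwardCompatible) are illustrative, not the PR's API.
public class ListCompat {

    // A list schema reduced to the three parts the rules care about:
    // the outer group's repetition, the repeated field's name, and the element type.
    public record ListShape(String repetition, String repeatedName, String elementType) {}

    // Reconstruct the projection in the file schema's list format: keep the
    // projection's repetition (optional/required) and element type, but adopt
    // the file's repeated-field name (e.g. "array" instead of "country_codes_tuple").
    public static ListShape forwardCompatible(ListShape projected, ListShape file) {
        return new ListShape(projected.repetition(), file.repeatedName(), projected.elementType());
    }

    public static void main(String[] args) {
        ListShape thriftGenerated = new ListShape("optional", "country_codes_tuple", "binary (UTF8)");
        ListShape fileSchema = new ListShape("required", "array", "binary (UTF8)");
        ListShape converted = forwardCompatible(thriftGenerated, fileSchema);
        // The converted projection now matches the file's 2-level format:
        System.out.println(converted.repetition() + " group, repeated field: " + converted.repeatedName());
        // prints: optional group, repeated field: array
    }
}
```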

mickjermsurawong-stripe avatar Sep 27 '19 22:09 mickjermsurawong-stripe

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ mickjermsurawong-stripe
❌ joshrosen-stripe
You have signed the CLA already but the status is still pending? Let us recheck it.

CLAassistant avatar Sep 27 '19 22:09 CLAassistant

cc @moulimukherjee

mickjermsurawong-stripe avatar Sep 27 '19 22:09 mickjermsurawong-stripe

Hi @johnynek, we ran into a problem where a Scalding job fails to read parquet files produced by a Spark job. We discovered that the projected schema generated from the Thrift class has a different list format than our Spark job's output.

I'd just like to ask for a quick look at whether this is the right strategy, and whether this is something we would like to push to the public repo as well.

mickjermsurawong-stripe avatar Sep 27 '19 23:09 mickjermsurawong-stripe

Also, you need to bump the .travis.yml jdk setting to openjdk8 since oraclejdk8 I think no longer works on travis.

johnynek avatar Sep 28 '19 00:09 johnynek

Thanks @johnynek. Ah, I haven't run any tests internally on actual data, just unit tests so far. Will follow up with that.

I can port all of this to Scala, and will make the resolver stateless; yup, the object instantiation isn't necessary.

mickjermsurawong-stripe avatar Sep 28 '19 00:09 mickjermsurawong-stripe

hi @isnotinvain or @julienledem, I've addressed the first round of general feedback, and added support for map schema compat as well. Would appreciate your review here.

mickjermsurawong-stripe avatar Sep 30 '19 18:09 mickjermsurawong-stripe

hi @isnotinvain, thank you for the note. I'll go through my colleagues' comments and improve the docs as you suggested. The test class is ParquetCollectionFormatForwardCompatibilityTests.scala; not sure if that's what you're looking for, but it demonstrates the format conversion here.

mickjermsurawong-stripe avatar Sep 30 '19 23:09 mickjermsurawong-stripe

@joshrosen-stripe, @tchow-stripe Thanks for the really helpful review. I've simplified many parts of the code and addressed the PR feedback. The remaining open issue is figuring out how to do end-to-end testing here. @isnotinvain this is ready for another round of review.

mickjermsurawong-stripe avatar Oct 01 '19 20:10 mickjermsurawong-stripe

Thanks @xton, addressed the feedback. @isnotinvain could you help take a look at this please?

mickjermsurawong-stripe avatar Oct 04 '19 18:10 mickjermsurawong-stripe

hi @isnotinvain, I'd like to follow up on this in case you have more feedback here, please.

mickjermsurawong-stripe avatar Oct 08 '19 15:10 mickjermsurawong-stripe

Hi @isnotinvain, another bump on this please.

Updates from our side: we have implemented this internally (via subclassing ParquetScroogeScheme), and it covers our production use cases of reading Spark job output:

  • lists in the standard 3-level format and in the legacy 2-level format, compared to the _tuple-suffixed schema generated from the Thrift class.
  • maps whose repeated group is named key_value instead of map. This will also cover more conversion types, as outlined in ParquetMapFormatter and ParquetListFormatter.
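To illustrate the map case: per the parquet-format spec, the standard layout uses a repeated group named key_value, while the legacy form names the repeated group map. A Spark-written file would typically look like this (sketch; the field name string_map is illustrative):

```
message spark_written {
  required group string_map (MAP) {
    repeated group key_value {
      required binary key (UTF8);
      optional binary value (UTF8);
    }
  }
}
```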

Without this patch, a Scalding job reading Spark job output can either fail hard or silently read empty collections.

mickjermsurawong-stripe avatar Oct 16 '19 19:10 mickjermsurawong-stripe