pandera icon indicating copy to clipboard operation
pandera copied to clipboard

[PySpark] Performance issues during validation

Open filipeo2-mck opened this issue 1 year ago • 5 comments

Describe the bug

Symptom

I've seen a huge increase in processing time when doing schema and data validation using Pandera, in some PySpark dataframes that have a complex processing DAG. In my scenario, processing some dataframes that were taking around 7~10 minutes to do schema validation only using Pydantic, now it is taking 1h20m to finish when using Pandera data+validation (I know, not completely fair comparison but the processing time is much longer, which may denote some performance issue). Also, the number of Spark stages being executed increased by a lot when comparing both approaches.

Small fix

One of the issues is that the check for null values was being done for every column, disregarding if the nullable=True was set or not. The fix for this case is the PR #1403 and, for the 92 Fields I have in my scenario, skipping the null checks for the 42 of them (which had the nullable=True property set) decreased the processing time by half: 41m. Better but it still is not the ideal.

Cause

After digging into the above issue, I realized that the pandera.pyspark schema and column validations work based on "create the failing condition" and "count() the results from this filtered condition" basis, which is a natural way of doing it and shouldn't be changed. The problem is that each of these count() actions triggers a new Spark job, which naturally makes the Spark engine do some degree of reprocessing for each of them: image

Proposed solutions, open for discussion

If the dataframe where lots of Spark actions happens is cached, all these Pandera validations will make use of the cached dataframe and avoid recomputing the DAG and reprocessing it every time. From an implementation POV, both would need a new Python context manager to ensure the ending cleaning steps and they would need an additional kwarg to be passed to both DataFrameModel and DataFrameSchema's .validate() functions.:

pandera_schema.validate(my_df[, allow_caching=True][, allow_checkpointing=True])
Proposal 1 - Make use of .cache() in dataframe with its existing state, before applying any Pandera validation and .unpersist() it afterwards.

Benefits:

  • keeps the spark DAG for future reprocessing, in case of node failures
  • caches are cleaned when Spark’s block manager dies and we don’t need to keep control/clean any directory (like the 2nd proposal)

Risks:

  • uses worker’s limited memory for caching, which is not advisable

I tried a dirty implementation for this and the processing time went from 41m to 17m: image

Proposal 2 - Make use of .checkpoint() for the dataframe, before applying any Pandera validation and clean this directory after.

Benefits:

  • virtually unlimited disk storage available, much higher than the available memory in most scenarios.

Risks:

  • does not keep the spark dag for future reprocessing
  • it is not naturally swapped out from disk when spark’s block manager dies, it need to be cleaned to avoid disk space leakage. We should inform the user about which directory is being used (the temp folder)
  • not as performant as Proposal 1

Note: I believe that the two first risks are dangerous together, if the Spark DAG is not being kept for reprocessing and we are cleaning the checkpointed state in the temp folder after our validation ⚠️

I didn't try this implementation.

Proposal 3 - Union of 1 and 2

Enabling both and let the user choose between both of them?

Expected behavior

The validation should be much faster.


  • [x] I have checked that this issue has not already been reported.
  • [x] I have confirmed this bug exists on the latest version of pandera.
  • [x] (optional) I have confirmed this bug exists on the master branch of pandera.

Desktop (please complete the following information):

  • OS: MacOS Ventura 13.6.1
  • Browser: Safari
  • Version: 0.17.2+, pointing directly to main branch

filipeo2-mck avatar Nov 06 '23 18:11 filipeo2-mck

@NeerajMalhotra-QB

filipeo2-mck avatar Nov 06 '23 18:11 filipeo2-mck

I believe that both solutions are complementary, as each infrastructure available is different from others. In some scenarios, cache will be better, to other, checkpoints. I would like to have more opinions, from Spark masters :)

filipeo2-mck avatar Nov 06 '23 18:11 filipeo2-mck

Acknowledging the resource-intensive nature of data validations, I concur that caching could be an ideal solution.

However, before implementing this within pandera, I recommend conducting performance tests on a suitable cluster, as personal laptops might not provide accurate performance insights.

It's crucial to consider that caching could potentially create bottlenecks, particularly on single-node machines, so our solution should address this potential issue as well.

While the post didn't specify the dataframe size, assuming it wasn't substantial due to testing on a local laptop, it would be beneficial to consider leveraging optimized file storage formats (e.g., parquet) for real-world applications.

If the dataframe was loaded from a plain CSV, could you please rerun the same test using a parquet file? This approach will allow for a more accurate comparison and evaluation.

NeerajMalhotra-QB avatar Nov 06 '23 21:11 NeerajMalhotra-QB

Hey @filipeo2-mck . Your analysis and the proposed solutions offer a great starting point. The re-computation of the DAG for each validation indeed adds overhead, and your proposals to utilize caching or checkpointing to mitigate this are quite valid.

  • Caching is indeed a quick win, but as you’ve pointed out, it comes with the risk of exhausting memory. However, it could be beneficial in scenarios where the dataframe fits comfortably in memory or when running on a larger cluster.
  • Checkpointing to disk. Although it's not as fast as caching and requires clean-up, it guarantees that intermediate results persist, potentially reducing the processing time.

Finally, if we add these arguments to the validate method, allowing them to tailor the validation process, should we also allow a way to control this process with an environment variable override? I.e maybe you want to limit panderas way of caching without changing the application code?

kasperjanehag avatar Nov 07 '23 09:11 kasperjanehag

Hi!

@NeerajMalhotra-QB , the current setup only uses parquet files and I tested it locally only 👍

@kasperjanehag , agree, env vars will give enough flexibility to the user.

filipeo2-mck avatar Nov 07 '23 11:11 filipeo2-mck