coveragepy icon indicating copy to clipboard operation
coveragepy copied to clipboard

Coverage of pyspark user defined function

Open nedbat opened this issue 7 years ago • 25 comments

Originally reported by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I have a case where I have some pyspark codes in my code base and I am trying to test them. When doing that, I find that any python UDF I can with spark does not get covered even though I am running it. Note that I am running it in the local spark mode.

Reproducible example:

#!python

def get_new_col(spark, df):
    def myadd(x, y):
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

    spark.udf.register('myadd', myadd)
    return df.selectExpr(['*', 'myadd(x, y) as newcol'])


def run():
    try:
        import findspark
        findspark.init()
    except ImportError:
        pass
    import pyspark
    spark = pyspark.sql.SparkSession.Builder().master("local[2]").getOrCreate()
    df = spark.createDataFrame([
        [1.0, 1.0],
        [1.0, 2.0],
        [1.0, 2.0]
    ], ['x', 'y'])

    outdf = get_new_col(spark, df)
    outdf.show()
    outdf.printSchema()
    assert outdf.columns == (df.columns + ['newcol'])

    spark.stop()


if __name__ == '__main__':
    run()

This says the UDF was not covered even though it did run.

Here are the logs when I run it:

#!python
$ coverage run example.py
2018-05-04 14:58:29 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-05-04 14:58:30 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:>                                                          (0 + 1) / 1]sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
+---+---+------+
|  x|  y|newcol|
+---+---+------+
|1.0|1.0|   2.0|
|1.0|2.0|   3.0|
|1.0|2.0|   3.0|
+---+---+------+

root
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- newcol: string (nullable = true)

Relevant packages: Python 3.6.4 :: Anaconda, Inc. coverage (4.5.1)

Edit 1: Simplified the reproducible example to remove unittest and pytest.


  • Bitbucket: https://bitbucket.org/ned/coveragepy/issue/658

nedbat avatar May 02 '18 08:05 nedbat

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Any thoughts on this Ned ?

I'm not sure if I'm doing something wrong for the subprocess thing. Or is the subprocess work only if the coverage run's python process creates the subprocess ?

nedbat avatar May 08 '18 05:05 nedbat

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I am running it with my root anaconda - so, I think this is the right one. (Considering it was giving me an error of "invalid config path '1'" when i gave COVERAGE_PROCESS_START=1 - i believe it is the right one.

And I do not have any .coveragerc file (just default configs)

nedbat avatar May 04 '18 11:05 nedbat

@AbdealiJK Thanks for doing all this! One thing that looks wrong to me: the COVERAGE_PROCESS_START environment variable needs to refer to the location of the .covergerc file to use:

export COVERAGE_PROCESS_START=/path/to/.coveragerc

And you need to create a .pth file in the Python environment that is running the subprocesses.

nedbat avatar May 04 '18 10:05 nedbat

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I read over http://coverage.readthedocs.io/en/latest/subprocess.html and tried:

EDIT: Realized that setting the env variable to 1 was causing some issues as the value is taken as the coverage configuration file to use. export COVERAGE_PROCESS_START= fixed that error but it didnt cover the UDF :(

What I did:

  • Set the environment variable export COVERAGE_PROCESS_START=
  • Then added /Users/abdealijk/anaconda3/lib/python3.6/site-packages/sitecustomize.py with import coverage; coverage.process_startup()

But this didnt increase my coverage. Inside the function, when I do print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')}) I can see {'COVERAGE_PROCESS_START': ''} which does seem correct.

For debugging I even tried:

    def myadd(x, y):
        import coverage
        cov = coverage.Coverage(config_file=None)
        cov.start()
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

but the coverage did not increase.

nedbat avatar May 04 '18 09:05 nedbat

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Ned, I am trying to see if I can understand what spark exactly does so we can figure this out. Here are the steps:

  • I open a python shell
  • I import pyspark and create a session/context
  • Spark will now call a Popen() to a bash script.
  • The bash script contains bash some environment variable creation
  • Then it calls a Java jar
  • After this all communication between the Python shell and Java jar are done using Socket communication.
  • Using the socket-communication, the python function get_new_col is sent (serialized by cloudpickle i think) and the serialized-python-function is saved in Java
  • To execute this function, the Java process creates a ProcessBuilder to create a new Python process. And runs the code in this second python process.

I ran the following in the background:

#!bash

while sleep 0.1; do echo date=$(date) py=$(ps aux | grep pytho[n] | wc -l) java=$(ps aux | grep jav[a] | wc -l) cov=$(ps aux | grep coverag[e] | wc -l); done

And verified that the sequence is:

  • Python process created + Coverage process created
  • Java process created
  • Python process created (second)
  • Python process killed (second)
  • Java process killed
  • Python process killed + Coverage process killed

So, the question I think can boil down to how to make all these python processes use coverage.

EDIT: The processes are:

/Users/abdealijk/anaconda3/bin/python /Users/abdealijk/anaconda3/bin/coverage run example.py
/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/bin/java -cp /usr/local/hadoop/spark/conf/:/usr/local/hadoop/spark/jars/*:/usr/local/hadoop/hadoop/etc/hadoop/ -Xmx1g org.apache.spark.deploy.SparkSubmit --conf spark.master=local[1] pyspark-shell
/Users/abdealijk/anaconda3/bin/python -m pyspark.daemon

nedbat avatar May 04 '18 09:05 nedbat

Issue #657 is also about PySpark.

nedbat avatar May 03 '18 11:05 nedbat

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Hm, it may be a bit complicated to setup (spark can get messy to install)

To reproduce, install:

  • Apache Spark
    • Java
    • Scala
    • xcode dev installations
  • Python 3.x testing stuff
    • pytest
    • pytest-cov
    • coverage

For Spark you could try: https://medium.freecodecamp.org/installing-scala-and-apache-spark-on-mac-os-837ae57d283f

I have not had much luck getting it to work with brew though - but my setup is a little more complicated than just spark. It's never worked for me in one shot :P We can talk on gitter or IRC if you like if you run into issues trying to reproduce.

A quick note if you're not familiar with this system, PySpark uses Py4J which calls internal Java routines. So the df.selectExpr you see it actually calling a Java function internally. And that Java function goes back to call the registered UDF with spark.udf.register().

Hence the function is definitely running in a different process inside that JVM I believe.

It's Python Shell > JVM > Python Shell

nedbat avatar May 02 '18 12:05 nedbat

Thanks for the report. I've never used PySpark. Before I try to reproduce this, what packages do I need to install to be able to run this code? I'm on a Mac, with Python 3.6. Give me the complete details of what I need to do to see the problem.

nedbat avatar May 02 '18 11:05 nedbat

Hi,

Any update on this issue? I am facing the same problem when I run pytest-cov to test python methods that use @udf decorated nested methods.

Thanks!

ketgo avatar Dec 17 '19 06:12 ketgo

Do these help?

  • https://github.com/nvembar/spark-coverage-example
  • https://stackoverflow.com/a/67184714/14343

nedbat avatar Apr 21 '21 10:04 nedbat

  • https://stackoverflow.com/a/67184714/14343

I'm running into the same problem. I tried your two suggestions, @nedbat , and had no luck.

I'm invoking coverage.py via pytest-cov. I'm using Python 3.8 and have the following pyspark, pytest, pytest-cov, and coverage modules

Name: pyspark
Version: 2.4.7

Name: pytest-cov
Version: 2.11.1

Name: coverage
Version: 5.4

kristen-cape avatar Apr 27 '21 02:04 kristen-cape

Hello, running into the same problem too:

pyspark==3.2.1 pytest==7.1.1 pytest-cov==3.0.0

RaccoonForever avatar May 06 '22 12:05 RaccoonForever

If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)

nedbat avatar May 06 '22 15:05 nedbat

If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)

@nedbat I tried to provide that here: https://github.com/AndrewLane/repro-coverage-issue-with-pyspark-udf

AndrewLane avatar May 06 '22 17:05 AndrewLane

@AndrewLane experimenting a bit with this, my guess is that the code is running in a subprocess, but that process is started in a way that doesn't get coverage started on it, perhaps because it's started from Java.

nedbat avatar May 07 '22 01:05 nedbat

Here is what I have tried. I created a doit.sh file in my copy of the repo to configure subprocess measurement, and to simplify re-running it:

cd /home
rm -f debug.txt
pip install pytest==7.0.1 coverage==6.2
echo "import coverage; coverage.process_startup()" > /usr/local/lib/python3.6/site-packages/00coverage.pth
export COVERAGE_PROCESS_START=$(pwd)/.coveragerc
coverage run -m pytest tests/test.py --disable-pytest-warnings

I created a .coveragerc file in /home:

[run]
parallel = true
source = ./src

I changed code.py to add some debugging like this:

import os, sys
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

with open("debug.txt", "a") as f:
    print(f"outside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)


def translate(english):
    with open("debug.txt", "a") as f:
        print(f"inside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)

    if english == "1":
        return "uno"
    elif english == "2":
        return "dos"
    else:
        return f"Cannot translate {english}"


translation_udf = udf(lambda english: translate(english), StringType())

def transform_data(df):
    return df.withColumn("spanish", translation_udf(col("english")))

When I run (with source /home/doit.sh), I have a debug.txt that looks like this:

outside: 14: trace: <coverage.CTracer object at 0x7fb34c1483f0>

outside: 172: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 183: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 180: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 177: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 188: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 177: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 188: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 183: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 136: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 136: trace: <coverage.CTracer object at 0x7fc301112030>

and the data files are:

root@5fcaa35b647e:/home# ls -al .coverage*
-rw-r--r-- 1 root root 53248 May  7 12:50 .coverage
-rw-r--r-- 1 root root 53248 May  7 13:19 .coverage.5fcaa35b647e.118.982518
-rw-r--r-- 1 root root 53248 May  7 13:19 .coverage.5fcaa35b647e.14.449311
-rw-r--r-- 1 root root    37 May  7 12:46 .coveragerc

That should mean that process id's 118 and 14 recorded data, but the debug output doesn't show those ids. Also, the id of the CTracer object is the same for many different processes, so maybe we are dealing with something similar to #310?

nedbat avatar May 07 '22 13:05 nedbat

Hi Ned, sorry for the lack of info.

I fastly made a repo to reproduce the lack of coverage on UDFs.

https://github.com/RaccoonForever/py-cov-potential-issue

Don't look at code, it was just to reproduce :) Thanks for your help!

RaccoonForever avatar May 13 '22 09:05 RaccoonForever

Does someone have a workaround ? :'(

RaccoonForever avatar May 23 '22 08:05 RaccoonForever

I have the same issue when testing pySpark UDFs using pytest. But as a workaround, the UDFs are python functions and I can create tests that specifically test that function only.

shsab avatar Jun 09 '22 17:06 shsab