Coverage of pyspark user defined function
Originally reported by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)
I have a case where I have some pyspark code in my code base and I am trying to test it. When doing that, I find that any Python UDF I register with Spark does not get covered even though it does run. Note that I am running Spark in local mode.
Reproducible example:
#!python
def get_new_col(spark, df):
    def myadd(x, y):
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))
    spark.udf.register('myadd', myadd)
    return df.selectExpr(['*', 'myadd(x, y) as newcol'])

def run():
    try:
        import findspark
        findspark.init()
    except ImportError:
        pass
    import pyspark
    spark = pyspark.sql.SparkSession.Builder().master("local[2]").getOrCreate()
    df = spark.createDataFrame([
        [1.0, 1.0],
        [1.0, 2.0],
        [1.0, 2.0]
    ], ['x', 'y'])
    outdf = get_new_col(spark, df)
    outdf.show()
    outdf.printSchema()
    assert outdf.columns == (df.columns + ['newcol'])
    spark.stop()

if __name__ == '__main__':
    run()
The coverage report says the UDF was not covered even though it did run.
Here are the logs when I run it:
#!bash
$ coverage run example.py
2018-05-04 14:58:29 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-05-04 14:58:30 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:> (0 + 1) / 1]sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
+---+---+------+
| x| y|newcol|
+---+---+------+
|1.0|1.0| 2.0|
|1.0|2.0| 3.0|
|1.0|2.0| 3.0|
+---+---+------+
root
|-- x: double (nullable = true)
|-- y: double (nullable = true)
|-- newcol: string (nullable = true)
Relevant packages: Python 3.6.4 :: Anaconda, Inc.; coverage 4.5.1
Edit 1: Simplified the reproducible example to remove unittest and pytest.
- Bitbucket: https://bitbucket.org/ned/coveragepy/issue/658
Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)
Any thoughts on this, Ned?
I'm not sure if I'm doing something wrong with the subprocess setup. Or does subprocess measurement only work if the coverage run Python process creates the subprocess itself?
Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)
I am running it with my root Anaconda environment, so I think this is the right one. (Considering it was giving me an error of "invalid config path '1'" when I set COVERAGE_PROCESS_START=1, I believe it is the right one.)
And I do not have any .coveragerc file (just default configs).
@AbdealiJK Thanks for doing all this! One thing that looks wrong to me: the COVERAGE_PROCESS_START environment variable needs to refer to the location of the .coveragerc file to use:
export COVERAGE_PROCESS_START=/path/to/.coveragerc
And you need to create a .pth file in the Python environment that is running the subprocesses.
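A minimal sketch of that combination (the file name and paths here are illustrative, not taken from this thread):
#!python
# Single line to put in a .pth file in the site-packages directory of the
# Python that runs the worker processes, e.g. <site-packages>/coverage_startup.pth.
# process_startup() does nothing unless COVERAGE_PROCESS_START names a real
# config file, e.g.: export COVERAGE_PROCESS_START=/path/to/.coveragerc
# (with "parallel = true" under [run] so each process writes its own data file).
import coverage; coverage.process_startup()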
Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)
I read over http://coverage.readthedocs.io/en/latest/subprocess.html and tried:
EDIT: Realized that setting the env variable to 1 was causing some issues as the value is taken as the coverage configuration file to use.
export COVERAGE_PROCESS_START= fixed that error, but it didn't cover the UDF :(
What I did:
- Set the environment variable: export COVERAGE_PROCESS_START=
- Then added /Users/abdealijk/anaconda3/lib/python3.6/site-packages/sitecustomize.py with: import coverage; coverage.process_startup()
But this didn't increase my coverage.
Inside the function, when I do print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')}) I can see {'COVERAGE_PROCESS_START': ''} which does seem correct.
For debugging I even tried:
def myadd(x, y):
    import coverage
    cov = coverage.Coverage(config_file=None)
    cov.start()
    import sys, os
    print("sys.version_info =", sys.version_info)
    print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
    x1 = x
    y1 = y
    return str(float(x1) + float(y1))
but the coverage did not increase.
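One hedged observation about that last experiment: a Coverage object that is only ever start()ed never writes a data file, since nothing calls stop() or save(), and the tracer generally only measures frames entered after start(). A variant of the same debugging attempt, sketched under the assumption that the Spark worker process is allowed to write .coverage.* files into the current working directory:
#!python
def myadd(x, y):
    # Hypothetical variant of the debugging attempt above (not from the report):
    # explicitly stop and save, and use data_suffix=True so each worker process
    # writes its own .coverage.* data file.
    import coverage

    def _add(a, b):
        # keep the measured work in a frame entered *after* cov.start(),
        # since the tracer only sees frames created after it is installed
        return str(float(a) + float(b))

    cov = coverage.Coverage(data_suffix=True)
    cov.start()
    try:
        return _add(x, y)
    finally:
        cov.stop()
        cov.save()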
Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)
Ned, I am trying to see if I can understand what spark exactly does so we can figure this out. Here are the steps:
- I open a python shell
- I import pyspark and create a session/context
- Spark will now call Popen() on a bash script.
- The bash script sets up some environment variables.
- Then it calls a Java jar.
- After this, all communication between the Python shell and the Java jar is done over sockets.
- Using that socket communication, the Python function get_new_col is sent (serialized by cloudpickle, I think) and the serialized function is saved on the Java side (see the sketch after this list).
- To execute this function, the Java process uses a ProcessBuilder to create a new Python process, and runs the code in this second Python process.
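The cloudpickle step is worth illustrating, because it is why the UDF body ends up executing in a Python process that coverage never started in. A minimal sketch of that round trip (names are made up; it assumes the standalone cloudpickle package is installed, though PySpark actually uses its own vendored copy):
#!python
import pickle
import sys

import cloudpickle

def make_adder(offset):
    # stand-in for the nested UDF: a closure that must be pickled by value
    def myadd(x):
        return x + offset
    return myadd

payload = cloudpickle.dumps(make_adder(10))  # what the driver ships out
restored = pickle.loads(payload)             # what the Python worker executes
print(restored(5))                           # -> 15
print(sys.gettrace())                        # None unless coverage was started in *this* process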
I ran the following in the background:
#!bash
while sleep 0.1; do echo date=$(date) py=$(ps aux | grep pytho[n] | wc -l) java=$(ps aux | grep jav[a] | wc -l) cov=$(ps aux | grep coverag[e] | wc -l); done
And verified that the sequence is:
- Python process created + Coverage process created
- Java process created
- Python process created (second)
- Python process killed (second)
- Java process killed
- Python process killed + Coverage process killed
So I think the question boils down to how to make all these Python processes use coverage.
EDIT: The processes are:
/Users/abdealijk/anaconda3/bin/python /Users/abdealijk/anaconda3/bin/coverage run example.py
/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/bin/java -cp /usr/local/hadoop/spark/conf/:/usr/local/hadoop/spark/jars/*:/usr/local/hadoop/hadoop/etc/hadoop/ -Xmx1g org.apache.spark.deploy.SparkSubmit --conf spark.master=local[1] pyspark-shell
/Users/abdealijk/anaconda3/bin/python -m pyspark.daemon
Issue #657 is also about PySpark.
Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)
Hm, it may be a bit complicated to set up (Spark can get messy to install).
To reproduce, install:
- Apache Spark
- Java
- Scala
- xcode dev installations
- Python 3.x testing stuff
- pytest
- pytest-cov
- coverage
For Spark you could try: https://medium.freecodecamp.org/installing-scala-and-apache-spark-on-mac-os-837ae57d283f
I have not had much luck getting it to work with brew, though, since my setup is a little more complicated than just Spark. It's never worked for me in one shot :P We can talk on Gitter or IRC if you run into issues trying to reproduce.
A quick note if you're not familiar with this system: PySpark uses Py4J, which calls internal Java routines. So the df.selectExpr you see is actually calling a Java function internally.
And that Java function calls back into the UDF registered with spark.udf.register().
Hence I believe the function is definitely running in a different process, launched via that JVM.
It's Python Shell > JVM > Python Shell
Thanks for the report. I've never used PySpark. Before I try to reproduce this, what packages do I need to install to be able to run this code? I'm on a Mac, with Python 3.6. Give me the complete details of what I need to do to see the problem.
Hi,
Any update on this issue? I am facing the same problem when I run pytest-cov to test Python methods that use @udf-decorated nested methods.
Thanks!
Do these help?
- https://github.com/nvembar/spark-coverage-example
- https://stackoverflow.com/a/67184714/14343
I'm running into the same problem. I tried your two suggestions, @nedbat, and had no luck.
I'm invoking coverage.py via pytest-cov. I'm using Python 3.8 and have the following pyspark, pytest-cov, and coverage versions:
Name: pyspark
Version: 2.4.7
Name: pytest-cov
Version: 2.11.1
Name: coverage
Version: 5.4
Hello, running into the same problem too:
pyspark==3.2.1 pytest==7.1.1 pytest-cov==3.0.0
If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)
@nedbat I tried to provide that here: https://github.com/AndrewLane/repro-coverage-issue-with-pyspark-udf
@AndrewLane experimenting a bit with this, my guess is that the code is running in a subprocess, but that process is started in a way that doesn't get coverage started on it, perhaps because it's started from Java.
Here is what I have tried. I created a doit.sh file in my copy of the repo to configure subprocess measurement, and to simplify re-running it:
cd /home
rm -f debug.txt
pip install pytest==7.0.1 coverage==6.2
echo "import coverage; coverage.process_startup()" > /usr/local/lib/python3.6/site-packages/00coverage.pth
export COVERAGE_PROCESS_START=$(pwd)/.coveragerc
coverage run -m pytest tests/test.py --disable-pytest-warnings
I created a .coveragerc file in /home:
[run]
parallel = true
source = ./src
I changed code.py to add some debugging like this:
import os, sys
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

with open("debug.txt", "a") as f:
    print(f"outside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)

def translate(english):
    with open("debug.txt", "a") as f:
        print(f"inside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)
    if english == "1":
        return "uno"
    elif english == "2":
        return "dos"
    else:
        return f"Cannot translate {english}"

translation_udf = udf(lambda english: translate(english), StringType())

def transform_data(df):
    return df.withColumn("spanish", translation_udf(col("english")))
When I run (with source /home/doit.sh), I have a debug.txt that looks like this:
outside: 14: trace: <coverage.CTracer object at 0x7fb34c1483f0>
outside: 172: trace: <coverage.CTracer object at 0x7fc301112030>
outside: 183: trace: <coverage.CTracer object at 0x7fc301112030>
outside: 186: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>
outside: 180: trace: <coverage.CTracer object at 0x7fc301112030>
outside: 177: trace: <coverage.CTracer object at 0x7fc301112030>
outside: 188: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 177: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 188: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 183: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>
outside: 136: trace: <coverage.CTracer object at 0x7fc301112030>
inside: 136: trace: <coverage.CTracer object at 0x7fc301112030>
and the data files are:
root@5fcaa35b647e:/home# ls -al .coverage*
-rw-r--r-- 1 root root 53248 May 7 12:50 .coverage
-rw-r--r-- 1 root root 53248 May 7 13:19 .coverage.5fcaa35b647e.118.982518
-rw-r--r-- 1 root root 53248 May 7 13:19 .coverage.5fcaa35b647e.14.449311
-rw-r--r-- 1 root root 37 May 7 12:46 .coveragerc
That should mean that process ids 118 and 14 recorded data, but the debug output doesn't show those ids. Also, the id of the CTracer object is the same for many different processes, so maybe we are dealing with something similar to #310?
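For what it's worth, the repeated CTracer id would be consistent with the worker processes being forked from a single parent (pyspark.daemon forks its workers on POSIX systems, as far as I know). A tiny POSIX-only sketch of that effect:
#!python
import os

# Objects created before a fork keep the same id() in every child, so seeing
# one tracer address across several pids is not by itself a contradiction.
marker = object()
print("parent", os.getpid(), hex(id(marker)))

pid = os.fork()
if pid == 0:
    # child process: copy-on-write memory, same object address as the parent
    print("child ", os.getpid(), hex(id(marker)))
    os._exit(0)

os.waitpid(pid, 0)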
Hi Ned, sorry for the lack of info.
I quickly made a repo to reproduce the lack of coverage on UDFs:
https://github.com/RaccoonForever/py-cov-potential-issue
Don't look too closely at the code, it was just to reproduce :) Thanks for your help!
Does someone have a workaround? :'(
I have the same issue when testing PySpark UDFs using pytest. As a workaround, since the UDFs are plain Python functions, I can write tests that call those functions directly.
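For example, a sketch of that workaround against the translate function from the code.py snippet earlier in this thread (the test module and test names here are made up):
#!python
# Hypothetical test module: exercise the plain Python function behind the UDF
# directly, so coverage measures it in the main pytest process, without going
# through Spark's worker processes at all.
from code import translate  # "code" is the module name used in the repro above

def test_translate_known_values():
    assert translate("1") == "uno"
    assert translate("2") == "dos"

def test_translate_unknown_value():
    assert translate("42") == "Cannot translate 42"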