Loky get_reusable_executor initializer fails to set global variable
When using the `initializer` and `initargs` arguments of `get_reusable_executor` to set a global variable, the value of the global is lost within the child processes.
A simple script reproduces this issue:
```python
import os
from concurrent.futures import ProcessPoolExecutor  # drop-in replacement used for comparison

from loky import get_reusable_executor

some_value = 0


def set_value(value):
    global some_value
    some_value = value
    print(f"PID: {os.getpid()} :: Setter has set some_value: {some_value}")


def do_something(val):
    print(f"PID: {os.getpid()} :: The some_value: {some_value}")
    return some_value * val


def main():
    n_jobs = 2
    with get_reusable_executor(
        max_workers=n_jobs,
        initializer=set_value,
        initargs=(10,),
    ) as executor:
        result_generator = executor.map(
            do_something,
            range(10),
        )
        print(list(result_generator))


if __name__ == "__main__":
    main()
```
This produces the following output, where `some_value` is still 0 in the child processes even though the `set_value` initializer clearly reports the correct value:
```
PID: 14941 :: Setter has set some_value: 10
PID: 14942 :: Setter has set some_value: 10
PID: 14941 :: The some_value: 0
PID: 14942 :: The some_value: 0
PID: 14941 :: The some_value: 0
PID: 14942 :: The some_value: 0
PID: 14941 :: The some_value: 0
PID: 14942 :: The some_value: 0
PID: 14941 :: The some_value: 0
PID: 14942 :: The some_value: 0
PID: 14941 :: The some_value: 0
PID: 14942 :: The some_value: 0
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
```
Replacing `get_reusable_executor` with the standard library's `ProcessPoolExecutor` works as expected, producing the following output:
```
PID: 14931 :: Setter has set some_value: 10
PID: 14934 :: Setter has set some_value: 10
PID: 14931 :: The some_value: 10
PID: 14931 :: The some_value: 10
PID: 14934 :: The some_value: 10
PID: 14931 :: The some_value: 10
PID: 14931 :: The some_value: 10
PID: 14934 :: The some_value: 10
PID: 14931 :: The some_value: 10
PID: 14934 :: The some_value: 10
PID: 14931 :: The some_value: 10
PID: 14934 :: The some_value: 10
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```
Loky version: 3.1.0, Python version: 3.9.10
This behavior is due to the fact that `loky` uses `cloudpickle` to serialize its communications. For functions defined in the main script, `cloudpickle` propagates the values of the global variables they reference from the main process to the child processes with each task, so that these globals match the values in the main process at serialization time. `pickle` does not do this, hence the discrepancy between the two scripts.
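The mechanism can be imitated in a single process with the standard library only. The sketch below is a simplified model, not `cloudpickle`'s actual implementation: the hypothetical helper `ship_by_value` rebuilds a function with a private snapshot of the globals it references, roughly what happens when a function defined in `__main__` is pickled by value. Because the initializer and the task each get their own globals dict, the value written by the initializer is never seen by the task.

```python
import builtins
import types

some_value = 0


def set_value(value):
    global some_value
    some_value = value


def do_something(val):
    return some_value * val


def ship_by_value(func):
    """Hypothetical helper (simplified model of by-value pickling): rebuild
    `func` with a private snapshot of the globals its code references."""
    snapshot = {
        name: func.__globals__[name]
        for name in func.__code__.co_names
        if name in func.__globals__
    }
    snapshot["__builtins__"] = builtins
    return types.FunctionType(
        func.__code__, snapshot, func.__name__, func.__defaults__, func.__closure__
    )


# loky-like: initializer and task are shipped separately while the parent's
# some_value is still 0, so each one ends up with its own globals dict.
worker_initializer = ship_by_value(set_value)
worker_task = ship_by_value(do_something)
worker_initializer(10)  # writes 10 into the initializer's private snapshot only
loky_like = [worker_task(v) for v in range(3)]

# pickle-like: functions travel by reference, so the worker calls the real
# module-level functions, which share a single globals dict.
set_value(10)
pickle_like = [do_something(v) for v in range(3)]

print(loky_like, pickle_like)  # [0, 0, 0] [0, 10, 20]
```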
To cope with this, I think there is currently no better option than moving the global variable `some_value` into another module, with a getter and a setter:
```python
# module.py
import os

some_value = 0


def set_value(value):
    global some_value
    some_value = value
    print(f"PID: {os.getpid()} :: Setter has set some_value: {some_value}")


def get_value():
    return some_value
```
And the main script:
```python
# main.py
import os

from loky import get_reusable_executor

# Functions from an importable module are pickled by reference, so the workers
# call the real `module` functions, whose global the initializer has set.
from module import get_value, set_value


def do_something(val):
    print(f"PID: {os.getpid()} :: The some_value: {get_value()}")
    return get_value() * val


def main():
    n_jobs = 2
    with get_reusable_executor(
        max_workers=n_jobs,
        initializer=set_value,
        initargs=(10,),
    ) as executor:
        result_generator = executor.map(
            do_something,
            range(10),
        )
        print(list(result_generator))


if __name__ == "__main__":
    main()
```
This gives the expected results:
```
$ python main.py
PID: 280930 :: Setter has set some_value: 10
PID: 280930 :: The some_value: 10
PID: 280931 :: Setter has set some_value: 10
PID: 280931 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
PID: 280930 :: The some_value: 10
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```
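Why the separate module helps can be seen with the standard library's `pickle` in a single process: a function from an importable module is serialized by reference (module name plus function name), and `cloudpickle` does the same for such functions. In this sketch, `demo_module` is a hypothetical in-memory stand-in for `module.py`, registered in `sys.modules` so the example is self-contained; in loky the unpickling happens in the worker instead, where the initializer and the task resolve to the same module object.

```python
import pickle
import sys
import types

# Hypothetical stand-in for module.py, registered so pickle can import it by name.
module = types.ModuleType("demo_module")
exec(
    "some_value = 0\n"
    "def set_value(value):\n"
    "    global some_value\n"
    "    some_value = value\n"
    "def get_value():\n"
    "    return some_value\n",
    module.__dict__,
)
sys.modules["demo_module"] = module

# pickle stores only a reference: the module name and the function name.
payload_init = pickle.dumps(module.set_value)
payload_get = pickle.dumps(module.get_value)

# Unpickling resolves both references to the same module-level functions,
# so they share that module's globals.
restored_set = pickle.loads(payload_init)
restored_get = pickle.loads(payload_get)
restored_set(10)
print(restored_get())  # 10
print(restored_set is module.set_value)  # True
```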
Note that this is related to what was discussed in #206. If adding an extra module is not an option, similar behavior can be obtained with a hackish trick that stores the value as an attribute of an already imported module such as `os`:
```python
import os

from loky import get_reusable_executor


def set_value(value):
    # Hack: store the value on the `os` module object. Modules are pickled by
    # reference, so the worker's `os` is the real module and the attribute
    # persists across tasks.
    os.some_value = value
    print(f"PID: {os.getpid()} :: Setter has set some_value: {value}")


def do_something(val):
    some_value = os.some_value
    print(f"PID: {os.getpid()} :: The some_value: {some_value}")
    return some_value * val


def main():
    n_jobs = 2
    with get_reusable_executor(
        max_workers=n_jobs,
        initializer=set_value,
        initargs=(10,),
    ) as executor:
        result_generator = executor.map(
            do_something,
            range(10),
        )
        print(list(result_generator))


if __name__ == "__main__":
    main()
```
I also stumbled over this limitation today! Are there any new recommendations since last year?