Strawberryfields with ProcessPoolExecutor

[Open] DS-Liu opened this issue 1 year ago • 6 comments

The number of shots can only be set to 1 for the 'fock' backend, as described here. Therefore, to obtain 10000 measurement samples, I need to run the circuit 10000 times. I have already defined the parameterized program and the engine as attributes of my class GBS. I then tried to use ProcessPoolExecutor to run the circuit 10000 times concurrently. However, this error occurs: TypeError: LocalEngine.__new__() missing 1 required positional argument: 'backend'.

A MWE is shown as follows:

from concurrent.futures import ProcessPoolExecutor

import strawberryfields as sf
from strawberryfields import ops

class GBS:
    def __init__(self):
        prog = sf.Program(2)
        a = prog.params('a')
        with prog.context as q:
            ops.Dgate(a ** 2) | q[0]  # free parameter
            ops.Sgate(1) | q[1]
            ops.MeasureFock() | q
        
        self.prog = prog
        self.eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})
        
    def test(self, param):
        result = self.eng.run(self.prog, args={'a': param})
        return result

    def run(self):
        with ProcessPoolExecutor(1) as executor:
            future = executor.submit(self.test, 1)
            future.result()

def main():
    gbs = GBS()
    gbs.run()

if __name__ == '__main__':
    main()

[screenshot of the error traceback]

How can I solve this?

DS-Liu avatar Oct 28 '23 07:10 DS-Liu

When I instantiated the Engine in the test method rather than in the __init__ method, it worked. Why does this happen?

from concurrent.futures import ProcessPoolExecutor

import strawberryfields as sf
from strawberryfields import ops

class GBS:
    def __init__(self):
        prog = sf.Program(2)
        a = prog.params('a')
        with prog.context as q:
            ops.Dgate(a ** 2) | q[0]  # free parameter
            ops.Sgate(1) | q[1]
            ops.MeasureFock() | q
        
        self.prog = prog
        
    def test(self, param):
        eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})
        result = eng.run(self.prog, args={'a': param})
        return result

    def run(self):
        with ProcessPoolExecutor(1) as executor:
            future = executor.submit(self.test, 1)
            future.result()

def main():
    gbs = GBS()
    gbs.run()

if __name__ == '__main__':
    main()

DS-Liu avatar Oct 28 '23 07:10 DS-Liu
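
An editorial aside on the likely cause (a sketch, inferred from how ProcessPoolExecutor works rather than confirmed in this thread): executor.submit(self.test, 1) has to pickle the bound method, and with it the whole GBS instance, including self.eng. The engine does not survive the round trip, because unpickling calls LocalEngine.__new__ with no arguments while, as the error message says, __new__ requires backend. If that is right, the failure should be reproducible in a single process, without any executor:

import pickle

import strawberryfields as sf

# Assumption: unpickling calls LocalEngine.__new__(cls) with no arguments,
# which fails because __new__ requires a 'backend' argument, matching the
# error reported above.
eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})

try:
    pickle.loads(pickle.dumps(eng))
except TypeError as e:
    print(e)  # expected: missing 1 required positional argument: 'backend'

This would also explain the second snippet: when the engine is created inside test, only the GBS instance with its Program crosses the process boundary, and the Program evidently pickles without trouble.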

Hey @DS-Liu! Is there a reason why you can't call test a bunch of times like this?

def main():
    gbs = GBS()
    results = []
    
    for _ in range(10):
        results.append(gbs.test(1))

I'm not super familiar with ProcessPoolExecutor, so I'm not sure why your first example wasn't working 🤔

Let me know if this helps!

isaacdevlugt avatar Oct 30 '23 22:10 isaacdevlugt

I want to run the code concurrently instead of in a for loop, which is quite slow; that's why I use ProcessPoolExecutor. I can't figure out why an Engine instantiated in the main process won't work in the child process, yet one instantiated in the child process works.

DS-Liu avatar Nov 06 '23 04:11 DS-Liu
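
For reference, a minimal sketch of scaling the working pattern up to many concurrent samples (the module-level function sample_once and the value of num_samples are illustrative, not from the thread). A module-level worker sidesteps pickling the GBS instance altogether, and building the program and engine inside the worker keeps anything unpicklable out of the task payload:

from concurrent.futures import ProcessPoolExecutor

import strawberryfields as sf
from strawberryfields import ops

def sample_once(param):
    # Build the program and engine inside the worker process so nothing
    # unpicklable crosses the process boundary.
    prog = sf.Program(2)
    a = prog.params('a')
    with prog.context as q:
        ops.Dgate(a ** 2) | q[0]  # free parameter
        ops.Sgate(1) | q[1]
        ops.MeasureFock() | q
    eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})
    return eng.run(prog, args={'a': param}).samples

def main():
    num_samples = 100  # illustrative; scale up to 10000
    with ProcessPoolExecutor() as executor:
        samples = list(executor.map(sample_once, [1] * num_samples))
    print(len(samples))

if __name__ == '__main__':
    main()

Rebuilding the engine on every call is wasteful; caching one engine per worker (for example via an initializer) would amortize that cost, but the sketch above keeps things minimal.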

I had better luck with getting multiprocessing to at least work (see docs here: https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing):

import time
from multiprocessing import Pool

import strawberryfields as sf
from strawberryfields import ops

class GBS:
    def __init__(self):
        prog = sf.Program(2)
        a = prog.params('a')
        with prog.context as q:
            ops.Dgate(a ** 2) | q[0]  # free parameter
            ops.Sgate(1) | q[1]
            ops.MeasureFock() | q
        
        self.prog = prog
        self.eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})
        
    def test(self, param):
        result = self.eng.run(self.prog, args={'a': param})
        return result

def main():

    gbs = GBS()
    results = []
    num_runs = 50_000

    parallel_time = time.process_time()

    with Pool() as pool:
        # call the function for each item in parallel
        for _ in range(num_runs):
            results.append(gbs.test(1))

    parallel_time = time.process_time() - parallel_time

    serial_time = time.process_time()

    for _ in range(num_runs):
        results.append(gbs.test(1))

    serial_time = time.process_time() - serial_time

    print(parallel_time, serial_time)


if __name__ == '__main__':
    main()

Output: 25.561596 26.430917

Unfortunately the speedup isn't that great when I ran it on my machine. Not sure if I'm using it properly, but maybe this will help 😄

isaacdevlugt avatar Nov 06 '23 15:11 isaacdevlugt


Actually, you haven't used the Pool object in your code; it would be used by calling pool.map(). There's no speedup because you're still running everything serially.

DS-Liu avatar Nov 07 '23 02:11 DS-Liu
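
For completeness, a sketch of what the pool.map version might look like (run_circuit and num_runs are illustrative names and values). Two caveats: the worker must be a module-level function so that it can be pickled, and time.process_time() only counts CPU time spent in the parent process, so time.perf_counter() is the fairer wall-clock comparison for a parallel run:

import time
from multiprocessing import Pool

import strawberryfields as sf
from strawberryfields import ops

def run_circuit(param):
    # As before: build the program and engine inside each worker process
    # so nothing unpicklable is sent between processes.
    prog = sf.Program(2)
    a = prog.params('a')
    with prog.context as q:
        ops.Dgate(a ** 2) | q[0]  # free parameter
        ops.Sgate(1) | q[1]
        ops.MeasureFock() | q
    eng = sf.Engine('fock', backend_options={'cutoff_dim': 5})
    return eng.run(prog, args={'a': param}).samples

def main():
    num_runs = 1_000  # illustrative
    start = time.perf_counter()  # wall-clock time, unlike process_time()
    with Pool() as pool:
        results = pool.map(run_circuit, [1] * num_runs)
    print(len(results), time.perf_counter() - start)

if __name__ == '__main__':
    main()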

Ah! Well, clearly you know more about parallelizing code than I do 😅. In any case, if you use multiprocessing (properly), does it help?

isaacdevlugt avatar Nov 07 '23 23:11 isaacdevlugt