
Support running a fixture only once per entire execution

Open liadmord opened this issue 3 years ago • 13 comments

In some cases we might want the test setup to run only once, even when the run is multi-process. For example:

We write our end-to-end tests in python using pytest, this means that one of our fixtures takes care of bringing up the entire environment of the test.

One of the current problems we are facing with parallelizing our tests is that we can't use xdist, since it runs session-scoped fixtures once per worker process.

A way to run a fixture only once, no matter how many processes there are, would be very beneficial for cases like this, where the setup should run only once in a background process.

liadmord avatar May 12 '22 16:05 liadmord

This is a long-awaited feature.

I have seen some people start work on a new backend implementation, and new plugins have even come up, but none has managed to provide this feature so far. I have tried it myself too, but I still need more time to learn about Python multiprocessing and pytest internals.

One hacky way I found to accomplish this with xdist is the following:

def pytest_configure(config):
    """Runs while pytest is setting up, in the controller process and in the workers."""
    if hasattr(config, "workerinput"):
        # prevent workers from running the same code
        return
    before_session(config)  # method doing stuff before tests start to run

That way, the code inside before_session runs only in the controller process, before any test executes in the workers.

I hope this helps.

lucrib avatar May 16 '22 12:05 lucrib

@lucrib Sadly this doesn't help me, since I need information from fixtures for my "before_session" setup.

liadmord avatar May 17 '22 14:05 liadmord

Have you tried this?

https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once

I have it working in a project I maintain -- run an expensive computational job once, runs all tests in parallel, and tears down.
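The linked recipe caches the fixture's result in a file under the shared temp directory so only one worker computes it. The documented version guards the check with filelock.FileLock; the sketch below is a stdlib-only stand-in that uses an atomic exclusive create (`open(..., "x")`) to pick exactly one creator, and `get_or_create_cached`/`create` are hypothetical names, not part of the recipe:

```python
import json
from pathlib import Path


def get_or_create_cached(fn: Path, create):
    """Create-once cache: the first caller computes and writes the data,
    later callers read it back from the file.

    open(fn, "x") fails with FileExistsError if the file already exists,
    so exactly one caller ever runs create(). (The documented recipe's
    FileLock additionally protects readers from seeing a half-written
    file; this simplified version does not.)
    """
    try:
        with open(fn, "x") as f:
            data = create()
            json.dump(data, f)
        return data
    except FileExistsError:
        # another worker won the race and created the file
        return json.loads(fn.read_text())
```

In a session-scoped fixture you would call this with a path derived from `tmp_path_factory.getbasetemp().parent`, which is shared by all xdist workers, and pass the expensive setup as `create`.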

heitorlessa avatar Sep 05 '22 20:09 heitorlessa

> Have you tried this?
>
> https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once
>
> I have it working in a project I maintain -- run an expensive computational job once, runs all tests in parallel, and tears down.

Thanks. I needed to do exactly that for some stuff I was playing with and people couldn't comprehend the need.

Thanks again!

swills1 avatar Mar 20 '23 02:03 swills1

Any progress on this topic?

I just came up with the following solution for a simple docker container spinup and teardown. It maintains a list of worker ids in a file, accessed under a file lock, and each worker acts on whether it is the first one (start the container) or the last one (tear it down).

import json
from pathlib import Path
from typing import Any, Generator, List

import pytest
from filelock import FileLock  # third-party: pip install filelock


def _load_worker_list(fn: Path) -> List[str]:
    """Reads the worker id list from a file"""
    return json.loads(fn.read_text())["workers"]


def _write_worker_list(fn: Path, workers: List[str]) -> None:
    """Writes the worker id list to a file"""
    fn.write_text(json.dumps({"workers": workers}))


@pytest.fixture(autouse=True, scope="session")
def run_minio(
    tmp_path_factory: pytest.TempPathFactory, worker_id: str
) -> Generator[None, Any, None]:
    """Runs and stops a minio server for tests yields None"""
    if worker_id == "master":
        run_minio_server()
        create_test_bucket()
        yield None
        stop_minio_server()
        return

    root_tmp_dir: Path = tmp_path_factory.getbasetemp().parent

    wfile: Path = root_tmp_dir / "minio.workers"
    fn = root_tmp_dir / "minio_run.lock"
    with FileLock(str(fn)):
        start: bool = False
        if wfile.is_file():
            # a process has already started minio, so just register our worker id
            worker_list = _load_worker_list(wfile)
            if len(worker_list) == 0:
                # all other processes have finished and this one is late to the party but it can restart the list
                start = True
            worker_list.append(worker_id)
            _write_worker_list(wfile, worker_list)
        else:
            # This is the first process so it must create the worker list file
            _write_worker_list(wfile, [worker_id])
            start = True
        if start:
            run_minio_server()
            create_test_bucket()

    yield None
    # this worker is finished, so tear down minio if it is the last one
    with FileLock(str(fn)):
        workers = _load_worker_list(wfile)
        workers.remove(worker_id)
        _write_worker_list(wfile, workers)
        if len(workers) == 0:
            # it seems like we are the last worker so we can quit minio
            stop_minio_server()

This seems to work on my setup, but I am not sure what happens if a process is killed. Most likely it will not clean up the container.
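The register/last-out bookkeeping in the fixture above can be factored into two small helpers that are easy to unit-test in isolation. They assume the same JSON file format (`{"workers": [...]}`) and must still be called while holding the FileLock, which is omitted here; the helper names are my own, not from the fixture:

```python
import json
from pathlib import Path
from typing import List


def register_worker(wfile: Path, worker_id: str) -> bool:
    """Record this worker in the shared list file.

    Returns True when this worker should start the service: either the
    file does not exist yet, or every earlier worker has already left.
    """
    if wfile.is_file():
        workers: List[str] = json.loads(wfile.read_text())["workers"]
        start = len(workers) == 0  # late arrival after all others finished
    else:
        workers = []
        start = True
    workers.append(worker_id)
    wfile.write_text(json.dumps({"workers": workers}))
    return start


def unregister_worker(wfile: Path, worker_id: str) -> bool:
    """Remove this worker; True means it was the last one, so tear down."""
    workers = json.loads(wfile.read_text())["workers"]
    workers.remove(worker_id)
    wfile.write_text(json.dumps({"workers": workers}))
    return len(workers) == 0
```

This does not solve the killed-process problem (a worker that dies never unregisters, so the count never reaches zero); for docker specifically, running the container with `--rm` and a label you can prune is one pragmatic mitigation.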

Overall, the effort needed for such use cases is quite high. Maybe someone has a better idea?

butjo avatar Nov 05 '23 00:11 butjo