dask-ml icon indicating copy to clipboard operation
dask-ml copied to clipboard

ENH: prioritizes fitting by score in model selection

Open stsievert opened this issue 6 years ago • 33 comments
trafficstars

What does this PR implement? This PR prioritizes model training/scoring based on the most recent score in model selection.

stsievert avatar Jun 21 '19 22:06 stsievert

This matters most when IncrementalSearchCV is fit in parallel. If only one IncrementalSearchCV instance is being run, there's no real advantage because the scores are only recorded after all models are finished training.

I've run some simulations on this for https://github.com/scipy-conference/scipy_proceedings/pull/464. I've run these simulations both before and after this PR (respectively via the prioritization random and high-scores). I've run Hyperband 20 times in both cases with the same random state.

Screen Shot 2019-06-22 at 6 18 27 PM

The shaded regions correspond to the 25% to 75% percentile of scores.

Each run has the same random state, so both random and high-scores have the same parameters and validation set.

stsievert avatar Jun 22 '19 23:06 stsievert

The comparison in https://github.com/dask/dask-ml/pull/527#issuecomment-504705594 isn't valid: it uses different train datasets and different random states for each model.

Here's a comparison that runs one model. It have the same parameters, the same train data, the same validation data, and both models have the same random state.

Screen Shot 2019-06-22 at 9 51 33 PM

This is run with 4 Dask workers. The prioritization makes provides with the most gain in serial environments (aka 4 Dask workers).

stsievert avatar Jun 23 '19 02:06 stsievert

Are you able to reproduce the CI failure locally?

TomAugspurger avatar Jun 24 '19 13:06 TomAugspurger

Are you able to reproduce the CI failure locally?

The timeout errors on test_incremental.py#test_basic? Yes, but they seem to happen randomly (with about 50% probability). I'm not sure why they happen, but here's the relevant output from pytest (which also appears on the CI logs):

	================================= FAILURES ==================================
	________________________________ test_basic _________________________________
    def test_func():
        del _global_workers[:]
        _global_clients.clear()
        Comm._instances.clear()
        active_threads_start = set(threading._active)

        reset_config()

        dask.config.set({"distributed.comm.timeouts.connect": "5s"})
        # Restore default logging levels
        # XXX use pytest hooks/fixtures instead?
        for name, level in logging_levels.items():
            logging.getLogger(name).setLevel(level)

        result = None
        workers = []

        with pristine_loop() as loop:
            with check_active_rpc(loop, active_rpc_timeout):

                @gen.coroutine
                def coro():
                    with dask.config.set(config):
                        s = False
                        for i in range(5):
                            try:
                                s, ws = yield start_cluster(
                                    ncores,
                                    scheduler,
                                    loop,
                                    security=security,
                                    Worker=Worker,
                                    scheduler_kwargs=scheduler_kwargs,
                                    worker_kwargs=worker_kwargs,
                                )
                            except Exception as e:
                                logger.error(
                                    "Failed to start gen_cluster, retrying",
                                    exc_info=True,
                                )
                            else:
                                workers[:] = ws
                                args = [s] + workers
                                break
                        if s is False:
                            raise Exception("Could not start cluster")
                        if client:
                            c = yield Client(
                                s.address,
                                loop=loop,
                                security=security,
                                asynchronous=True,
                                **client_kwargs
                            )
                            args = [c] + args
                        try:
                            future = func(*args)
                            if timeout:
                                future = gen.with_timeout(
                                    timedelta(seconds=timeout), future
                                )
                            result = yield future
                            if s.validate:
                                s.validate_state()
                        finally:
                            if client and c.status not in ("closing", "closed"):
                                yield c._close(fast=s.status == "closed")
                            yield end_cluster(s, workers)
                            yield gen.with_timeout(
                                timedelta(seconds=1), cleanup_global_workers()
                            )

                        try:
                            c = yield default_client()
                        except ValueError:
                            pass
                        else:
                            yield c._close(fast=True)

                        for i in range(5):
                            if all(c.closed() for c in Comm._instances):
                                break
                            else:
                                yield gen.sleep(0.05)
                        else:
                            L = [c for c in Comm._instances if not c.closed()]
                            Comm._instances.clear()
                            # raise ValueError("Unclosed Comms", L)
                            print("Unclosed Comms", L)

                        raise gen.Return(result)

                result = loop.run_sync(
>                   coro, timeout=timeout * 2 if timeout else timeout
                )

/Users/scott/anaconda3/lib/python3.6/site-packages/distributed/utils_test.py:1019:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/ioloop.py:581: in run_sync
    return future_cell[0].result()
/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py:1113: in run
    yielded = self.gen.send(value)
/Users/scott/anaconda3/lib/python3.6/site-packages/distributed/utils_test.py:987: in coro
    result = yield future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tornado.gen.Runner object at 0x1c25fc4240>

    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
>                       value = future.result()
E                       tornado.util.TimeoutError: Timeout

/Users/scott/anaconda3/lib/python3.6/site-packages/tornado/gen.py:1099: TimeoutError

stsievert avatar Jun 24 '19 16:06 stsievert

Prioritizing models with high score helps find models with high score quicker on this iteration, but it can also mean a longer time to solution (i.e., the lowest scoring model will be trained after all other models are finished training). I'll reshuffle the priorities a bit to protect against this.

stsievert avatar Jun 25 '19 05:06 stsievert

I've made some changes to the prioritization scheme. I don't fully prioritize by score now, only partially. The last num_workers models all have the same score, np.median(low_scores). Here's the resulting figure with the same setup as in https://github.com/dask/dask-ml/pull/527#issuecomment-504714808:

Screen Shot 2019-06-25 at 10 04 10 PM

stsievert avatar Jun 26 '19 03:06 stsievert

I'm also getting a TimeoutError on my local machine for model_selection/test_incremental.py#test_basic. I only get it on this branch, not on the master branch.

The CI also has 9 failing tests on preprocessing/test_encoders.py#test_basic_array. I think those are unrelated. Here's some output on that CI error:

        else:
            expected = a.fit_transform(X)
>           result = b.fit_transform(dX)

/usr/share/miniconda/envs/dask-ml-test/lib/python3.7/site-packages/dask/array/core.py:3234:   ValueError
/usr/share/miniconda/envs/dask-ml-test/lib/python3.7/site-packages/sklearn/preprocessing/_encoders.py:631: in fit_transform
    return self.fit(X).transform(X)
dask_ml/preprocessing/_encoders.py:209: in transform
    return self._transform(X)
dask_ml/preprocessing/_encoders.py:237: in _transform
    X = da.concatenate(Xs, axis=1)
>       meta = np.concatenate([meta_from_array(s) for s in seq], axis=axis)
E       ValueError: zero-dimensional arrays cannot be concatenated

/usr/share/miniconda/envs/dask-ml-test/lib/python3.7/site-packages/dask/array/core.py:3234: ValueError

stsievert avatar Jun 26 '19 03:06 stsievert

Looking into the CI now, over in #529

TomAugspurger avatar Jun 26 '19 15:06 TomAugspurger

I'm getting CI errors on Azure on tests/model_selection/test_incremental.py::test_basic that I can't reproduce locally (it throws a tornado.util.TimeoutError). This only happens on some tests (linux linux and linux earliest).

Any ideas on what might be throwing this error?

stsievert avatar Oct 20 '19 16:10 stsievert

I'm not sure yet. I haven't seen it locally either.

On Sun, Oct 20, 2019 at 11:12 AM Scott Sievert [email protected] wrote:

I'm getting CI errors on Azure on tests/model_selection/test_incremental.py::test_basic that I can't reproduce locally (it throws a tornado.util.TimeoutError). This only happens on some tests (linux linux and linux earliest).

Any ideas on what might be throwing this error?

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/dask-ml/pull/527?email_source=notifications&email_token=AAKAOIWGRACEA4U5733FJUTQPR7PFA5CNFSM4H2VHZV2YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOEBYNTLI#issuecomment-544266669, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAKAOIVUSMDXQHQMV3TOD4TQPR7PFANCNFSM4H2VHZVQ .

TomAugspurger avatar Oct 21 '19 17:10 TomAugspurger

I've figured out what the CI failure is. I can reproduce the timeout on tests/model_selection/test_incremental.py::test_basic (at least, it hangs and does not complete). The test passes sometimes, and fails randomly with maybe 20% probability.

It looks like this the test is failing because the cluster isn't cleaning up nicely. This diff makes the test pass:

-     while c.futures or s.tasks:  # Cleans up cleanly after running
-         yield gen.sleep(0.01)
+     start = time()
+     while c.futures or s.tasks:  # Cleans up cleanly after running
+         yield gen.sleep(0.01)
+        if time() - start > 5:
+            break

Here's part of the diff from this PR that could play a role:

# def _fit(...):
-       d_partial_fit = dask.delayed(_partial_fit)
+       def d_partial_fit(*args, **kwargs):
+           return client.submit(_partial_fit, *args, **kwargs)

stsievert avatar Nov 28 '19 20:11 stsievert

The tests pass now. I ran this 100 times on my own laptop, and every time test_incremental.py#test_basic passed. 96 of these tests finished in less than 1.25 seconds (most finished in less than 0.9s).

However, the tests can run slow. The 4 tests that took longer than 1.25 seconds took 6s, 12s, 60s, and 496s.

stsievert avatar Apr 26 '20 22:04 stsievert

This PR is almost ready for merge: there's one bug to squash to make the tests pass, but it appears the failing test doesn't affect user experience. test_incremental.test_verbosity is failing because it can't detect logs are piped to stdout when verbose=True. When I test manually in a Jupyter notebook, logs are successfully shown on stdout (even though the test fails).

Do you have any tips on how to resolve this issue? The new async/await functionality first implemented in af78b6da9ae91777c61dc90dc71e6b141260587a might be relevant.

stsievert avatar May 21 '20 03:05 stsievert

I haven't looked closely, but it looks like the LoggingContext context manager is exiting before the logging is called. Is it being used in the right place(s)?

TomAugspurger avatar May 21 '20 13:05 TomAugspurger

I haven't looked closely, but it looks like the LoggingContext context manager is exiting before the logging is called. Is it being used in the right place(s)?

Thanks for that tip. I've resolved the issue now; I expect all tests to pass (they do on my machine).

Could you give this PR a review? I might do something a little hacky in _utils.py to avoid logging to sys.stdout multiple times when searches are run in parallel (i.e, Hyperband).

stsievert avatar May 25 '20 00:05 stsievert

I think this PR is ready for review.

stsievert avatar Jun 06 '20 01:06 stsievert

Can you think of some test that ensures priorities are being used appropriately? Perhaps through a scheduler plugin?

Thanks. I've implemented the test. I held off on implementing that test because I didn't want to test any internal state, but it should be tested (it's a feature that I've talked about and mentioned in the SciPy 2019 paper).

stsievert avatar Jun 09 '20 04:06 stsievert

I think this PR is ready for another review.

stsievert avatar Jul 17 '20 00:07 stsievert

It looks like this has gone a bit stale. @stsievert @TomAugspurger is this still worth pursuing?

mrocklin avatar Aug 04 '20 03:08 mrocklin

It looks like this has gone a bit stale. @stsievert @TomAugspurger is this still worth pursuing?

I'd like to see this PR merged. I've mentioned this as a feature of Dask-ML in talks/papers, though perhaps it's not relevant until #677 is complete and merged.

I've resolved the merge conflicts. Hopefully the CI passes; almost all of 413ba24 passed.

stsievert avatar Aug 04 '20 14:08 stsievert

Seem to be some test failures. I didn't look closely, but sklearn did have a release today.

Also perhaps a linting issue.

TomAugspurger avatar Aug 04 '20 15:08 TomAugspurger

Some sklearn-dev tests are failing; sklearn issues a FutureWarning for FeatureUnion with message "Using None as a transformer is deprecated in version 0.22 and will be removed in version 0.24. Please use 'drop' instead."

I need to look more closely at the tests again; looks like there's a couple failing tests on timeout error again.

stsievert avatar Aug 04 '20 15:08 stsievert

@stsievert if you merge master the unrelated CI failures should be fixed.

TomAugspurger avatar Aug 04 '20 21:08 TomAugspurger

Thanks, I've merged master. I made the test_incremental.py#test_basic weaker by adding a pytest.xfail for TimeoutErrors. The tests pass on my own machine w/o timeouts. They sometimes fail on Travis CI; I haven't been able to detect any pattern with machine OS/Python version (though windows does seem common).

stsievert avatar Aug 04 '20 21:08 stsievert

Still some timeout errors. The tests are running on azure pipelines, so that's what would need to be updated.

That said, I worry about just xfailing these. Do you think that it's truly just an issue on CI, or is indicating some fundamental issue?

TomAugspurger avatar Aug 05 '20 13:08 TomAugspurger

test_basic is genuinely hanging locally for me

mrocklin avatar Aug 05 '20 14:08 mrocklin

It's hanging for me too, not deterministically: test_basic completed in about a second in the 2 for 3 runs I've done this morning. Let me do some more debugging.

stsievert avatar Aug 05 '20 15:08 stsievert

I've added this diff to test_basic to make the test pass locally:

- while c.futures or s.tasks:  # Make sure cleans up cleanly after running
-     await asyncio.sleep(0.1)
+ _start = time()
+ while c.futures or s.tasks:  # Make sure cleans up cleanly after running
+     await asyncio.sleep(0.1)
+     if time() - _start >= 5:
+         assert c.futures == {}
+         assert all(task.state == "released" for task in s.tasks.values())
+         break

Is this an acceptable test modification? My justification is that "released" tasks are soon to be "forgotten" (source).

stsievert avatar Aug 05 '20 16:08 stsievert

IIUC, the tests are typically written with some sort of deadline for cleanup to complete. If that deadline passes without the condition then we error the test.

Something like

deadline = time() + 5
while c.futures:
    if time() > deadline:
        raise ValueError("Failed to clean up")
    await asyncio.sleep(0.1)

assert c.futures == {}
assert all(task.state == "released" for task in s.tasks.values())

TomAugspurger avatar Aug 05 '20 21:08 TomAugspurger

Hm... I've pushed that change, which involved this diff:

- while c.futures or s.tasks:
+ while c.futures:

Is that appropriate? test_client.py#L400-L435 seems to be testing how well the client cleans up, and only uses c.futures.

stsievert avatar Aug 05 '20 22:08 stsievert