spaCy icon indicating copy to clipboard operation
spaCy copied to clipboard

Spacy + Dask?

Open AlJohri opened this issue 5 years ago • 9 comments

I'm unable to get a simple example using spacy + dask.distributed up and running. In the context of a Jupyter Notebook, I recieve this error:

OSError: [E050] Can't find model 'en_core_web_lg.vectors'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

In the context of a script, it simply hangs.

How to reproduce the behavior

Here's my attempt at creating a reproducible script although I get slightly different behavior in a notebook setting.

"""
python3 -m venv .venv
.venv/bin/pip install -e git+https://github.com/explosion/spaCy#egg=spaCy
# explicitly download spacy model because otherwise "No compatible models found for v2.2.4 of spaCy"
.venv/bin/pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-2.2.5/en_core_web_lg-2.2.5.tar.gz
.venv/bin/pip install 'dask[bag]'
.venv/bin/pip install distributed
.venv/bin/python debug.py
"""

import os
import pathlib
import thinc
import spacy
import json
import dask.bag as db
from dask.distributed import Client, progress

print(spacy.__version__)
print(thinc.__version__)

nlp = spacy.load('en_core_web_lg')

def process(text):
    print(thinc.extra.load_nlp.VECTORS)
    return [str(x) for x in nlp(text).ents]

if __name__ == '__main__':
	client = Client(n_workers=2, threads_per_worker=1)
	docs = ["A quick brown fox jumps over a lazy dog."] * 10
	entities = db.from_sequence(docs).map(process).compute()
	print(entities)
Full Traceback:
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<timed exec> in <module>

/usr/local/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

/usr/local/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    435     keys = [x.__dask_keys__() for x in collections]
    436     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437     results = schedule(dsk, keys, **kwargs)
    438     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    439 

/usr/local/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2593                     should_rejoin = False
   2594             try:
-> 2595                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2596             finally:
   2597                 for f in futures.values():

/usr/local/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1885             else:
   1886                 local_worker = None
-> 1887             return self.sync(
   1888                 self._gather,
   1889                 futures,

/usr/local/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    777             return future
    778         else:
--> 779             return sync(
    780                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    781             )

/usr/local/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    346     if error[0]:
    347         typ, exc, tb = error[0]
--> 348         raise exc.with_traceback(tb)
    349     else:
    350         return result[0]

/usr/local/lib/python3.8/site-packages/distributed/utils.py in f()
    330             if callback_timeout is not None:
    331                 future = asyncio.wait_for(future, callback_timeout)
--> 332             result[0] = yield future
    333         except Exception as exc:
    334             error[0] = sys.exc_info()

/usr/local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/usr/local/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1750                             exc = CancelledError(key)
   1751                         else:
-> 1752                             raise exception.with_traceback(traceback)
   1753                         raise exc
   1754                     if errors == "skip":

/usr/local/lib/python3.8/site-packages/dask/bag/core.py in reify()
   1833 def reify(seq):
   1834     if isinstance(seq, Iterator):
-> 1835         seq = list(seq)
   1836     if len(seq) and isinstance(seq[0], Iterator):
   1837         seq = list(map(list, seq))

/usr/local/lib/python3.8/site-packages/dask/bag/core.py in __next__()
   2020             kwargs = dict(zip(self.kwarg_keys, vals[-self.nkws :]))
   2021             return self.f(*args, **kwargs)
-> 2022         return self.f(*vals)
   2023 
   2024     def check_all_iterators_consumed(self):

<ipython-input-7-8c491d0e578a> in process()
      3     doc = json.loads(row)
      4     text = "\n".join(x["content"] for x in doc["content"])
----> 5     return [str(x) for x in nlp(text).ents]

/usr/local/lib/python3.8/site-packages/spacy/language.py in __call__()
    437             if not hasattr(proc, "__call__"):
    438                 raise ValueError(Errors.E003.format(component=type(proc), name=name))
--> 439             doc = proc(doc, **component_cfg.get(name, {}))
    440             if doc is None:
    441                 raise ValueError(Errors.E005.format(name=name))

pipes.pyx in spacy.pipeline.pipes.Tagger.__call__()

pipes.pyx in spacy.pipeline.pipes.Tagger.predict()

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in predict()
     38     def predict(self, X):
     39         for layer in self._layers:
---> 40             X = layer(X)
     41         return X
     42 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/api.py in predict()
    308     def predict(seqs_in):
    309         lengths = layer.ops.asarray([len(seq) for seq in seqs_in])
--> 310         X = layer(layer.ops.flatten(seqs_in, pad=pad))
    311         return layer.ops.unflatten(X, lengths, pad=pad)
    312 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in predict()
     38     def predict(self, X):
     39         for layer in self._layers:
---> 40             X = layer(X)
     41         return X
     42 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
    165             Must match expected shape
    166         """
--> 167         return self.predict(x)
    168 
    169     def pipe(self, stream, batch_size=128):

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in predict()
    129 
    130     def predict(self, X):
--> 131         y, _ = self.begin_update(X, drop=None)
    132         return y
    133 

/usr/local/lib/python3.8/site-packages/thinc/api.py in uniqued_fwd()
    377         )
    378         X_uniq = layer.ops.xp.ascontiguousarray(X[ind])
--> 379         Y_uniq, bp_Y_uniq = layer.begin_update(X_uniq, drop=drop)
    380         Y = Y_uniq[inv].reshape((X.shape[0],) + Y_uniq.shape[1:])
    381 

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in begin_update()
     44         callbacks = []
     45         for layer in self._layers:
---> 46             X, inc_layer_grad = layer.begin_update(X, drop=drop)
     47             callbacks.append(inc_layer_grad)
     48 

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
    161     def begin_update(X, *a, **k):
    162         forward, backward = split_backward(layers)
--> 163         values = [fwd(X, *a, **k) for fwd in forward]
    164 
    165         output = ops.xp.hstack(values)

/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
    254 
    255     def wrap(*args, **kwargs):
--> 256         output = func(*args, **kwargs)
    257         if splitter is None:
    258             to_keep, to_sink = output

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/static_vectors.py in begin_update()
     58         if ids.ndim >= 2:
     59             ids = self.ops.xp.ascontiguousarray(ids[:, self.column])
---> 60         vector_table = self.get_vectors()
     61         vectors = vector_table[ids * (ids < vector_table.shape[0])]
     62         vectors = self.ops.xp.ascontiguousarray(vectors)

/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/static_vectors.py in get_vectors()
     53 
     54     def get_vectors(self):
---> 55         return get_vectors(self.ops, self.lang)
     56 
     57     def begin_update(self, ids, drop=0.0):

/usr/local/lib/python3.8/site-packages/thinc/extra/load_nlp.py in get_vectors()
     24     key = (ops.device, lang)
     25     if key not in VECTORS:
---> 26         nlp = get_spacy(lang)
     27         VECTORS[key] = nlp.vocab.vectors.data
     28     return VECTORS[key]

/usr/local/lib/python3.8/site-packages/thinc/extra/load_nlp.py in get_spacy()
     12 
     13     if lang not in SPACY_MODELS:
---> 14         SPACY_MODELS[lang] = spacy.load(lang, **kwargs)
     15     return SPACY_MODELS[lang]
     16 

/usr/local/lib/python3.8/site-packages/spacy/__init__.py in load()
     28     if depr_path not in (True, False, None):
     29         deprecation_warning(Warnings.W001.format(path=depr_path))
---> 30     return util.load_model(name, **overrides)
     31 
     32 

/usr/local/lib/python3.8/site-packages/spacy/util.py in load_model()
    167     elif hasattr(name, "exists"):  # Path or Path-like to model data
    168         return load_model_from_path(name, **overrides)
--> 169     raise IOError(Errors.E050.format(name=name))
    170 
    171 

OSError: [E050] Can't find model 'en_core_web_lg.vectors'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

Info about spaCy

  • spaCy version: 2.2.4.dev0
  • Platform: macOS-10.15.3-x86_64-i386-64bit
  • Python version: 3.8.1

AlJohri avatar Mar 05 '20 21:03 AlJohri

Related: https://github.com/explosion/spaCy/issues/3552

AlJohri avatar Mar 05 '20 21:03 AlJohri

Thanks for the detailed report!

I wonder whether this may actually be solved by the very recent PR #5081 that was just merged onto master 2 days ago. It is (hopefully) the same issue that we would encounter when running nlp.pipe(..., n_process=2) because the child processes (produced by spawn on Windows and MacOS) would not have access to the global state of the parent process, and the vector data, such as en_core_web_lg.vectors, is stored in this global state. Actually that will change from spacy v.3 onwards, but we're not quite there yet. So that PR makes sure that the global state is properly transferred to the children instead.

Is there any chance you could try building the current master branch from source, and see whether that would fix your issue?

svlandeg avatar Mar 05 '20 21:03 svlandeg

@svlandeg I think by running pip install -e git+https://github.com/explosion/spaCy#egg=spaCy, I'm already using the latest code from the master branch- can you confirm?

AlJohri avatar Mar 05 '20 22:03 AlJohri

Ah, yes, that should do it. You can double check in the src folder whether spacy/spacy/language.py has a method _apply_pipes with parameter vectors - then you have that latest PR.

svlandeg avatar Mar 06 '20 09:03 svlandeg

What is annoying, is that I can't really reproduce this. I actually run into a MemoryError when running your code snippet, which seems kind of ridiculous. The vectors are definitely the culprit though, because it works just fine when you move the nlp = spacy.load('en_core_web_lg') statement inside the process function.

svlandeg avatar Mar 06 '20 09:03 svlandeg

@svlandeg let me attach a jupyter notebook where I could more reliably produce the error early next week

I think by moving nlp = spacy.load('en_core_web_lg') into the function it sidesteps the error as it runs spacy.load directly on the workers and it doesn't need to copy globals from the parent process

based on that, this pattern has been working for me so far:

from dask.distributed import get_worker
def process(doc):
    
    worker = get_worker()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load(model)
        worker.nlp = nlp

    return [str(x) for x in nlp(doc).ents]

ents = bag.map(process).compute()

# same approach works if you run want to process one partition at a time
# which allows using `nlp.pipe` with some batch size which is even faster
ents = bag.map_partition(process_partition).compute()

AlJohri avatar Mar 06 '20 15:03 AlJohri

Considering there's a workaround for now (though it might not be ideal), I suggest to revisit this issue once we're closer to releasing the v.3 version. Hopefully the refactored vectors should make this a non-issue then.

svlandeg avatar Mar 19 '20 15:03 svlandeg

Just wanted to add that the workaround seems to have limitations, though I have not yet had time to fully investigate the root cause: https://github.com/usnistgov/cv-py/issues/1

rtbs-dev avatar Jun 17 '20 14:06 rtbs-dev

I think you'll have an easier time of things if you try to parallelise over larger units of work, e.g. a few dozen megabytes of compressed text per process. I would recommend chunking the text beforehand in a preprocess, and just passing the input and output path to dask. You'd then have something like this as your remote task:

def process_job(model_name, input_path, output_path):
    nlp = load_or_get_model(model_name)
    outputs = []
    texts = read_texts(input_path):
    for doc in nlp.pipe(texts):
        outputs.append(convert_output(doc))
    write_outputs(outputs, output_path)

I advise against trying to handle the parallelism too transparently. Really fine-grained task distribution is a method of last resort, for when the scheduling is difficult enough that you can't really do it manually. When you have a simple loop, you're much better off cutting it up into chunks yourself, and only distributing units of work that take at least a few minutes to complete. It's much more efficient and far more reliable, because you can manage failure by just repeating the jobs where the output files aren't there.

honnibal avatar Aug 10 '20 14:08 honnibal