spaCy + Dask?
I'm unable to get a simple example using spaCy + dask.distributed up and running. In the context of a Jupyter notebook, I receive this error:
OSError: [E050] Can't find model 'en_core_web_lg.vectors'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.
In the context of a script, it simply hangs.
How to reproduce the behavior
Here's my attempt at creating a reproducible script, although I get slightly different behavior in a notebook setting.
"""
python3 -m venv .venv
.venv/bin/pip install -e git+https://github.com/explosion/spaCy#egg=spaCy
# explicitly download spacy model because otherwise "No compatible models found for v2.2.4 of spaCy"
.venv/bin/pip install https://github.com/explosion/spacy-models/releases/download/en_core_web_lg-2.2.5/en_core_web_lg-2.2.5.tar.gz
.venv/bin/pip install 'dask[bag]'
.venv/bin/pip install distributed
.venv/bin/python debug.py
"""
import os
import pathlib
import thinc
import spacy
import json
import dask.bag as db
from dask.distributed import Client, progress

print(spacy.__version__)
print(thinc.__version__)

# The model is loaded once in the parent process; the workers receive it
# via pickling when the tasks are shipped out.
nlp = spacy.load('en_core_web_lg')


def process(text):
    print(thinc.extra.load_nlp.VECTORS)
    return [str(x) for x in nlp(text).ents]


if __name__ == '__main__':
    client = Client(n_workers=2, threads_per_worker=1)
    docs = ["A quick brown fox jumps over a lazy dog."] * 10
    entities = db.from_sequence(docs).map(process).compute()
    print(entities)
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<timed exec> in <module>
/usr/local/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
/usr/local/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
435 keys = [x.__dask_keys__() for x in collections]
436 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 437 results = schedule(dsk, keys, **kwargs)
438 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
439
/usr/local/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2593 should_rejoin = False
2594 try:
-> 2595 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2596 finally:
2597 for f in futures.values():
/usr/local/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1885 else:
1886 local_worker = None
-> 1887 return self.sync(
1888 self._gather,
1889 futures,
/usr/local/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
777 return future
778 else:
--> 779 return sync(
780 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
781 )
/usr/local/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]
/usr/local/lib/python3.8/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()
/usr/local/lib/python3.8/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()
/usr/local/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1750 exc = CancelledError(key)
1751 else:
-> 1752 raise exception.with_traceback(traceback)
1753 raise exc
1754 if errors == "skip":
/usr/local/lib/python3.8/site-packages/dask/bag/core.py in reify()
1833 def reify(seq):
1834 if isinstance(seq, Iterator):
-> 1835 seq = list(seq)
1836 if len(seq) and isinstance(seq[0], Iterator):
1837 seq = list(map(list, seq))
/usr/local/lib/python3.8/site-packages/dask/bag/core.py in __next__()
2020 kwargs = dict(zip(self.kwarg_keys, vals[-self.nkws :]))
2021 return self.f(*args, **kwargs)
-> 2022 return self.f(*vals)
2023
2024 def check_all_iterators_consumed(self):
<ipython-input-7-8c491d0e578a> in process()
3 doc = json.loads(row)
4 text = "\n".join(x["content"] for x in doc["content"])
----> 5 return [str(x) for x in nlp(text).ents]
/usr/local/lib/python3.8/site-packages/spacy/language.py in __call__()
437 if not hasattr(proc, "__call__"):
438 raise ValueError(Errors.E003.format(component=type(proc), name=name))
--> 439 doc = proc(doc, **component_cfg.get(name, {}))
440 if doc is None:
441 raise ValueError(Errors.E005.format(name=name))
pipes.pyx in spacy.pipeline.pipes.Tagger.__call__()
pipes.pyx in spacy.pipeline.pipes.Tagger.predict()
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
165 Must match expected shape
166 """
--> 167 return self.predict(x)
168
169 def pipe(self, stream, batch_size=128):
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in predict()
38 def predict(self, X):
39 for layer in self._layers:
---> 40 X = layer(X)
41 return X
42
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
165 Must match expected shape
166 """
--> 167 return self.predict(x)
168
169 def pipe(self, stream, batch_size=128):
/usr/local/lib/python3.8/site-packages/thinc/api.py in predict()
308 def predict(seqs_in):
309 lengths = layer.ops.asarray([len(seq) for seq in seqs_in])
--> 310 X = layer(layer.ops.flatten(seqs_in, pad=pad))
311 return layer.ops.unflatten(X, lengths, pad=pad)
312
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
165 Must match expected shape
166 """
--> 167 return self.predict(x)
168
169 def pipe(self, stream, batch_size=128):
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in predict()
38 def predict(self, X):
39 for layer in self._layers:
---> 40 X = layer(X)
41 return X
42
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in __call__()
165 Must match expected shape
166 """
--> 167 return self.predict(x)
168
169 def pipe(self, stream, batch_size=128):
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/model.py in predict()
129
130 def predict(self, X):
--> 131 y, _ = self.begin_update(X, drop=None)
132 return y
133
/usr/local/lib/python3.8/site-packages/thinc/api.py in uniqued_fwd()
377 )
378 X_uniq = layer.ops.xp.ascontiguousarray(X[ind])
--> 379 Y_uniq, bp_Y_uniq = layer.begin_update(X_uniq, drop=drop)
380 Y = Y_uniq[inv].reshape((X.shape[0],) + Y_uniq.shape[1:])
381
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/feed_forward.py in begin_update()
44 callbacks = []
45 for layer in self._layers:
---> 46 X, inc_layer_grad = layer.begin_update(X, drop=drop)
47 callbacks.append(inc_layer_grad)
48
/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
254
255 def wrap(*args, **kwargs):
--> 256 output = func(*args, **kwargs)
257 if splitter is None:
258 to_keep, to_sink = output
/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
254
255 def wrap(*args, **kwargs):
--> 256 output = func(*args, **kwargs)
257 if splitter is None:
258 to_keep, to_sink = output
/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
254
255 def wrap(*args, **kwargs):
--> 256 output = func(*args, **kwargs)
257 if splitter is None:
258 to_keep, to_sink = output
/usr/local/lib/python3.8/site-packages/thinc/api.py in begin_update()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in <listcomp>()
161 def begin_update(X, *a, **k):
162 forward, backward = split_backward(layers)
--> 163 values = [fwd(X, *a, **k) for fwd in forward]
164
165 output = ops.xp.hstack(values)
/usr/local/lib/python3.8/site-packages/thinc/api.py in wrap()
254
255 def wrap(*args, **kwargs):
--> 256 output = func(*args, **kwargs)
257 if splitter is None:
258 to_keep, to_sink = output
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/static_vectors.py in begin_update()
58 if ids.ndim >= 2:
59 ids = self.ops.xp.ascontiguousarray(ids[:, self.column])
---> 60 vector_table = self.get_vectors()
61 vectors = vector_table[ids * (ids < vector_table.shape[0])]
62 vectors = self.ops.xp.ascontiguousarray(vectors)
/usr/local/lib/python3.8/site-packages/thinc/neural/_classes/static_vectors.py in get_vectors()
53
54 def get_vectors(self):
---> 55 return get_vectors(self.ops, self.lang)
56
57 def begin_update(self, ids, drop=0.0):
/usr/local/lib/python3.8/site-packages/thinc/extra/load_nlp.py in get_vectors()
24 key = (ops.device, lang)
25 if key not in VECTORS:
---> 26 nlp = get_spacy(lang)
27 VECTORS[key] = nlp.vocab.vectors.data
28 return VECTORS[key]
/usr/local/lib/python3.8/site-packages/thinc/extra/load_nlp.py in get_spacy()
12
13 if lang not in SPACY_MODELS:
---> 14 SPACY_MODELS[lang] = spacy.load(lang, **kwargs)
15 return SPACY_MODELS[lang]
16
/usr/local/lib/python3.8/site-packages/spacy/__init__.py in load()
28 if depr_path not in (True, False, None):
29 deprecation_warning(Warnings.W001.format(path=depr_path))
---> 30 return util.load_model(name, **overrides)
31
32
/usr/local/lib/python3.8/site-packages/spacy/util.py in load_model()
167 elif hasattr(name, "exists"): # Path or Path-like to model data
168 return load_model_from_path(name, **overrides)
--> 169 raise IOError(Errors.E050.format(name=name))
170
171
OSError: [E050] Can't find model 'en_core_web_lg.vectors'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.
Info about spaCy
- spaCy version: 2.2.4.dev0
- Platform: macOS-10.15.3-x86_64-i386-64bit
- Python version: 3.8.1
Related: https://github.com/explosion/spaCy/issues/3552
Thanks for the detailed report!
I wonder whether this may actually be solved by the very recent PR #5081, which was merged into master two days ago. It is (hopefully) the same issue we would encounter when running nlp.pipe(..., n_process=2): the child processes (produced by spawn on Windows and macOS) don't have access to the global state of the parent process, and the vector data, such as en_core_web_lg.vectors, is stored in that global state. This will change from spaCy v3 onwards, but we're not quite there yet. For now, that PR makes sure the global state is properly transferred to the children instead.
Is there any chance you could try building the current master branch from source, and see whether that would fix your issue?
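For illustration, here's a minimal, spaCy-free sketch of the spawn behaviour described above; it shows that a child process started with spawn re-imports the main module and therefore never sees state the parent populated inside its __main__ guard (the names here are illustrative, not from thinc):

import multiprocessing as mp

STATE = {}  # stands in for module-level state like thinc.extra.load_nlp.VECTORS

def show_state():
    # Under "spawn" this prints {}: the child re-imports the module, so the
    # mutation made in the parent's __main__ block below never happened here.
    # Under "fork" (the default on Linux) it would print the populated dict.
    print(STATE)

if __name__ == "__main__":
    STATE["vectors"] = "loaded in parent"
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=show_state)
    p.start()
    p.join()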
@svlandeg I think by running pip install -e git+https://github.com/explosion/spaCy#egg=spaCy, I'm already using the latest code from the master branch; can you confirm?
Ah, yes, that should do it. You can double-check in the src folder whether spacy/spacy/language.py has a method _apply_pipes with a vectors parameter; if so, you have that latest PR.
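If it helps, that check can also be scripted; a hedged sketch, assuming _apply_pipes is a module-level function in spacy.language (the getattr guard covers builds where it isn't):

import inspect
import spacy.language

# PR #5081 added a `vectors` parameter to _apply_pipes.
fn = getattr(spacy.language, "_apply_pipes", None)
if fn is not None and "vectors" in inspect.signature(fn).parameters:
    print("this build includes the PR")
else:
    print("this build predates the PR")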
What is annoying is that I can't really reproduce this: I actually run into a MemoryError when running your code snippet, which seems kind of ridiculous. The vectors are definitely the culprit, though, because everything works just fine when you move the nlp = spacy.load('en_core_web_lg') statement inside the process function.
@svlandeg let me attach a jupyter notebook where I could more reliably produce the error early next week
I think moving nlp = spacy.load('en_core_web_lg') into the function sidesteps the error because spacy.load then runs directly on the workers, so nothing needs to be copied from the parent process's globals.
Based on that, this pattern has been working for me so far:
import spacy
from dask.distributed import get_worker

model = 'en_core_web_lg'

def process(doc):
    # Cache the loaded model on the worker so it's loaded once per worker,
    # not once per task.
    worker = get_worker()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load(model)
        worker.nlp = nlp
    return [str(x) for x in nlp(doc).ents]

ents = bag.map(process).compute()

# The same approach works if you want to process one partition at a time,
# which allows using `nlp.pipe` with some batch size and is even faster:
ents = bag.map_partitions(process_partition).compute()
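For completeness, a hedged sketch of what process_partition might look like, reusing the per-worker cache above (the batch size is illustrative):

def process_partition(texts):
    # Same per-worker model cache as in process() above.
    worker = get_worker()
    try:
        nlp = worker.nlp
    except AttributeError:
        nlp = spacy.load(model)
        worker.nlp = nlp
    # nlp.pipe batches the whole partition, which is faster than calling
    # nlp() once per document.
    return [[str(ent) for ent in doc.ents] for doc in nlp.pipe(texts, batch_size=50)]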
Considering there's a workaround for now (though it might not be ideal), I suggest revisiting this issue once we're closer to releasing v3. Hopefully the refactored vectors will make this a non-issue then.
Just wanted to add that the workaround seems to have limitations, though I have not yet had time to fully investigate the root cause: https://github.com/usnistgov/cv-py/issues/1
I think you'll have an easier time if you parallelise over larger units of work, e.g. a few dozen megabytes of compressed text per process. I would recommend chunking the text beforehand in a preprocessing step and just passing the input and output paths to dask. You'd then have something like this as your remote task:
def process_job(model_name, input_path, output_path):
    nlp = load_or_get_model(model_name)
    outputs = []
    texts = read_texts(input_path)
    for doc in nlp.pipe(texts):
        outputs.append(convert_output(doc))
    write_outputs(outputs, output_path)
I advise against trying to handle the parallelism too transparently. Really fine-grained task distribution is a method of last resort, for when the scheduling is difficult enough that you can't really do it manually. When you have a simple loop, you're much better off cutting it up into chunks yourself, and only distributing units of work that take at least a few minutes to complete. It's much more efficient and far more reliable, because you can manage failure by just repeating the jobs where the output files aren't there.
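To make that concrete, here's a hedged sketch of the driver side of this pattern; the (input_path, output_path) job list and the skip-if-output-exists recovery logic are assumptions about how one might implement the advice, with process_job as defined above:

import os
from dask.distributed import Client

def run_jobs(model_name, jobs):
    # jobs: list of (input_path, output_path) pairs, one per text chunk.
    client = Client()
    # Submit only chunks whose output file is missing, so a failed run is
    # recovered by simply calling run_jobs again.
    pending = [(i, o) for i, o in jobs if not os.path.exists(o)]
    futures = [client.submit(process_job, model_name, i, o) for i, o in pending]
    client.gather(futures)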