Descriptors with same names cause explosive xarrays

ronpandolfi opened this issue on May 07, 2020

See https://github.com/bluesky/databroker/pull/560

Extracting an xarray from a RunCatalog that includes multiple descriptors with the same name seems to cause the spectacular explosion below (a sketch of the kind of document stream that triggers it follows the traceback):

```
databroker\tests\test_v2\test_multi_descriptor.py:65 (test_multi_descriptors_same[multi_descriptor_doc_stream0])
multi_descriptor_doc_stream = <generator object multi_descriptor_doc_stream.<locals>.doc_gen at 0x00000254199AF048>

    @pytest.mark.parametrize("multi_descriptor_doc_stream", (["primary", "primary"],), indirect=True)
    def test_multi_descriptors_same(multi_descriptor_doc_stream):
>       _test_ingest_to_xarray(multi_descriptor_doc_stream)

test_multi_descriptor.py:68: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test_multi_descriptor.py:58: in _test_ingest_to_xarray
    assert catalog[-1]['primary'].to_dask()['raw'].compute().shape == (1, *data_shape)
..\..\core.py:1372: in to_dask
    return super().to_dask()
..\..\intake_xarray_core\base.py:77: in to_dask
    return self.read_chunked()
..\..\intake_xarray_core\base.py:53: in read_chunked
    self._load_metadata()
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\intake\source\base.py:117: in _load_metadata
    self._schema = self._get_schema()
..\..\intake_xarray_core\base.py:20: in _get_schema
    self._open_dataset()
..\..\core.py:1355: in _open_dataset
    exclude=self.exclude)
..\..\core.py:719: in _documents_to_xarray
    return xarray.merge(datasets)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\merge.py:793: in merge
    merge_result = merge_core(dict_like_objects, compat, join, fill_value=fill_value)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\merge.py:555: in merge_core
    variables, out_indexes = merge_collected(collected, prioritized, compat=compat)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\merge.py:228: in merge_collected
    merged_vars[name] = unique_variable(name, variables, compat)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\merge.py:137: in unique_variable
    equals = getattr(out, compat)(var)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\variable.py:1722: in no_conflicts
    return self.broadcast_equals(other, equiv=equiv)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\variable.py:1703: in broadcast_equals
    return self.equals(other, equiv=equiv)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\variable.py:1687: in equals
    self._data is other._data or equiv(self.data, other.data)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\xarray\core\duck_array_ops.py:234: in array_notnull_equiv
    flag_array = (arr1 == arr2) | isnull(arr1) | isnull(arr2)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\dask\array\core.py:1833: in __eq__
    return elemwise(operator.eq, self, other)
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\dask\array\core.py:3910: in elemwise
    name = kwargs.get("name", None) or "%s-%s" % (funcname(op), tokenize(op, dt, *args))
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\dask\base.py:658: in tokenize
    return md5(str(tuple(map(normalize_token, args))).encode()).hexdigest()
C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\dask\utils.py:506: in __call__
    return meth(arg, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

x = array([[[[[0.73661346, 0.73661346, 0.73661346, ..., 0.73661346,
           0.73661346, 0.73661346],
          [0.73661...n,        nan],
          [       nan,        nan,        nan, ...,        nan,
                  nan,        nan]]]]])

    @normalize_token.register(np.ndarray)
    def normalize_array(x):
        if not x.shape:
            return (x.item(), x.dtype)
        if hasattr(x, "mode") and getattr(x, "filename", None):
            if hasattr(x.base, "ctypes"):
                offset = (
                    x.ctypes.get_as_parameter().value
                    - x.base.ctypes.get_as_parameter().value
                )
            else:
                offset = 0  # root memmap's have mmap object as base
            if hasattr(
                x, "offset"
            ):  # offset numpy used while opening, and not the offset to the beginning of the file
                offset += getattr(x, "offset")
            return (
                x.filename,
                os.path.getmtime(x.filename),
                x.dtype,
                x.shape,
                x.strides,
                offset,
            )
        if x.dtype.hasobject:
            try:
                try:
                    # string fast-path
                    data = hash_buffer_hex(
                        "-".join(x.flat).encode(
                            encoding="utf-8", errors="surrogatepass"
                        )
                    )
                except UnicodeDecodeError:
                    # bytes fast-path
                    data = hash_buffer_hex(b"-".join(x.flat))
            except (TypeError, UnicodeDecodeError):
                try:
                    data = hash_buffer_hex(pickle.dumps(x, pickle.HIGHEST_PROTOCOL))
                except Exception:
                    # pickling not supported, use UUID4-based fallback
                    data = uuid.uuid4().hex
        else:
            try:
>               data = hash_buffer_hex(x.ravel(order="K").view("i1"))
E               MemoryError: Unable to allocate 14.6 TiB for an array with shape (2000000000000,) and data type float64

C:\Users\LBL\.virtualenvs\merged-repo\lib\site-packages\dask\base.py:878: MemoryError
```
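Reading the frames above, the failure path is `_documents_to_xarray` building one dataset per descriptor and then calling `xarray.merge(datasets)` (core.py:719). Because both descriptors are named `primary`, both datasets carry a variable called `raw`; `merge` outer-joins their `time` coordinates, NaN-pads each copy (visible in the dumped array above), and, under the default `compat='no_conflicts'`, compares the padded copies element-wise. That comparison goes through dask's `tokenize`, and hashing the padded operand is where the 14.6 TiB allocation is attempted. A toy illustration of the merge behavior (the variable name and shapes here are made up, not taken from the test):

```python
import numpy as np
import xarray

# Stand-ins for the per-descriptor datasets built by _documents_to_xarray:
# same variable name ('raw'), disjoint 'time' coordinates.
ds1 = xarray.Dataset(
    {'raw': (('time', 'dim_0', 'dim_1'), np.full((1, 10, 10), 0.7366))},
    coords={'time': [0.0]},
)
ds2 = xarray.Dataset(
    {'raw': (('time', 'dim_0', 'dim_1'), np.full((1, 10, 10), 0.7366))},
    coords={'time': [1.0]},
)

# merge() outer-joins 'time', reindexes each copy of 'raw' with NaN padding,
# and (with the default compat='no_conflicts') compares the padded copies
# element-wise to check that the overlapping values agree.
merged = xarray.merge([ds1, ds2])
print(merged['raw'].shape)  # (2, 10, 10): each input was padded to the union
```

At the scale of real image stacks the padded copies are enormous, which is presumably what produces the shape-(2000000000000,) allocation in the traceback.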

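For reference, a minimal sketch of the kind of document stream the parametrized test feeds in: one run with two event descriptors that both use the stream name `primary` and identical data keys. This is illustrative only; the real fixture is `multi_descriptor_doc_stream` in `test_multi_descriptor.py`, and the `doc_gen` helper and the 10x10 data shape below are assumptions, not copied from it.

```python
import time as ttime

import numpy as np
from event_model import compose_run


def doc_gen(data_shape=(10, 10)):
    """Yield (name, doc) pairs for a run with two descriptors named 'primary'."""
    bundle = compose_run()
    yield 'start', bundle.start_doc

    data_keys = {
        'raw': {'source': 'synthetic', 'dtype': 'array', 'shape': list(data_shape)}
    }

    # Two descriptors sharing the same stream name -- the case that explodes.
    for _ in range(2):
        desc = bundle.compose_descriptor(data_keys=data_keys, name='primary')
        yield 'descriptor', desc.descriptor_doc
        yield 'event', desc.compose_event(
            data={'raw': np.random.random(data_shape)},
            timestamps={'raw': ttime.time()},
        )

    yield 'stop', bundle.compose_stop()
```

Ingesting a stream like this into a catalog and calling `catalog[-1]['primary'].to_dask()['raw'].compute()`, as `_test_ingest_to_xarray` does, is what hits the MemoryError above.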