Memory question
I am writing an in-memory numpy array to tensorstore using the zarr driver. When I write the array, memory usage increases by roughly the size of the array. I am not sure why this is, since I would expect tensorstore to hold references to the numpy array and then just write it to disk. Even if tensorstore does make copies in memory, I am not sure why that memory is not released after the write has finished. Here is an example showing that even after main has returned, roughly an array's worth of memory is still being held. This doesn't happen if I don't write to tensorstore.
import os
import tempfile
from pathlib import Path

import numpy as np
import psutil
import tensorstore

process = psutil.Process(os.getpid())


def main():
    x = np.random.randint(low=0, high=255, size=(30000, 30000, 3), dtype='uint8')
    with tempfile.TemporaryDirectory() as tmpdir:
        store = tensorstore.open(
            spec={
                'driver': 'zarr',
                'kvstore': {
                    'driver': 'file',
                    'path': str(Path(tmpdir) / 'z.zarr')
                }
            },
            shape=x.shape,
            dtype=x.dtype,
            chunk_layout=tensorstore.ChunkLayout(
                chunk=tensorstore.ChunkLayout.Grid(shape=(4096, 4096, 3))
            ),
            create=True,
        ).result()
        print('before write: ', process.memory_info().rss)
        store[:] = x
        print('after write: ', process.memory_info().rss)


if __name__ == '__main__':
    print('before main: ', process.memory_info().rss)
    main()
    print('after main: ', process.memory_info().rss)
output:
before main: 86323200
before write: 2787667968
after write: 5585666048
after main: 2885693440
Regarding your question of whether tensorstore copies the data:
TensorStore copies into the chunk cache by default; once the data is in the chunk cache, the chunks can be written directly with no additional copies. If you specify store.write(x, can_reference_source_data_indefinitely=True).result() instead of store[:] = x, the copy into the chunk cache is avoided, but a copy is still needed at write time to convert from the source array layout (30000, 30000, 3) to the chunk layout (4096, 4096, 3), and currently tensorstore attempts to encode all of the chunks in parallel rather than only encoding chunks as they get written. Therefore, can_reference_source_data_indefinitely=True does not reduce the amount of copying in this case.
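For concreteness, here is a minimal sketch of the two write paths being compared (the names follow the script above):

# Default path: the data is copied into the chunk cache before being encoded and written.
store[:] = x

# Alternative: allow tensorstore to keep referencing x instead of copying it into the
# chunk cache; a per-chunk copy is still made at encode time because the source layout
# does not match the (4096, 4096, 3) chunk layout.
store.write(x, can_reference_source_data_indefinitely=True).result()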
I can reproduce the resident set size increase that you observe. Looking at mallinfo2, the number of in-use bytes does not grow, but the number of free bytes retained in the heap does, and calling malloc_trim explicitly does not release those bytes. This indicates that there is not a memory leak in tensorstore, but we will have to investigate why an increasing amount of free memory gets retained in the heap: normally this would only be due to fragmentation, and it is not clear why fragmentation would be occurring to this extent.
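For anyone who wants to repeat this check from Python, a rough sketch using ctypes (this assumes a glibc >= 2.33 Linux system, where mallinfo2 is available; it is not part of the tensorstore API):

import ctypes


class Mallinfo2(ctypes.Structure):
    # Field layout of struct mallinfo2 from glibc's malloc.h (all size_t).
    _fields_ = [(name, ctypes.c_size_t) for name in (
        'arena', 'ordblks', 'smblks', 'hblks', 'hblkhd',
        'usmblks', 'fsmblks', 'uordblks', 'fordblks', 'keepcost')]


libc = ctypes.CDLL('libc.so.6')
libc.mallinfo2.restype = Mallinfo2

info = libc.mallinfo2()
print('in-use bytes (uordblks):', info.uordblks)         # stays roughly flat across writes
print('free bytes retained (fordblks):', info.fordblks)  # grows after the large write

# Ask glibc to return free heap pages to the OS; in this case it did not help.
libc.malloc_trim(0)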
Thanks Jeremy. I am noticing that if I write a chunk at a time then the memory increase after write is much less, e.g.
chunk_shape = (4096, 4096, 3)
store = tensorstore.open(
    ...,
    chunk_layout=tensorstore.ChunkLayout(
        chunk=tensorstore.ChunkLayout.Grid(shape=chunk_shape)
    ),
)
step = chunk_shape[0]
for i in range(0, x.shape[0] - step + 1, step):
    for j in range(0, x.shape[1] - step + 1, step):
        subarray = x[i:i + step, j:j + step]
        store[i:i + step, j:j + step].write(subarray).result()
this results in a ~100 MB increase after the nested for loop completes rather than the ~3 GB increase if we write x all at once. Is this ~100 MB a single chunk stored in memory, since it's somewhat close to the chunk size (4096^2 * 3 / 1e6 ≈ 50 MB)? While this does seem to get around the problem, this is not how tensorstore was intended to be used, right? We would expect tensorstore to handle rechunking the data and writing it. This is also slow since it's not parallelized.
If I pass can_reference_source_data_indefinitely=True then I don't see any difference, and to be honest, I don't understand the purpose of this argument.
If, instead of calling .result() on each write immediately, I collect the futures and only resolve them at the end
for f in futures:
    f.result()
then we again see the ~3GB increase. I guess this means that all chunks were copied into memory in this case.
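For clarity, the variant described above is roughly this (futures is just a plain Python list collecting the write futures from the loop):

futures = []
for i in range(0, x.shape[0] - step + 1, step):
    for j in range(0, x.shape[1] - step + 1, step):
        subarray = x[i:i + step, j:j + step]
        # Issue the write without waiting for it to complete.
        futures.append(store[i:i + step, j:j + step].write(subarray))

# Only wait for completion after all of the writes have been issued.
for f in futures:
    f.result()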
I adjusted your script to dump /proc/self/status (Linux). The values are quite different depending on whether you use the system malloc, tcmalloc, or some other allocator. The interesting field is VmRSS, which tells you how much of the VM size is mapped into the working set; it should match the rss reported by memory_info / stat.
_FIELDS = ('VmPeak', 'VmSize', 'VmHWM', 'VmRSS', 'RssAnon', 'RssFile', 'VmData')


def dump_stats(header):
    lines = Path('/proc/self/status').read_text().splitlines()
    lines = [l for t in _FIELDS for l in lines if t in l]
    print('-' * 30)
    print(header)
    print('-' * 30)
    for x in lines:
        print(x)
On my machine it shows that the rss goes way down in main, as expected.
------------------------------
before main
------------------------------
VmPeak: 2741048 kB
VmSize: 2741048 kB
VmHWM: 49104 kB
VmRSS: 49104 kB
RssAnon: 22016 kB
RssFile: 27088 kB
VmData: 2637712 kB
------------------------------
before write
------------------------------
VmPeak: 5602696 kB
VmSize: 5602696 kB
VmHWM: 2692468 kB
VmRSS: 2692468 kB
RssAnon: 2659568 kB
RssFile: 32900 kB
VmData: 5300224 kB
------------------------------
after write
------------------------------
VmPeak: 12017472 kB
VmSize: 8773172 kB
VmHWM: 5670368 kB
VmRSS: 2862708 kB
RssAnon: 2829296 kB
RssFile: 33412 kB
VmData: 5822644 kB
------------------------------
after main
------------------------------
VmPeak: 12017472 kB
VmSize: 6136588 kB
VmHWM: 5670368 kB
VmRSS: 225992 kB
RssAnon: 192580 kB
RssFile: 33412 kB
VmData: 3186060 kB
We do have some patterns that allocate on one thread and release on another. If fragmentation is a concern, you could try tcmalloc, which does a pretty good job these days with per-CPU data structures and the thread transfer cache.
$ LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libtcmalloc.so.4.5.18 python3 ./example.py
However, tcmalloc has a relatively slow release rate by default, so if you just care about RSS immediately after the write, tcmalloc will hold more memory.
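If RSS right after the write matters to you, the gperftools build of tcmalloc (the one LD_PRELOADed above) exposes a TCMALLOC_RELEASE_RATE environment variable; higher values return freed pages to the OS more aggressively, e.g.:
$ LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libtcmalloc.so.4.5.18 TCMALLOC_RELEASE_RATE=10 python3 ./example.py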
We have encountered the same issue of increasing resident set size with stable heap size when reading and writing using the zarr3 driver. Our test case is basically the script from the issue description, but we repeatedly rewrote the same 512³ shard using random uint32 data.
During testing we noticed that the RSS increase strongly correlated with the number of available CPU cores, but also varied by OS. RSS increase over 500 full chunk write operations:
- Ubuntu 20.04 (8 cores, glibc 2.31): ~0 MB
- CentOS 7 (8 cores, glibc 2.17): ~300 MB
- CentOS 7 (32 cores, glibc 2.17): ~800 MB
- SLES 15 (144 cores, glibc 2.38): ~5.8 GB
Because of these observations and @jbms' comment regarding fragmentation, we suspected glibc's per-thread memory arenas to be the culprit. Indeed, running the test script with MALLOC_ARENA_MAX=1 completely eliminated the RSS growth across all tested systems.
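For reference, that is just (using the example script name from the earlier comment):
$ MALLOC_ARENA_MAX=1 python3 ./example.py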
Hope this helps and we're happy to share additional details.
I see that you filed a new issue of RSS growth without responding to the potential issues raised above. Please try the above remediations and let us know whether they address your issue.
I opened the other ticket because I thought it was possibly different from this one and more detailed. I tried using tcmalloc and MALLOC_ARENA_MAX=1 that @daniel-wer suggested. But I am still noticing that RSS continually increases. I modified the shapes in the script in the linked ticket so that it runs quicker.
Most allocators are going to show some increase in RSS, since malloc implementations maintain a cache of pages for future allocations; this isn't typically an issue, and over the long term the usage tends to stabilize. With what you've provided so far, we can't tell with any specificity what your system is running, what you have tried, or what you expect and why.
If you would like assistance in reducing your apparent RSS use, please provide additional details. Without a much more detailed reproduction case it will be impossible to correctly diagnose allocation growth concerns. You will need to provide, at a minimum, system details such as the OS version, libc version, allocator, hugepage settings, etc. Ideally, provide a VM image which can demonstrate the concern.
If I am calling dest.write(src) where src is 250 MB, I expect the amount of RAM consumed to be around this amount. I don't expect it to grow to 10+ times this amount and keep growing. Are you able to reproduce this behavior using the script I provided? If you are then I don't think I need to provide further details. The script I wrote perfectly captures the use case and the memory increase at a smaller scale.
You're right that this is not the performance level that we want. The copy issue/memory use is actually a duplicate of this other bug as well, which is a legitimate issue that we're planning on working on:
https://github.com/google/tensorstore/issues/202
Basically the chunks between source and dest are misaligned. In this case you could copy from source to memory, and then write out, or use a transaction.
But the chunks are aligned in my example. Read chunk of source is (3, 1, 128, 128), write chunk (3, 1, 512, 512). Read chunk of dest is (3, 10, 128, 128), write chunk (3, 10, 512, 512).
The issue is that when copying from one TensorStore to another, tensorstore currently iterates over the read chunks of the source array and writes each read chunk to the destination array. If this is done outside of a transaction, none of those writes are coalesced, so the entire 3x10x512x512 destination write chunk gets re-written for every 3x1x128x128 source read chunk. That is both very slow and inefficient, and it also means a lot of extra memory gets allocated during the write, because there can even be multiple concurrent outstanding write attempts for the same write chunk.
If we wrap the entire copy with a transaction:
with tensorstore.Transaction() as txn:
    dest.with_transaction(txn).write(src).result()
then all of the writes get coalesced and it is much faster and allocates much less memory.
We still see some memory growth by default but that is eliminated by setting MALLOC_ARENA_MAX=1 in my own run of your script.
Even without using a transaction, setting MALLOC_ARENA_MAX=1 eliminates the memory growth across multiple write iterations. Memory use is still high within a single write attempt but that is real memory use (due to the #202 bug), not fragmentation.
Thank you @jbms, that clears a lot up. To check my understanding: without a transaction, if we write multiple inner chunks to a single outer chunk, each inner-chunk write must read the outer chunk into memory before writing it back, whereas with a transaction all of the inner chunks are grouped together and written to the outer chunk at once.
I was under the impression that the inner chunks are coalesced automatically by tensorstore when writing to a larger outer chunk. In your example, though, I would think the rewrite of the outer chunk would happen once per destination read chunk of size 3x10x128x128 rather than per source read chunk of size 3x1x128x128, since the source read chunks should be coalesced into the 3x10x128x128 destination read chunk before being written?
In our case the situation was much worse because we were reading 3x1x128x128 chunks and writing 3x40x128x128 inner chunks and 3x40x4096x4096 shards.
I did confirm that by using a transaction and using MALLOC_ARENA_MAX=1 that it is indeed significantly faster and memory is as expected.
Yes, for normal writes from an array inner chunks are coalesced automatically but there is an outstanding bug (#202) for writes from another TensorStore.
However, I just implemented a fix for #202 which will be pushed out shortly.
Another follow-up question: if we have a very large tensorstore that we are copying to another tensorstore with:
with tensorstore.Transaction() as txn:
    dest.with_transaction(txn).write(src).result()
which uses the same chunking scheme as in Excessive memory consumption #235
and we use
"context": {
"data_copy_concurrency": {"limit": max_workers}
}
I am finding that we still quickly run out of RAM. The shard size is 4 GB, and I am passing max_workers=8, so I would expect this to use ~32 GB of RAM, but it runs out of memory even with 64 GB available. Does data_copy_concurrency control the number of threads used to read/write the 4 GB of data, or is it doing something else?
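For reference, the context is passed embedded in the spec, roughly like this (the driver, path, and max_workers value here are placeholders; only the data_copy_concurrency part is the relevant setting):

max_workers = 8  # placeholder
dest = tensorstore.open({
    'driver': 'zarr3',  # assumed, matching the earlier zarr3 test case
    'kvstore': {'driver': 'file', 'path': '/path/to/dest.zarr'},  # placeholder path
    'context': {
        'data_copy_concurrency': {'limit': max_workers},
    },
}, open=True).result()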
data_copy_concurrency limits the number of I/O threads, but not the I/O queue, so memory use can still exceed what is available, particularly when resharding/copying or when an intermediate read is larger than the available memory. There is a related issue here: https://github.com/google/tensorstore/issues/213