connector-x
Memory leak when concatenating pyarrow tables
Concatenating the results of connectorx.read_sql leads to a virtual memory explosion. When the Arrow table is deep-copied, the garbage collector can do its work and free the memory, but the whole point of Arrow is zero-copy memory.
import copy
import connectorx as cx
import psutil
import pyarrow as pa

def main():
    conn = "mssql://user:my_super_secret_password@server:1433/db"
    query = "SELECT top 1 * FROM table"
    table = None
    for x in range(0, 10000):
        print(f'run #{x}')
        temp_table: pa.Table = cx.read_sql(conn, query, return_type="arrow")
        # when enabled, the virtual memory explosion doesn't occur
        # temp_table = copy.deepcopy(temp_table)
        table = temp_table if not table else pa.concat_tables([table, temp_table])
        print(psutil.Process().memory_info())

if __name__ == "__main__":
    main()
Hi @pjlammertyn, thanks for the report! Will take a look at this.
Problem starts to occur in version 0.2.2a5
Hi @pjlammertyn, thanks for the info, it helps a lot!
Starting from 0.2.2a5, we changed the logic of writing the arrow dataframe:
Before:
- Issue a COUNT query to get the query result size n
- Allocate the buffer (the final record batch) to hold n records
- Read the query result from the database and write it to arrow
After:
- Allocate an initial buffer that holds 64K records (reference)
- Read the query result from the database and write it to arrow. If the buffer is full, allocate another 64K-record buffer (another record batch)
The reason we made the change is to avoid the COUNT query, which can sometimes be very time consuming.
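To make the difference concrete, here is a hedged Python sketch of the two strategies. The real implementation is Rust inside connector-x; plain lists stand in for Arrow buffers here, and all names are illustrative.

BATCH_SIZE = 64 * 1024  # 64K records per buffer in the new logic

def write_old(rows):
    # Old logic: an extra COUNT query (here: len) sizes one final batch exactly.
    rows = list(rows)
    batch = [None] * len(rows)
    for i, row in enumerate(rows):
        batch[i] = row
    return [batch]

def write_new(rows):
    # New logic: allocate 64K-record buffers up front, start a new one when full.
    batches, batch, used = [], [None] * BATCH_SIZE, 0
    for row in rows:
        if used == BATCH_SIZE:
            batches.append(batch)
            batch, used = [None] * BATCH_SIZE, 0
        batch[used] = row
        used += 1
    batches.append(batch)  # the last batch keeps its 64K capacity even if only
    return batches         # one row is valid, which is the bloat observed here

print(len(write_old([42])[0]))  # 1 slot allocated for a 1-row result
print(len(write_new([42])[0]))  # 65536 slots allocated for the same 1-row result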
As for the test code here, each query in the loop only selects 1 row, but due to the new logic above the real capacity of the table is 64K records. It looks to me like this memory is not released because temp_table is concatenated into table. When deepcopy is applied, however, the new temp_table only copies the valid rows according to the length, so the old temp_table, and with it the oversized buffer, can be released. When I test in my environment, memory usage drops dramatically if I change 64K to 1. The two versions also look similar when I modify the query so that the result size is also 64K.
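A rough back-of-the-envelope calculation for the repro above shows the scale. The 100 bytes per row is an invented placeholder; the 64K capacity and 10,000 iterations come from the discussion and the repro script.

ROW_BYTES = 100             # hypothetical average row width, just for illustration
BATCH_CAPACITY = 64 * 1024  # capacity allocated per read_sql call
ITERATIONS = 10_000         # loop count in the repro script

valid = ITERATIONS * 1 * ROW_BYTES                  # one valid row per query
capacity = ITERATIONS * BATCH_CAPACITY * ROW_BYTES  # retained buffer capacity

print(f"valid data:        {valid / 2**20:.1f} MiB")     # ~1 MiB
print(f"retained capacity: {capacity / 2**30:.1f} GiB")  # ~61 GiB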
Since we mainly target large query results, this might not be a problem in general. But I think the solution here is to shrink the buffer before generating the arrow record batch. The dependency library we use (arrow-rs) has not exposed that interface yet. However, we are currently also switching to arrow2, which has this functionality. I added the shrink_to_fit logic and exposed arrow2 to our python binding in https://github.com/sfu-db/connector-x/pull/212, and I'm going to make an alpha release for it. You can download the new version and test it by specifying return_type="arrow2" (it may still be larger than ideal, since the capacity after shrink_to_fit can be larger than the actual length). Will let you know when the alpha release is ready.
@wangxiaoying thanks for looking into this. I agree that in most cases large query results will be used, but in some corner cases you want to do a lot of small queries. For instance, in a Master-Slave database setup you may want to offload the ETL to the Slave database. That Slave database applies the transaction logs of the Master, for instance every 15 minutes, and while the logs are being applied (the slave database is in RECOVERING state) all database connections are lost. So instead of large ETL queries, you want to do small incremental queries (with some exponential retry strategy for when the slave database is in recovering state) and append the results of those small queries to the Arrow table, along the lines of the sketch below.
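A hedged sketch of that pattern. The connection string, table, retry limits, and the caught exception type are all placeholders, not anything connector-x prescribes.

import time
import connectorx as cx
import pyarrow as pa

def read_with_retry(conn, query, max_attempts=5, base_delay=1.0):
    # Retry with exponential backoff while the slave is in RECOVERING state.
    for attempt in range(max_attempts):
        try:
            return cx.read_sql(conn, query, return_type="arrow")
        except Exception:  # placeholder: whatever error read_sql raises on a lost connection
            time.sleep(base_delay * 2 ** attempt)
    raise RuntimeError("slave database still unavailable after retries")

conn = "mssql://user:password@slave:1433/db"
table = None
for batch_id in range(100):  # many small incremental queries
    chunk = read_with_retry(conn, f"SELECT * FROM events WHERE batch_id = {batch_id}")
    table = chunk if table is None else pa.concat_tables([table, chunk])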
Hi @pjlammertyn
For instance in a Master-Slave database setup, where you want to offload the ETL to the Slave database. That Slave database will apply the transaction logs of the Master for instance every 15 minutes, and while applying the logs (slave database is in RECOVERING state) all database connections are lost. So instead of large ETL queries, you want to do small incremental queries (with some exponential retry strategy for when the slave database is in recovering state) and append the results of those small queries to the Arrow table.
Indeed, in this case we might need a lot of small queries. The new 0.2.4a3 version that supports arrow2 and shrinks the buffer before producing the record batch should be available. Can you give it a try (specify return_type="arrow2" instead of return_type="arrow") and test whether it helps? (Sorry, I forgot to remove a debug message.) If the memory issue still exists, maybe we could add a parameter to let the user modify the maximum size of each record batch.
Still seems to occur in version 0.2.4a3
Hi @pjlammertyn, may I ask why you care about virtual memory usage? Virtual memory size does not really reflect how much physical memory is actually used, and in data-intensive systems this number can jump to TB levels even though the physical memory usage is only several GB.
@pjlammertyn, can you set return_type="arrow2" in the input parameters of read_sql? Otherwise it will behave the same as before.
@dovahcrow I know, the Virtual Memory Size (VMS) doesn't say much (I was just playing around to make the memory consumption human readable), but it can give an indication of how fast memory is being allocated. The Resident Set Size (RSS) is the actual memory consumption, and the RSS also starts to explode (in the screenshot below only 6000 rows are in the Arrow Table, yet the process consumes almost 30GB). When memory pressure occurs, systemd-oomd kicks in and kills the process.
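For completeness, both counters quoted above can be read from the same psutil call used in the repro:

import psutil

mem = psutil.Process().memory_info()
print(f"RSS: {mem.rss / 2**30:.2f} GiB  (resident, physical memory)")
print(f"VMS: {mem.vms / 2**30:.2f} GiB  (reserved virtual address space)")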
@wangxiaoying with arrow2 it does a better job, but the memory consumption is still high (1GB for 2000 rows).
@pjlammertyn yeah, that is because the size after shrinking may still be larger than initializing with size 1, as in your test case. Maybe we could add a parameter that lets the user define the max size of the returned record batch for arrow instead of hardcoding it as 64K.
Hi @wangxiaoying, is there any update on this issue?
We've been experiencing similar memory leaks where a long-running worker uses connectorx (v0.3.1) to load data into arrow format and then serve it. Based on memory profiling (attached below), we noticed that most of the memory allocated by _read_sql is not properly released over time, which leads to an OOM kill of the worker process.
Setting return_type="arrow2" does not solve the problem.
