
Memory leak when concatenating pyarrow tables

pjlammertyn opened this issue 3 years ago · 11 comments

Concatenating the results of connectorx.read_sql calls results in a virtual memory explosion. When the Arrow table is deep-copied, the garbage collector can do its work and free the memory. But the whole point of Arrow is zero-copy memory.

import copy

import connectorx as cx
import psutil
import pyarrow as pa


def main():
    conn = "mssql://user:my_super_secret_password@server:1433/db"
    query = "SELECT top 1 * FROM table"
    table = None
    for x in range(0, 10000):
        print(f'run #{x}')
        temp_table: pa.Table = cx.read_sql(conn, query, return_type="arrow")
        
        # when enabled, the virtual memory explosion doesn't occur
        # temp_table = copy.deepcopy(temp_table)

        table = temp_table if table is None else pa.concat_tables([table, temp_table])
        print(psutil.Process().memory_info())


if __name__ == "__main__":
    main()

pjlammertyn avatar Dec 17 '21 14:12 pjlammertyn

Hi @pjlammertyn , thanks for the report! Will take a look at this.

wangxiaoying avatar Dec 18 '21 05:12 wangxiaoying

The problem starts to occur in version 0.2.2a5.

pjlammertyn avatar Jan 04 '22 08:01 pjlammertyn

> The problem starts to occur in version 0.2.2a5.

Hi @pjlammertyn , thanks for the info, it helps a lot!

Starting from 0.2.2a5, we changed the logic of writing the arrow dataframe:

Before:

  • Issue a COUNT query to get the query result size n
  • Allocate the buffer (the final record batch) to hold n records
  • Read query result from database and write to arrow

After:

  • Allocate an initial buffer that can hold 64K records (reference)
  • Read the query result from the database and write it to arrow; if the buffer is full, allocate another 64K-record buffer (another record batch)

The reason we made the change is to avoid the COUNT query, which can be very time-consuming.

As for the test code here, each query in the loop only selects one row, but due to the new logic above the real capacity of the table is 64K records. That memory won't be released, since temp_table is concatenated into table. When deepcopy is applied, however, the new temp_table copies only the valid rows according to the length, so the old temp_table, and with it the oversized buffer, can be released. When I test in my environment, memory usage is largely reduced if I change 64K to 1. The two versions also look similar when I modify the query so that the result size is exactly 64K.
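
A rough pyarrow-only analogy of the effect (a sketch, not connectorx's internal code): a zero-copy view that is one row long still pins buffers sized for many more rows, much like a record batch whose capacity is 64K but whose length is 1.

import pyarrow as pa

# Build a 64K-row table, then take a zero-copy one-row view of it.
big = pa.table({"x": list(range(64 * 1024))})
one_row = big.slice(0, 1)

print(one_row.num_rows)                 # 1
# get_total_buffer_size() counts the full referenced buffers, so the
# one-row view still pins roughly 512 KB of int64 data.
print(one_row.get_total_buffer_size())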

Since we are mainly targeting large query results, this might not be a problem in general. But I think the solution here is to shrink the buffer before generating the arrow record batch. The dependency library we use (arrow-rs) has not exposed that interface yet. However, we are currently also switching to arrow2, which has this functionality. I added the "shrink_to_fit" logic and exposed arrow2 to our Python binding in https://github.com/sfu-db/connector-x/pull/212, and I'm going to make an alpha release for it. You can download the new version and test it by specifying return_type="arrow2" (the result may still be larger than ideal, since the capacity after shrink_to_fit can exceed the actual length). Will let you know when the alpha release is ready.

wangxiaoying avatar Jan 05 '22 05:01 wangxiaoying

@wangxiaoying thanks for looking into this. I agree that in most cases large query results will be used, but in some corner cases you want to do a lot of small queries. For instance, in a master-slave database setup where you want to offload the ETL to the slave database: the slave applies the master's transaction logs, say, every 15 minutes, and while the logs are being applied (the slave is in the RECOVERING state) all database connections are lost. So instead of large ETL queries, you want to do small incremental queries (with an exponential retry strategy for when the slave is recovering) and append the results of those small queries to the Arrow table, roughly the pattern sketched below.
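
A minimal sketch of that pattern, combining exponential-backoff retries with the deepcopy workaround from this thread (the DSN, query, and the broad exception handling are placeholders, not connectorx specifics):

import copy
import time
from typing import Optional

import connectorx as cx
import pyarrow as pa


def incremental_query(conn: str, query: str, max_retries: int = 5) -> pa.Table:
    delay = 1.0
    for _ in range(max_retries):
        try:
            batch = cx.read_sql(conn, query, return_type="arrow")
            # Workaround from this thread: deep-copying keeps only the
            # valid rows, so the oversized 64K-record buffers are freed.
            return copy.deepcopy(batch)
        except Exception:
            # Slave is likely in RECOVERING state; back off exponentially.
            time.sleep(delay)
            delay *= 2
    raise RuntimeError(f"query failed after {max_retries} attempts")


def append_increment(table: Optional[pa.Table], conn: str, query: str) -> pa.Table:
    batch = incremental_query(conn, query)
    return batch if table is None else pa.concat_tables([table, batch])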

pjlammertyn avatar Jan 05 '22 07:01 pjlammertyn

Hi @pjlammertyn

> For instance, in a master-slave database setup where you want to offload the ETL to the slave database: the slave applies the master's transaction logs, say, every 15 minutes, and while the logs are being applied (the slave is in the RECOVERING state) all database connections are lost. So instead of large ETL queries, you want to do small incremental queries (with an exponential retry strategy for when the slave is recovering) and append the results of those small queries to the Arrow table.

Indeed, in this case we might need a lot of small queries. The new 0.2.4a3 version, which supports arrow2 and shrinks the buffer before producing the record batch, should now be available. Can you give it a try (specify return_type="arrow2" instead of return_type="arrow") and test whether it helps? (Sorry, I forgot to remove a debug message.) If the memory issue still exists, maybe we could add a parameter to let the user modify the maximum size of each record batch.
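
For reference, the only change needed on the caller side is the return_type argument (connection string and query below are placeholders):

import connectorx as cx

conn = "mssql://user:my_super_secret_password@server:1433/db"
query = "SELECT top 1 * FROM table"

# The arrow2 writer shrinks its buffers before producing each record
# batch, so the returned table should pin less unused capacity.
table = cx.read_sql(conn, query, return_type="arrow2")
print(table.num_rows)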

wangxiaoying avatar Jan 06 '22 05:01 wangxiaoying

Still seems to occur in version 0.2.4a3

[screenshot: memory usage]

pjlammertyn avatar Jan 06 '22 22:01 pjlammertyn

Hi @pjlammertyn, may I ask why you care about virtual memory usage? VM size does not really reflect how much physical memory is used; in data-intensive systems this number can jump to the TB level even while physical memory usage stays at several GB.
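
For anyone following along, both numbers can be read side by side with psutil, which the repro script already uses:

import psutil

mem = psutil.Process().memory_info()
# rss: resident set size, the physical memory actually in use
# vms: virtual memory size, reserved address space (can far exceed rss)
print(f"rss={mem.rss / 2**20:.1f} MiB, vms={mem.vms / 2**20:.1f} MiB")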

dovahcrow avatar Jan 06 '22 22:01 dovahcrow

@pjlammertyn, can you set return_type="arrow2" in the input parameters of read_sql? Otherwise it behaves the same as before.

wangxiaoying avatar Jan 06 '22 23:01 wangxiaoying

@dovahcrow I know, Virtual Memory Size (VMS) doesn't say much (I was just playing around to make the memory consumption human-readable), but it can give an indication of how fast memory is being allocated. The Resident Set Size (RSS) is the actual memory consumption, and the RSS also starts to explode: in the screenshot below there are only 6000 rows in the Arrow table, yet it consumes almost 30 GB. When memory pressure occurs, systemd-oomd.service kicks in and kills the process.

[screenshot: RSS growth]

@wangxiaoying with arrow2 it does a better job, but the memory consumption is still high (1 GB for 2000 rows).

[screenshot: arrow2 memory usage]

pjlammertyn avatar Jan 07 '22 08:01 pjlammertyn

@pjlammertyn yeah, that is because the size after shrinking may still be larger than initializing with size 1, as in your test case. Maybe we could add a parameter that lets the user define the max size of the returned record batch for arrow instead of hardcoding it as 64K.
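
A purely hypothetical sketch of what such a knob could look like; record_batch_size is not an existing connectorx parameter, and the DSN and query are placeholders:

import connectorx as cx

conn = "mssql://user:my_super_secret_password@server:1433/db"  # placeholder
query = "SELECT top 1 * FROM table"                            # placeholder

# Hypothetical knob, not implemented: cap the rows pre-allocated per
# record batch instead of the hardcoded 64K.
table = cx.read_sql(conn, query, return_type="arrow2", record_batch_size=1024)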

wangxiaoying avatar Jan 08 '22 20:01 wangxiaoying

Hi @wangxiaoying , is there any update on this issue?

We've been experiencing similar memory leaks where a long-running worker uses connectorx (v0.3.1) to load data into Arrow format and then serve it. Based on memory profiling (attached below), we noticed that most of the memory allocated in _read_sql is not released over time, which leads to an OOM kill of the worker process.

Setting return_type=arrow2 does not solve the problem.

[screenshot: memory profile]

lutaod avatar Jan 18 '23 16:01 lutaod