connector-x
partition_range is not taken into account in memory allocation
When you partition a query and specify `partition_range`, the range is used to split the query into partitions, but it is not taken into account for memory allocation.
Reproduce:

```python
connectorx.read_sql(
    'postgres://postgres:postgres@localhost:5432/postgres',
    'select * from lineitem',
    partition_num=4,
    partition_on='l_orderkey',
    partition_range=(1, 10),
    return_type='pandas',
)
```
If we check the logs:

```
...
[2025-05-29T17:08:26Z DEBUG tokio_postgres::prepare] preparing query s1: SELECT count(*) FROM (SELECT * FROM lineitem) AS CXTMPTAB_COUNT
...
[2025-05-29T17:08:26Z DEBUG connectorx::pandas::dispatcher] Allocate destination memory: 1199969x16
...
```
It issues a `count(*)` over the whole query and allocates memory for the entire dataset.
Hi @surister, thanks for bringing this up!
The `partition_range` parameter is designed to let users with domain knowledge manually set the range in order to avoid the additional MIN/MAX queries during data loading. It is not designed to be used as a filter for the query, which should be done in the query itself (e.g., `select * from lineitem where l_orderkey >= 0 AND l_orderkey <= 10`).
If the user sets `partition_range=(a, b)` manually, they should guarantee the range at least covers the real range of the column in their query. It is OK if `a` is smaller than the real min value or `b` is greater than the real max value. However, if `a` is larger than the real min value or `b` is less than the real max value, there will be an issue, as the size of the allocated dataframe will be wrong.
If users are not sure about the range at runtime, they should leave this parameter unset; connectorx will then issue a MIN/MAX query to get the real range of the partition column before splitting and fetching the query result.
Please let me know if you have further questions!
Thanks for the thorough explanation! Couldn't we make the memory allocation more efficient by issuing `select count(*) from tbl where partition_on > partition_range.0 and partition_on < partition_range.1`? We are issuing the count already, and most likely any count over a user-given value range will be smaller than the total count.
If we add the `WHERE partition_on > partition_range.0 AND partition_on < partition_range.1` clause to the COUNT query and the given `partition_range` DOES NOT cover the real range, the result of the same query will differ depending on whether partitioning is enabled. That is, the result of a query will depend on whether and how you specify the `partition_range` parameter. This is not what we want, because our `read_sql(conn, query)` function semantically means fetching the result of `query`. The partition-related parameters, including `partition_col`, `partition_num` and `partition_range`, just indicate how you want to fetch the result, not what to fetch.
If we add this `WHERE` clause to the COUNT query and the given `partition_range` DOES cover the real range, the newly added clause is redundant, since it cannot filter out any data.
In general, if the user only wants the query result within the partition, they should add this filter to the query itself instead of only indicating it via the `partition_range` parameter. Using your initial example, if the user only wants data where `l_orderkey` is in the range [1, 10], the code should be written like this:
```python
connectorx.read_sql(
    'postgres://postgres:postgres@localhost:5432/postgres',
    'select * from lineitem where l_orderkey >= 1 and l_orderkey <= 10',
    partition_num=4,
    partition_on='l_orderkey',
    partition_range=(1, 10),
    return_type='pandas',
)
```
I understand the intention of letting the user provide the min/max to avoid a query, but ultimately that lets the user filter by value indirectly, since the min/max are used when creating the partition queries:
```
[2025-06-06T08:47:23Z DEBUG tokio_postgres::prepare] preparing query s2: COPY (SELECT * FROM (SELECT * FROM lineitem) AS CXTMPTAB_PART WHERE 5 <= CXTMPTAB_PART.l_orderkey AND CXTMPTAB_PART.l_orderkey < 7) TO STDOUT WITH BINARY
[2025-06-06T08:47:23Z DEBUG tokio_postgres::prepare] preparing query s4: COPY (SELECT * FROM (SELECT * FROM lineitem) AS CXTMPTAB_PART WHERE 1 <= CXTMPTAB_PART.l_orderkey AND CXTMPTAB_PART.l_orderkey < 3) TO STDOUT WITH BINARY
[2025-06-06T08:47:23Z DEBUG tokio_postgres::prepare] preparing query s5: COPY (SELECT * FROM (SELECT * FROM lineitem) AS CXTMPTAB_PART WHERE 7 <= CXTMPTAB_PART.l_orderkey AND CXTMPTAB_PART.l_orderkey < 11) TO STDOUT WITH BINARY
[2025-06-06T08:47:23Z DEBUG tokio_postgres::prepare] preparing query s3: COPY (SELECT * FROM (SELECT * FROM lineitem) AS CXTMPTAB_PART WHERE 3 <= CXTMPTAB_PART.l_orderkey AND CXTMPTAB_PART.l_orderkey < 5) TO STDOUT WITH BINARY
```
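The boundaries in those logs (1, 3, 5, 7, 11 for `partition_range=(1, 10)` and `partition_num=4`) can be reproduced with a small sketch. This is a hypothetical reconstruction of the boundary logic implied by the logs, not connectorx's actual code:

```python
def partition_bounds(lo, hi, n):
    # Fixed step of (hi - lo + 1) // n; each partition is the half-open
    # range [start, end), and the last one absorbs the remainder up to hi.
    step = (hi - lo + 1) // n
    bounds = []
    for i in range(n):
        start = lo + i * step
        end = hi + 1 if i == n - 1 else start + step
        bounds.append((start, end))
    return bounds

print(partition_bounds(1, 10, 4))  # [(1, 3), (3, 5), (5, 7), (7, 11)]
```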
The count query is going to be issued anyway (when `partition_num` is set and `queries.len == 1`). If we add the conditions to the count query, three things can happen:
- `partition_range` is bigger than the actual range: it does not matter, the count will be the same as `count(*)`. An error could only occur if new values are committed between the count and fetching the data, but this is also true in normal operation without `partition_range`, so it is not a problem created by this situation.
- `partition_range` is equal to the real range: nothing happens, the result is the same as `count(*)`.
- `partition_range` is smaller than the real range: values are filtered, the count will be smaller than `count(*)`, and memory allocation will be more efficient.
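The third case can be sketched with an in-memory SQLite database (purely for illustration; the table name and data are made up, and connectorx does not issue this bounded count today):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE lineitem (l_orderkey INTEGER)")
# Real range of l_orderkey is [1, 100]
con.executemany("INSERT INTO lineitem VALUES (?)",
                [(k,) for k in range(1, 101)])

# Current behavior: unconditional count allocates for everything
total = con.execute("SELECT count(*) FROM lineitem").fetchone()[0]

# Proposed: bound the count by the user-supplied partition_range (1, 10)
bounded = con.execute(
    "SELECT count(*) FROM lineitem "
    "WHERE l_orderkey >= 1 AND l_orderkey <= 10"
).fetchone()[0]

print(total, bounded)  # 100 10
```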
I think users will end up using `partition_range` to filter out values, as per Hyrum's law: it's much easier to specify a tuple than to modify the query itself, and there are users that pipe external queries into connectorx. We might as well make it efficient, or change the API so this cannot happen.
For extra context for external readers: we can end up with dataframes that are mostly empty (unnecessarily allocating memory):
```
         l_orderkey  l_partkey  ...  l_shipmode  l_comment
0                 1      31038  ...  TRUCK       egular courts above the
1                 1      13462  ...  MAIL        ly final dependencies: slyly bold
2                 1      12740  ...  REG AIR     riously. regular, express dep
3                 1        427  ...  AIR         lites. fluffily even de
4                 1       4806  ...  FOB         pending foxes. slyly re
...             ...        ...  ...  ...         ...
1199964           0          0  ...  0           0
1199965           0          0  ...  0           0
1199966           0          0  ...  0           0
1199967           0          0  ...  0           0
1199968           0          0  ...  0           0

[1199969 rows x 16 columns]
```
Also, if we wanted to avoid the min/max query, couldn't we always send the min/max query together with the count? I'm doing that right now: https://github.com/surister/conecta/blob/8b7fb945efef10e5b20966cbf608a99ff4f8eda2/conecta-core/src/source/postgres.rs#L64
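The idea of fetching min, max, and count in a single round trip can be sketched like this (SQLite is used only as a stand-in for illustration; the table and data are made up):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE lineitem (l_orderkey INTEGER)")
con.executemany("INSERT INTO lineitem VALUES (?)",
                [(k,) for k in range(1, 11)])

# One query replaces the separate MIN/MAX and COUNT round trips
lo, hi, cnt = con.execute(
    "SELECT MIN(l_orderkey), MAX(l_orderkey), COUNT(*) FROM lineitem"
).fetchone()
print(lo, hi, cnt)  # 1 10 10
```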
I still think we need to keep the semantics of the `read_sql(conn, sql)` function, which simply fetches the result of the `sql` query using the connection `conn`. All partition-related parameters are only used to indicate how the result can be fetched in parallel. This interface aligns with the Spark JDBC API, which mentions: "Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned." modin also follows the same semantics for the bounds of the partition column.
I think the Hyrum's Law point you mentioned is valid. We should add more documentation to clarify the usage of `partition_range`. Also, we should probably eliminate the use of the `partition_range` values in the partitioned queries by using open-ended filters on the first and last partitions. For example,
```python
connectorx.read_sql(
    'postgres://postgres:postgres@localhost:5432/postgres',
    'select * from lineitem',
    partition_num=3,
    partition_on='l_orderkey',
    partition_range=(1, 10),
)
```
is partitioned into:
```sql
select * from lineitem where l_orderkey <= 4;
select * from lineitem where l_orderkey > 4 and l_orderkey <= 7;
select * from lineitem where l_orderkey > 7;
```
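A minimal sketch of how such open-ended predicates could be generated (a hypothetical helper, not connectorx's actual code); for `partition_range=(1, 10)` and `partition_num=3` it reproduces the three filters above:

```python
def open_ended_partitions(col, lo, hi, n):
    # Interior cut points come from the user's range, but the first and
    # last partitions are left unbounded, so rows outside (lo, hi) are
    # still fetched. Assumes n >= 2.
    step = (hi - lo) // n
    cuts = [lo + i * step for i in range(1, n)]
    preds = []
    for i in range(n):
        if i == 0:
            preds.append(f"{col} <= {cuts[0]}")
        elif i == n - 1:
            preds.append(f"{col} > {cuts[-1]}")
        else:
            preds.append(f"{col} > {cuts[i - 1]} AND {col} <= {cuts[i]}")
    return preds

print(open_ended_partitions("l_orderkey", 1, 10, 3))
# ['l_orderkey <= 4', 'l_orderkey > 4 AND l_orderkey <= 7', 'l_orderkey > 7']
```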
I see your point, thanks for taking time to discuss things with me!
I'll close this issue; we can open a new one for the improvements you mentioned.