Improve performance of queries that result in many blocks

Open georgipeev opened this issue 5 months ago • 23 comments

Is your feature request related to a problem? Please describe.

This request is related to a performance issue. When clickhouse_connect.driver.client.Client.query_df results are sent back in a large number of separate blocks, the performance of clickhouse_connect.driver.npquery.NumpyResult.df_result suffers greatly. For example, if a query whose result contains 30k rows is returned in 5000 separate blocks of ~6 rows each (as can happen depending on the query and, I believe, outside of the client's control), the df_result method takes 10 seconds to construct the final DataFrame, because the generator returned by NumpyResult._df_stream() processes each block separately, unmarshalling it into a DataFrame in ~2ms and yielding it. Concatenating the resulting 5000 DataFrames into one by NumpyResult.close_df() then takes an additional 2-3s.

Describe the solution you'd like

An improved implementation of NumpyResult.df_result that performs well even if the result is returned by the server in a large number of small blocks. I can propose the following simple alternative implementation of NumpyResult._df_stream.pd_blocks, which, in the example I gave above, improves the runtime of NumpyResult.close_df() by a factor of 3. Better solutions can probably be found with more time invested in testing and profiling.

from itertools import chain
...
class NumpyResult(Closable):
...
    def _df_stream(self) -> Generator:
...
        def pd_blocks():
            yield pd.DataFrame(dict(zip(self.column_names, [chain(*b) for b in zip(*block_gen)])))
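
As a rough illustration of the overhead described above, the following sketch builds synthetic blocks and compares two ways of assembling them: one DataFrame per block followed by pd.concat, versus flattening each column across all blocks first. The block layout (a list of column lists per block), the column names, and the helper names are assumptions for illustration, not clickhouse_connect internals, and timings will differ by machine and pandas version.

```python
from itertools import chain
import time

import pandas as pd

COLUMN_NAMES = ["id", "value"]  # hypothetical column names


def make_blocks(n_blocks=5000, rows_per_block=6):
    """Build synthetic column-oriented blocks: each block is [id_column, value_column]."""
    blocks = []
    for b in range(n_blocks):
        start = b * rows_per_block
        ids = list(range(start, start + rows_per_block))
        values = [i * 0.5 for i in ids]
        blocks.append([ids, values])
    return blocks


def per_block_concat(blocks):
    """One DataFrame per block, then a single concat of all of them."""
    frames = [pd.DataFrame(dict(zip(COLUMN_NAMES, block))) for block in blocks]
    return pd.concat(frames, ignore_index=True)


def flatten_then_build(blocks):
    """Chain each column across all blocks, then build one DataFrame."""
    columns = [list(chain.from_iterable(col)) for col in zip(*blocks)]
    return pd.DataFrame(dict(zip(COLUMN_NAMES, columns)))


def timed(fn, blocks):
    """Return the wall-clock seconds fn(blocks) takes."""
    t0 = time.perf_counter()
    fn(blocks)
    return time.perf_counter() - t0
```

Calling timed(per_block_concat, make_blocks()) and timed(flatten_then_build, make_blocks()) gives a quick comparison on plain int/float columns. Note that the flattened version holds the whole result in memory and, as discussed later in the thread, may not preserve dtypes for nullable columns.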

Describe alternatives you've considered

I've considered somehow affecting/reducing the number of blocks returned in the response, but have not figured out a way to do it.

Additional context

clickhouse_connect==0.6.16

georgipeev avatar Feb 28 '24 16:02 georgipeev

That seems like a very large number of blocks for the 30k rows response. Are they very wide rows? How large is the entire result set?

Your solution basically "unstreams" the result and pulls the entire HTTP response into memory at once, so while it makes sense for smaller result sets, it will hurt memory usage and otherwise defeat the purpose of streaming when blocks are "normal sized".

You might check the preferred_block_size_bytes setting on your server and play with sending it as a setting with your query.
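
A minimal sketch of sending block-size hints with a single query. preferred_block_size_bytes and max_block_size are ClickHouse settings, and settings= is the query_df keyword used elsewhere in this thread; the helper name and any values passed to it are hypothetical, and whether the server actually produces larger blocks for a given query is not guaranteed.

```python
def query_df_with_block_hints(client, query, preferred_bytes=None, max_rows=None):
    """Run client.query_df with optional per-query block-size settings.

    `client` is assumed to be a clickhouse_connect client (or any object with a
    compatible query_df method). Hints left as None are not sent, so the server
    defaults apply.
    """
    settings = {}
    if preferred_bytes is not None:
        settings["preferred_block_size_bytes"] = preferred_bytes
    if max_rows is not None:
        settings["max_block_size"] = max_rows
    return client.query_df(query, settings=settings)
```

For example, query_df_with_block_hints(client, 'SELECT ...', preferred_bytes=4_000_000) would send only the byte-size hint; the value here is illustrative, not a recommendation.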

genzgd avatar Feb 28 '24 17:02 genzgd

Assuming you're using df_result, you could probably move the suggested fix to the df_close method (instead of having df_close call _df_stream), and that would be a good improvement without affecting the streaming use case.

genzgd avatar Feb 28 '24 17:02 genzgd

In my proposal I was trying to have a minimal code impact. I will, of course, defer to you about what change would make most sense. I get the point about streaming - perhaps it would make sense to combine blocks into larger-sized chunks, but not necessarily into a single chunk as in my proposal (maybe add an itertools.batched around the chain(*b)?). The performance issue, however, is real and it's currently causing us to consider circumventing the clickhouse_connect code path altogether.

georgipeev avatar Feb 28 '24 17:02 georgipeev

Just to reiterate, I think your optimization makes perfect sense in the non-streaming case and I have no problem with implementing that in the next release. It's a fairly small code impact to decouple df_result from df_stream which I'm happy to do for the performance gain.

It also may make sense to batch smaller blocks in the streaming case, but the difficulty comes with deciding how large the batches should be.

Getting back to the root cause, I think it would be very helpful to understand why you have so many blocks for such a small number of rows, and the better solution to the issue might be tuning the ClickHouse settings or the queries to return fewer blocks.

genzgd avatar Feb 28 '24 17:02 genzgd

I posit that streaming would also benefit from grouping small blocks into larger batches, and it seems sensible to determine the appropriate batch size empirically. Letting the client choose the batching behavior (batching on/off, batch size) while keeping the library's current behavior as the default also makes sense.
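
The batching idea can be sketched as a generator that merges consecutive small blocks until a row threshold is reached. This assumes each block is a non-empty sequence of equal-length columns; the function name and the min_rows parameter are hypothetical and not part of clickhouse_connect.

```python
from itertools import chain


def batch_blocks(block_gen, min_rows=10_000):
    """Merge consecutive column-oriented blocks until at least min_rows rows accumulate.

    Memory use is bounded by roughly min_rows plus one block, rather than by
    the size of the whole result, so streaming is preserved.
    """
    pending, pending_rows = [], 0
    for block in block_gen:
        pending.append(block)
        pending_rows += len(block[0])  # row count taken from the first column
        if pending_rows >= min_rows:
            yield [list(chain.from_iterable(col)) for col in zip(*pending)]
            pending, pending_rows = [], 0
    if pending:  # flush the final partial batch
        yield [list(chain.from_iterable(col)) for col in zip(*pending)]
```

With min_rows=1 every incoming block is yielded on its own, which corresponds to no batching; a suitable larger value would have to be found empirically.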

Figuring out why my query results in so many small blocks would, indeed, be great, but I haven't been successful at it. I initially thought it was because my where clause included, in addition to selecting a single table partition, only one other field that occurs last in my lengthy order by list. However, when I change the where clause and replace that field with the very first field in the order by list, the result still comes in thousands of small pieces. My current hypothesis is that this is determined by the Clickhouse server implementation and table schema, and completely outside of the querying client's control.

georgipeev avatar Feb 28 '24 18:02 georgipeev

@georgipeev - there's an attempted improvement in 0.7.2. In order to maintain the correct dtypes, I had to use a somewhat different approach and I don't know how it compares with your fix since I don't have a simple way to test dataframes with many small blocks at scale. If you could test it out and report the results that would be appreciated.

You did mention looking at other alternatives to clickhouse-connect -- did you get any further down that path and find anything better for the same queries? Any other ideas of how to handle this would be helpful references.

genzgd avatar Mar 08 '24 05:03 genzgd

Thanks! In the two test cases I ran, the performance of 0.7.2 was indistinguishable from that of 0.6.16. There were some new type-related woes I had to take care of to get 0.7.2 to run properly too.

georgipeev avatar Mar 13 '24 23:03 georgipeev

Ah, that's unfortunate. Apparently the main performance hit is in concatenating the underlying pandas Series objects, not the DataFrame wrappers. I think I'll need to construct a useful test case for small blocks to dig further into the bottleneck.

Just so I have some background, what exactly were the type problems you mentioned? Are they pandas or Python types? Did you use a different pandas version?

genzgd avatar Mar 13 '24 23:03 genzgd

Some string columns that used to come back as dtype object/str now come as pandas.StringDtype(). I didn't change the pandas version I was using, it was 2.2.1 in both cases.

georgipeev avatar Mar 13 '24 23:03 georgipeev

Okay, I think that's the expected behavior if the use_extended_dtypes argument to query_df is set to True (which is the default). And yes, that's probably been updated since 0.6.16.
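
For code that depended on the older object-dtype strings, one option suggested by the comment above is passing use_extended_dtypes=False to query_df. Another is casting after the fact, sketched below in plain pandas; the helper name is hypothetical.

```python
import pandas as pd


def strings_to_object(df):
    """Return a copy of df with every pandas StringDtype column cast to object."""
    string_cols = [c for c in df.columns if isinstance(df[c].dtype, pd.StringDtype)]
    return df.astype({c: object for c in string_cols})


# A small frame with one extended string column and one integer column.
extended = pd.DataFrame({
    "name": pd.Series(["a", None, "c"], dtype=pd.StringDtype()),
    "n": [1, 2, 3],
})
legacy = strings_to_object(extended)
```

Missing values in the cast column may still be pd.NA rather than None, so downstream code that checks `is None` would need its own handling.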

genzgd avatar Mar 14 '24 00:03 genzgd

@genzgd Can you revert the change then? The change is breaking the index behaviour of the resulting Dataframe. Prior to 0.7.2, the index of the resulting Dataframe was always contiguous, but after 0.7.2 the index of the resulting Dataframe now is only contiguous within each block.

If it's not possible to revert the change, then there should be a note about this Breaking Change in the documentation, I think.

cwegener avatar Apr 08 '24 22:04 cwegener

Figuring out why my query results in so many small blocks would, indeed, be great, but I haven't been successful at it.

@georgipeev Are you able to share your table schema and your query?

cwegener avatar Apr 08 '24 22:04 cwegener

@cwegener I'll take a look -- do you know if there is a way to fix the index by somehow either recalculating the index of the series that are being concatenated or recalculating the index of the resulting dataframe? I'm not sure what the options are here, but concatenating dataframes seems expensive.

I do have ideas for other ways of combining larger blocks before they are converted to Pandas at all, but I've not yet had time to explore that route.

genzgd avatar Apr 08 '24 22:04 genzgd

Hmmm ... I just had another look at the 0.7.2 change. I can see why the change has no effect on the problem reported by Georgi.

The change introduced in 0.7.2 simply replaces the one call to pd.concat() that takes all blocks and creates a new DataFrame with multiple calls to pd.concat() - one call per column .... which is just the same thing. The complexity before and after the change is the same (O(n))

cwegener avatar Apr 09 '24 06:04 cwegener

Regardless of the fact that the 0.7.2 change should not be expected to have any impact on Georgi's original problem as stated in my previous comment ...

do you know if there is a way to fix the index

This should do the trick (can't test yet because I would need to also update my code with the same datetime precision type changes that Georgi alluded to)

diff --git a/clickhouse_connect/driver/npquery.py b/clickhouse_connect/driver/npquery.py
index 513c81a..5b47133 100644
--- a/clickhouse_connect/driver/npquery.py
+++ b/clickhouse_connect/driver/npquery.py
@@ -100,7 +100,7 @@ class NumpyResult(Closable):
         new_df_series = []
         for c in chains:
             new_df_series.append(pd.concat([pd.Series(piece, copy=False) for piece in c], copy=False))
-        self._df_result = pd.DataFrame(dict(zip(self.column_names, new_df_series)))
+        self._df_result = pd.DataFrame(dict(zip(self.column_names, new_df_series))).reset_index()
         self.close()
         return self
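
One caveat with the diff above, shown in plain pandas: a bare reset_index() keeps the old index as a new column named "index", so drop=True (or ignore_index=True at concat time) is probably what is wanted. The data below is made up for illustration.

```python
import pandas as pd

# Two "blocks" of one column; each Series carries its own 0-based index.
pieces = [pd.Series([10, 11, 12]), pd.Series([13, 14])]

stitched = pd.concat(pieces)             # index repeats per block: 0, 1, 2, 0, 1
df = stitched.to_frame("n")

with_extra_column = df.reset_index()     # old index survives as an "index" column
renumbered = df.reset_index(drop=True)   # contiguous 0..4 index, no extra column

# The same renumbering can be requested when concatenating.
concat_renumbered = pd.concat(pieces, ignore_index=True)
```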

cwegener avatar Apr 09 '24 07:04 cwegener

@cwegener The change is somewhat more subtle than that. Yes, the complexity is still the same, I was just hoping that Series concat was more efficient than DataFrame concat, but apparently not.

@georgipeev's original improvement flattened the incoming data into a single array of Python native values, and then created a DataFrame from that array, but when I tried that the specific dtype was lost (so for example, anything with Nones became a dtype object, which seemed like even more of a breaking change). That problem might be solved by inferring the specific Pandas dtype from the ClickHouse column type, and applying it to the raw data after the DataFrame is created from the raw Python objects, but that will require some extensive testing.
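
A minimal sketch of the dtype-inference idea, assuming the ClickHouse type names are available as strings alongside the column names. The mapping is hypothetical and deliberately tiny; a real one would need to cover far more types and, as noted, extensive testing.

```python
import pandas as pd

# Hypothetical mapping from ClickHouse type names to pandas dtypes.
CH_TO_PANDAS = {
    "Int32": "Int32",
    "Int64": "Int64",
    "Float64": "Float64",
    "String": "string",
}


def pandas_dtype_for(ch_type):
    """Strip a Nullable(...) wrapper and look up the pandas dtype (None if unknown)."""
    if ch_type.startswith("Nullable(") and ch_type.endswith(")"):
        ch_type = ch_type[len("Nullable("):-1]
    return CH_TO_PANDAS.get(ch_type)


def apply_column_types(df, ch_types):
    """Cast columns of a DataFrame built from raw Python objects to their mapped dtypes.

    Columns whose ClickHouse type is not in the mapping are left unchanged.
    """
    casts = {}
    for name, ch_type in zip(df.columns, ch_types):
        dtype = pandas_dtype_for(ch_type)
        if dtype is not None:
            casts[name] = dtype
    return df.astype(casts)
```

For instance, an object column holding [1, None, 3] with ClickHouse type Nullable(Int64) would be cast to the nullable Int64 dtype, with the None represented as a missing value.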

My other thought for an improvement is to combine the binary columns from multiple ClickHouse blocks before any transformation, but that is moderately complex and requires tracking the size of the blocks coming in to know when to combine.
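
As a loose stand-in for combining columns before any pandas conversion, the sketch below concatenates already-decoded NumPy arrays per column and builds the DataFrame once. It does not operate on the raw binary data, and it assumes every block holds one NumPy array per column with a consistent dtype; the function name is hypothetical.

```python
import numpy as np
import pandas as pd


def combine_blocks(blocks, column_names):
    """Concatenate each column's per-block arrays, then build a single DataFrame."""
    columns = [np.concatenate(pieces) for pieces in zip(*blocks)]
    return pd.DataFrame(dict(zip(column_names, columns)))
```

Because the arrays are joined before pandas sees them, the NumPy dtype is kept and the resulting index is contiguous. Nullable or object columns would need separate handling.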

Finally the reset_index() seems like a straightforward fix to the non-consecutive index issue that should be fairly inexpensive, so if I don't have time in the near future for something better (which is unfortunately likely), I'll at least add that.

genzgd avatar Apr 09 '24 08:04 genzgd

I did add a limited test to test small blocks, using the ClickHouse setting max_block_size, and my observations are:

  • The change in 0.7.2 actually does improve performance by about 40%
  • @georgipeev's suggested change of yield pd.DataFrame(dict(zip(self.column_names, [chain(*b) for b in zip(*block_gen)]))) actually significantly reduces performance
  • The index on the DataFrame under 0.7.7 looks contiguous/consecutive.

So to keep this open, I'll need some more information from @georgipeev -- in particular some kind of reproducible dataset.

@cwegener If you could open a new issue with a reproducible example of the broken behavior you're seeing, I'd definitely like to understand what's going on.

For reference, I've been testing under Python 3.11, Pandas 2.2.1, and using the following query to gauge rough performance numbers:

client.query_df('SELECT number, randomString(512) FROM numbers(5000000)',
                settings={'max_block_size': 50})

That takes a little over 18.5 seconds on my Mac, while the default max_block_size takes about 11.5 seconds.

genzgd avatar Apr 09 '24 20:04 genzgd

If you could open a new issue with a reproducible example of the broken behavior you're seeing, I'd definitely like to understand what's going on.

I'll test and report results in a new issue.

cwegener avatar Apr 09 '24 21:04 cwegener

I'm sorry, but I don't think I would be able to generate a sanitized set that reproduces the problem. Regarding the schema, the table is partitioned by day, has 6 additional order by columns, and about 50 other columns. As I mentioned earlier, the where clause that results in a heavily fragmented result selects a single partition and has a filter on just one more field from the order by list (which one seems to not be important).

georgipeev avatar Apr 13 '24 17:04 georgipeev