
Use compression in insert_arrow, with example ArrowStream

chriscomeau79 opened this issue 2 years ago · 3 comments

I noticed insert_arrow is about 10x slower than this equivalent code, which does the insert over HTTP using zstandard compression. Maybe something else is going on, but from looking at network usage on my desktop, I think it's because insert_arrow is sending uncompressed data.

Try something like this as an example. This method uses Arrow to do the compression, which is nice and fast.

import pyarrow as pa
import requests
import orjson

def insert_arrow_http(arrow_table_to_insert, ...):
    if arrow_table_to_insert.shape[0] == 0:
        print("arrow_table_to_insert is empty - skipped")
        return

    ch_host = ...
    ch_http_port = ...
    ch_user = ...
    ch_password = ...
    ch_url = f"http://{ch_host}:{ch_http_port}"

    # Serialize the table as an Arrow IPC file with zstd-compressed buffers
    sink = pa.BufferOutputStream()
    arrow_file_writer = pa.ipc.new_file(
        sink,
        arrow_table_to_insert.schema,
        options=pa.ipc.IpcWriteOptions(compression=pa.Codec(compression='zstd', compression_level=3))
    )
    arrow_file_writer.write_table(arrow_table_to_insert)
    arrow_file_writer.close()
    
    compressed_bytes = sink.getvalue()
    compressed_bytes_length = len(compressed_bytes)
    
    # The compression is embedded in the Arrow IPC buffers, so no Content-Encoding
    # header is needed here - ClickHouse's Arrow reader decompresses them itself
    response = requests.post(
        url=ch_url,
        auth=requests.auth.HTTPBasicAuth(ch_user, ch_password),
        params={
            "query": "insert into the_db.the_table format Arrow",
        },
        data=compressed_bytes
    )

    if "X-ClickHouse-Exception-Code" in response.headers:
        error_string = f"X-ClickHouse-Exception-Code {response.headers['X-ClickHouse-Exception-Code']}"
        raise RuntimeError(error_string)
    
    ch_summary = orjson.loads(response.headers['X-ClickHouse-Summary'])
    written_rows = int(ch_summary['written_rows'])
    elapsed_s = float(ch_summary['elapsed_ns']) / 1000000000
    print(f"{written_rows:,} rows in {elapsed_s:.3f}s @ {written_rows / elapsed_s:,.0f} rows/sec, {compressed_bytes_length / written_rows:,.1f} compressed bytes/row")

Something similar works for inserting using ArrowStream, with generators that yield RecordBatches. For ArrowStream, the payload needs to be an IPC-serialized schema followed by IPC-serialized data batches.

def arrow_stream_generic_generator(schema, arrow_stream_reader, codec="zstd"):
    # Yield the IPC-serialized schema first, then each IPC-serialized RecordBatch,
    # compressing every chunk so the upload can use Content-Encoding: zstd
    uncompressed = schema.serialize()
    compressed = pa.compress(uncompressed, codec=codec)
    yield compressed
    for chunk in arrow_stream_reader:
        uncompressed = chunk.serialize()
        compressed = pa.compress(uncompressed, codec=codec)
        yield compressed

Here's an example using that with DuckDB to read from a folder of Parquet files (could be CSV, whatever) and insert using ArrowStream.

DuckDB has fetch_record_batch, which returns a reader that yields Arrow record batches.

import duckdb
import requests
from requests.auth import HTTPBasicAuth

ddb = duckdb.connect()
sql = "SELECT * FROM 'test_file.parquet'"
arrow_stream_reader = ddb.execute(sql).fetch_record_batch(rows_per_batch=100000)

response = requests.post(
    url=ch_url,
    auth=HTTPBasicAuth(ch_user, ch_password),
    # Don't use send_progress_in_http_headers here - breaks with long-running inserts (too many headers)
    params={
        "enable_http_compression": "1",
        "query": "insert into the_db.the_table format ArrowStream",
    },
    headers={"content-encoding": "zstd"},
    data=arrow_stream_generic_generator(
        arrow_stream_reader.schema, arrow_stream_reader
    ),
)

It should be possible to do ArrowStream in the same style as the first Arrow example, with pa.ipc.new_stream used like pa.ipc.new_file; I just couldn't figure out how to wire it up correctly to do this kind of lower-footprint streaming pipeline with Requests.
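A rough, unverified sketch of one possible wiring: hand pa.ipc.new_stream a small file-like collector object and wrap it in a generator that Requests can send as a chunked upload. ChunkCollector and arrow_stream_chunked_generator are just illustrative names, and ch_url / ch_user / ch_password / arrow_stream_reader are the same placeholders as in the examples above.

import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth

class ChunkCollector:
    # Minimal writable sink: pyarrow's IPC writer calls write(), we buffer the bytes
    def __init__(self):
        self.chunks = []
        self.closed = False
    def write(self, data):
        self.chunks.append(bytes(data))
        return len(data)
    def flush(self):
        pass
    def close(self):
        self.closed = True
    def drain(self):
        out = b"".join(self.chunks)
        self.chunks.clear()
        return out

def arrow_stream_chunked_generator(arrow_stream_reader, codec="zstd", level=3):
    # Build a single Arrow IPC stream incrementally and yield its bytes batch by batch,
    # so Requests can send everything in one chunked-transfer request
    sink = ChunkCollector()
    options = pa.ipc.IpcWriteOptions(compression=pa.Codec(compression=codec, compression_level=level))
    writer = pa.ipc.new_stream(pa.PythonFile(sink, mode="w"), arrow_stream_reader.schema, options=options)
    for batch in arrow_stream_reader:
        writer.write_batch(batch)
        data = sink.drain()
        if data:  # skip empty chunks (the schema may be flushed lazily)
            yield data
    writer.close()  # appends the end-of-stream marker
    data = sink.drain()
    if data:
        yield data

response = requests.post(
    url=ch_url,
    auth=HTTPBasicAuth(ch_user, ch_password),
    params={"query": "insert into the_db.the_table format ArrowStream"},
    data=arrow_stream_chunked_generator(arrow_stream_reader),
)

No content-encoding header is needed in this variant, since the compression lives inside the Arrow IPC buffers rather than at the HTTP layer.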

arrow-odbc is another example of a package with support for streaming results as Arrow batches (which can then be used with the arrow_stream_generic_generator above for similar streaming inserts from a streaming source). It might be helpful to look at if you'd like to add support for a read_arrow_batches in clickhouse_connect.

chriscomeau79 · Nov 08 '23 19:11

Thanks for the analysis! Adding Arrow stream support has been discussed in other issues, but having more code examples is much appreciated. I like the idea of using the Arrow compression as well, since that's tricky to implement otherwise. Of course, if you do get this working with clickhouse_connect, a PR would be awesome.

genzgd · Nov 08 '23 19:11

@chriscomeau79 have you observed a similar problem when fetching data through the client in "arrow" format, or is the library's implementation of fetching data as Arrow good enough? (talking about the Python client)

std-python-dev · Dec 18 '23 11:12

@chriscomeau79

It should be possible to do ArrowStream in the same style as the first Arrow example, with pa.ipc.new_stream used like pa.ipc.new_file; I just couldn't figure out how to wire it up correctly to do this kind of lower-footprint streaming pipeline with Requests.

Yes. It's possible:

import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth


def send_batch(batch: pa.RecordBatch) -> None:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(
            sink,
            batch.schema,
            options=pa.ipc.IpcWriteOptions(compression=pa.Codec(compression='zstd', compression_level=3)),
    ) as writer:
        writer.write(batch)
    # the with-block closes the writer, which appends the end-of-stream marker
    response = requests.post(
        url="http://localhost:8123",
        auth=HTTPBasicAuth("default", ""),
        params={
            "query": "insert into default.table1 format ArrowStream",
        },
        data=sink.getvalue()
    )
    if "X-ClickHouse-Exception-Code" in response.headers:
        error_string = f"X-ClickHouse-Exception-Code {response.headers['X-ClickHouse-Exception-Code']}"
        raise RuntimeError(error_string)

There's no point in serializing the whole Arrow file format when you have a RecordBatch; it's meant to be written batch by batch, one HTTP request per batch. You can always write multiple batches and then send them to ClickHouse too.
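For the multi-batch variant, here's a minimal sketch along the same lines (same localhost connection and default.table1 as above):

import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth


def send_batches(batches: list[pa.RecordBatch], schema: pa.Schema) -> None:
    # Write several RecordBatches into one compressed IPC stream, then POST it once
    sink = pa.BufferOutputStream()
    options = pa.ipc.IpcWriteOptions(compression=pa.Codec(compression='zstd', compression_level=3))
    with pa.ipc.new_stream(sink, schema, options=options) as writer:
        for batch in batches:
            writer.write_batch(batch)
    response = requests.post(
        url="http://localhost:8123",
        auth=HTTPBasicAuth("default", ""),
        params={"query": "insert into default.table1 format ArrowStream"},
        data=sink.getvalue().to_pybytes(),
    )
    if "X-ClickHouse-Exception-Code" in response.headers:
        raise RuntimeError(f"X-ClickHouse-Exception-Code {response.headers['X-ClickHouse-Exception-Code']}")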

pkit · Jun 12 '24 21:06