Use compression in insert_arrow, with example ArrowStream
I noticed insert_arrow is about 10x slower than this equivalent code, which does the insert over HTTP using zstandard compression. Maybe something else is going on, but from looking at network usage on my desktop, I think it's because insert_arrow is sending uncompressed data.
Try something like this as an example. This method uses Arrow to do the compression, which is nice and fast.
import pyarrow as pa
import requests
import orjson

def insert_arrow_http(arrow_table_to_insert, ...):
    if arrow_table_to_insert.shape[0] == 0:
        print("arrow_table_to_insert is empty - skipped")
        return
    ch_host = ...
    ch_http_port = ...
    ch_user = ...
    ch_password = ...
    ch_url = f"http://{ch_host}:{ch_http_port}"
    # Serialize the table in the Arrow IPC file format with zstd-compressed buffers
    sink = pa.BufferOutputStream()
    arrow_file_writer = pa.ipc.new_file(
        sink,
        arrow_table_to_insert.schema,
        options=pa.ipc.IpcWriteOptions(compression=pa.Codec(compression='zstd', compression_level=3))
    )
    arrow_file_writer.write_table(arrow_table_to_insert)
    arrow_file_writer.close()
    compressed_bytes = sink.getvalue()
    compressed_bytes_length = len(compressed_bytes)
    response = requests.post(
        url=ch_url,
        auth=requests.auth.HTTPBasicAuth(ch_user, ch_password),
        params={
            "query": "insert into the_db.the_table format Arrow",
        },
        data=compressed_bytes
    )
    if "X-ClickHouse-Exception-Code" in response.headers:
        error_string = f"""X-ClickHouse-Exception-Code {response.headers['X-ClickHouse-Exception-Code']}"""
        raise RuntimeError(error_string)
    # Report throughput from the summary header returned by the server
    ch_summary = orjson.loads(response.headers['X-ClickHouse-Summary'])
    written_rows = int(ch_summary['written_rows'])
    elapsed_s = float(ch_summary['elapsed_ns']) / 1000000000
    print(f"{written_rows:,} rows in {elapsed_s:.3f}s @ {written_rows/elapsed_s:,.0f} rows/sec, {compressed_bytes_length / written_rows:,.1f} compressed bytes/row")
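The throughput arithmetic at the end of that function can be pulled out and checked without a server. This is only a sketch: summarize_insert is a hypothetical helper name, it uses the standard library's json instead of orjson, and the sample header string below is made up for illustration (the written_rows and elapsed_ns keys are the ones read in the code above).

```python
import json


def summarize_insert(summary_header: str, payload_bytes: int) -> dict:
    """Turn an X-ClickHouse-Summary header value into throughput numbers.

    summarize_insert is a hypothetical helper, not part of any library.
    """
    summary = json.loads(summary_header)
    written_rows = int(summary["written_rows"])
    elapsed_s = int(summary["elapsed_ns"]) / 1_000_000_000
    return {
        "written_rows": written_rows,
        "elapsed_s": elapsed_s,
        # Guard against division by zero for empty or instantaneous inserts
        "rows_per_sec": written_rows / elapsed_s if elapsed_s > 0 else 0.0,
        "bytes_per_row": payload_bytes / written_rows if written_rows else 0.0,
    }


# Illustrative header value only, not captured from a real server
sample_header = '{"written_rows": "1000", "elapsed_ns": "500000000"}'
stats = summarize_insert(sample_header, payload_bytes=4000)
print(f"{stats['written_rows']:,} rows in {stats['elapsed_s']:.3f}s")
```

Keeping the parsing separate from the POST makes it easier to compare the compressed and uncompressed paths with the same metric.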
Something similar works for inserting using ArrowStream, with generators that yield RecordBatches. For ArrowStream, the payload needs to be an IPC-serialized schema followed by IPC-serialized data batches.
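import pyarrow as pa

def arrow_stream_generic_generator(schema, arrow_stream_reader, codec="zstd"):
    # First chunk: the IPC-serialized schema
    uncompressed = schema.serialize()
    # asbytes=True so that requests receives plain bytes for each chunk
    compressed = pa.compress(uncompressed, codec=codec, asbytes=True)
    yield compressed
    # Following chunks: one IPC-serialized record batch each
    for chunk in arrow_stream_reader:
        uncompressed = chunk.serialize()
        compressed = pa.compress(uncompressed, codec=codec, asbytes=True)
        yield compressed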
Then an example using that with DuckDB to read from a folder of parquet files (could be CSV, whatever) and insert using ArrowStream.
DuckDB has a fetch_record_batch which yields Arrow batches.
import duckdb
import requests
from requests.auth import HTTPBasicAuth

# ch_url, ch_user and ch_password are set up as in the first example
ddb = duckdb.connect()
sql = "SELECT * FROM 'test_file.parquet'"
arrow_stream_reader = ddb.execute(sql).fetch_record_batch(rows_per_batch=100000)
response = requests.post(
    url=ch_url,
    auth=HTTPBasicAuth(ch_user, ch_password),
    # Don't use send_progress_in_http_headers here - breaks with long-running inserts (too many headers)
    params={
        "enable_http_compression": "1",
        "query": "insert into the_db.the_table format ArrowStream",
    },
    headers={"content-encoding": "zstd"},
    data=arrow_stream_generic_generator(
        arrow_stream_reader.schema, arrow_stream_reader
    ),
)
It should be possible to do ArrowStream in the same style as the first Arrow example, with pa.ipc.new_stream used like pa.ipc.new_file, I just couldn't figure out how to wire it up correctly to do this kind of lower-footprint streaming pipeline with Requests.
arrow-odbc is another example of a package with support for streaming results as arrow batches (which can then be used with the arrow_stream_generic_generator above for similar streaming inserts from a streaming source). Might be helpful to look at if you'd like to add support for a read_arrow_batches in clickhouse_connect.
Thanks for the analysis! Adding arrow streams support has been discussed in other issues but having more code examples is much appreciated. I like the idea of using the Arrow compression as well, since that's tricky to implement otherwise. Of course if you do get this working with clickhouse_connect a PR would be awesome.
@chriscomeau79 have you observed a similar problem when fetching data from the client in "arrow" format? Or is the library's implementation of fetching data as Arrow good enough? (I'm talking about the Python client.)
@chriscomeau79
It should be possible to do ArrowStream in the same style as the first Arrow example, with pa.ipc.new_stream used like pa.ipc.new_file, I just couldn't figure out how to wire it up correctly to do this kind of lower-footprint streaming pipeline with Requests.
Yes. It's possible:
import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth

def send_batch(batch: pa.RecordBatch) -> None:
    sink = pa.BufferOutputStream()
    # The context manager closes the writer on exit, which finalizes the stream
    with pa.ipc.new_stream(
        sink,
        batch.schema,
        options=pa.ipc.IpcWriteOptions(compression=pa.Codec(compression='zstd', compression_level=3)),
    ) as writer:
        writer.write(batch)
    response = requests.post(
        url="http://localhost:8123",
        auth=HTTPBasicAuth("default", ""),
        params={
            "query": "insert into default.table1 format ArrowStream",
        },
        data=sink.getvalue()
    )
    if "X-ClickHouse-Exception-Code" in response.headers:
        error_string = f"X-ClickHouse-Exception-Code {response.headers['X-ClickHouse-Exception-Code']}"
        raise RuntimeError(error_string)
There's no point in serializing the whole Arrow file format when you have a RecordBatch. It's meant to be written batch by batch, one batch per HTTP request.
You can always write multiple batches into one stream and then send them to ClickHouse in a single request too.