Perf issues between HTTP and FlightRPC (v3)
I'm still experiencing performance differences between the CLI (which calls the HTTP API under the hood) and FlightRPC with the pyarrow client.
I'm going to try to create a minimal example for this.
I'm using the v3.0.0 Enterprise build, with compaction enabled.
In the meantime, here are the logs for the same SQL query on both paths:
Using the CLI: 32 ms
influxdb3 | SELECT DISTINCT ON (tid) lat, lon, tid, battery, altitude, time
influxdb3 | FROM telemetry
influxdb3 | WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
influxdb3 | ORDER BY tid, time DESC;
influxdb3 | query_params=Params { } issue_time=2025-04-20T17:52:56.673480632+00:00 partitions=8 parquet_files=16 deduplicated_partitions=3 deduplicated_parquet_files=11 plan_duration_secs=0.031543458 permit_duration_secs=0.000114208 execute_duration_secs=0.000655875 end2end_duration_secs=0.032463916 compute_duration_secs=0.00020381 max_memory=0 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
Using FlightRPC: 617 ms
influxdb3 | SELECT DISTINCT ON (tid) lat, lon, tid, battery, altitude, time
influxdb3 | FROM telemetry
influxdb3 | WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
influxdb3 | ORDER BY tid, time DESC;
influxdb3 | query_params=Params { } issue_time=2025-04-20T17:53:48.931954711+00:00 partitions=8 parquet_files=16 deduplicated_partitions=3 deduplicated_parquet_files=11 plan_duration_secs=0.060829459 permit_duration_secs=0.001200208 execute_duration_secs=0.555666209 end2end_duration_secs=0.617759376 compute_duration_secs=0.095443782 max_memory=25699024 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
Expected behaviour: the same performance between the two paths
Actual behaviour: the FlightRPC path is significantly slower (617 ms vs 32 ms end-to-end for the query above)
Environment info:
- System info: Darwin 24.3.0 arm64
- I'm running influxdb3 using Docker with a volume; the object store is set up as file
Config:
- INFLUXDB3_OBJECT_STORE=file
- INFLUXDB3_DB_DIR=/var/lib/influxdb3
- INFLUXDB3_ENTERPRISE_LICENSE_EMAIL=
Hey @jules-ch, would you happen to have a minimal repro of how you're using the pyarrow client? Even just a small Python script to execute would help. That way we could make an equivalent Rust one to help rule out some possible perf bottlenecks.
Yeah, I'll try to have one soon. Is it possible to call the compactor manually, or would past data spread over multiple periods be compacted to parquet? I'm asking just for reproducibility.
OK, here is a minimal example:
services:
influxdb3:
image: quay.io/influxdb/influxdb3-enterprise:5aed0d9d2cb1cba948f77869ff506230b2d0d668
container_name: influxdb3
volumes:
- ./influxdb3/:/var/lib/influxdb3/
environment:
- INFLUXDB3_OBJECT_STORE=file
- INFLUXDB3_DB_DIR=/var/lib/influxdb3
- INFLUXDB3_ENTERPRISE_LICENSE_EMAIL=<YOUREMAIL>
- INFLUXDB3_ENTERPRISE_MODE=all
- INFLUXDB3_HTTP_BIND_ADDR=0.0.0.0:8181
- INFLUXDB3_MAX_HTTP_REQUEST_SIZE=20971520
- LOG_FILTER=info
- INFLUXDB3_WAL_FLUSH_INTERVAL=1000ms
- INFLUXDB3_NODE_IDENTIFIER_PREFIX=node-1
- INFLUXDB3_ENTERPRISE_CLUSTER_ID=cluster-1
- INFLUXDB3_WAL_SNAPSHOT_SIZE=10
expose:
- 8181
ports:
- 8181:8181
healthcheck:
test: ["CMD-SHELL", "bash -c ':> /dev/tcp/127.0.0.1/8181' || exit 1"]
interval: 5s
retries: 5
start_period: 30s
timeout: 10s
I reduced WAL_SNAPSHOT_SIZE to trigger compaction when loading data with the script.
Load data for 1000 devices with 120 minutes of data; the sample rate is one point per second.
from datetime import datetime, timedelta
import numpy as np
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options
TOKEN = "<INFLUXDB_TOKEN>"
DATABASE = "location"
tids = np.arange(0, 1000)
def generate_points():
end_time = datetime.now()
time_span = timedelta(minutes=120)
start_time = end_time - time_span
times = np.arange(
int(start_time.timestamp() * 1e9), int(end_time.timestamp() * 1e9), int(1e9)
)
n_points = int(time_span.total_seconds())
for tid in tids:
lats = np.random.uniform(-90.0, 90.0, (n_points,))
lons = np.random.uniform(-180, 180, (n_points,))
for lat, lon, time_ in zip(lats, lons, times, strict=True):
point = (
Point("telemetry")
.tag("tid", tid)
.field("lat", lat)
.field("lon", lon)
.time(time_)
)
yield point
BATCH_SIZE = 50000
class BatchingCallback:
def __init__(self):
self.write_count = 0
self.total = 0
def success(self, conf, data: str):
self.write_count += 1
count = len(data.splitlines())
self.total += count
print(f"Written batch: {conf}, data: {len(data.splitlines())} items")
print(f"Written {self.total} total")
callback = BatchingCallback()
write_options = WriteOptions(
batch_size=BATCH_SIZE,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2,
)
wco = write_client_options(
success_callback=callback.success,
write_options=write_options,
)
with InfluxDBClient3(
host="http://127.0.0.1:8181",
database=DATABASE,
token=TOKEN,
write_client_options=wco,
) as influxdb3_client:
influxdb3_client.write(generate_points())
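To sanity-check the load afterwards, here is a minimal sketch (assuming the same TOKEN/DATABASE as above and the client's query() method, which should return a pyarrow Table):
from influxdb_client_3 import InfluxDBClient3

TOKEN = "<INFLUXDB_TOKEN>"
DATABASE = "location"

with InfluxDBClient3(
    host="http://127.0.0.1:8181",
    database=DATABASE,
    token=TOKEN,
) as client:
    # 1000 devices * 7200 seconds of data -> roughly 7.2M rows expected
    table = client.query("SELECT COUNT(*) AS n FROM telemetry")
    print(table.to_pydict())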
Then query with both FlightRPC and the v3 HTTP API. I'm using aiohttp here so I don't wait on the full response body, but the InfluxDB server logs are the way to truly compare end2end_duration.
Use docker compose logs influxdb3 -n0 -f | grep end2end_duration to compare durations between calls.
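If you prefer to tabulate the durations rather than eyeball the grep output, a small sketch that pulls end2end_duration_secs out of the piped log lines (the script filename is just an example):
# Usage (filename is just an example):
#   docker compose logs influxdb3 -n0 -f | python parse_durations.py
import re
import sys

PATTERN = re.compile(r"end2end_duration_secs=([0-9.e+-]+)")

for line in sys.stdin:
    match = PATTERN.search(line)
    if match:
        print(f"{float(match.group(1)) * 1000:.2f} ms")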
import asyncio
import json
import time
import aiohttp
from pyarrow.flight import FlightCallOptions, FlightClient, Ticket
TOKEN = "<INFLUXDB_TOKEN>"
flight_client = FlightClient(location="grpc+tcp://127.0.0.1:8181")
token = (b"authorization", f"Bearer {TOKEN}".encode("utf-8"))
query_1 = """
--sql
SELECT DISTINCT ON (tid) lat, lon, tid, time
FROM telemetry
WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
ORDER BY tid ASC, time DESC;
"""
query_2 = """
--sql
SELECT lat, lon, tid, time
FROM telemetry
ORDER BY time DESC
LIMIT 200
"""
query_3 = """
--sql
SELECT AVG(lat)
FROM telemetry
GROUP BY tid
"""
query_4 = """
--sql
SELECT
DATE_BIN(INTERVAL '10 minutes', time) AS time,
tid,
selector_max(lat, time)['value'] AS 'max lat',
selector_min(lat, time)['value'] AS 'min lat',
avg(lat) AS 'average lat'
FROM telemetry
GROUP BY 1, tid
ORDER BY tid, 1
"""
def flight_rpc_query(query_sql: str, flight_client: FlightClient):
ticket_data = {
"database": "location",
"sql_query": query_sql,
"query_type": "sql",
}
ticket = Ticket(json.dumps(ticket_data).encode("utf-8"))
options = FlightCallOptions(timeout=10, headers=[token])
start_time = time.perf_counter()
reader = flight_client.do_get(ticket, options)
table = reader.read_all()
print(time.perf_counter() - start_time, "s")
async def v3_http_call(query: str):
influxdb_url = "http://127.0.0.1:8181"
headers = {
"Authorization": f"Bearer {TOKEN}",
"accept": "application/json",
"Content-Type": "application/json",
}
content = json.dumps(
{
"db": "location",
"q": query,
"format": "json",
},
)
async with aiohttp.ClientSession(headers=headers) as session:
start_time = time.perf_counter()
async with session.post(
influxdb_url + "/api/v3/query_sql", data=content
) as response:
print("Status:", response.status)
print(time.perf_counter() - start_time, "s")
response = await response.json()
queries = [query_1, query_2, query_3, query_4]
for query in queries:
print("\nSQL query: ", query)
print("Calling FlightRPC\n-----------------")
flight_rpc_query(query, flight_client)
print("Calling HTTP V3\n-----------------")
asyncio.run(v3_http_call(query))
SQL query:
--sql
SELECT DISTINCT ON (tid) lat, lon, tid, time
FROM telemetry
WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
ORDER BY tid ASC, time DESC;
Calling FlightRPC
-----------------
0.05812533299831557 s
Calling HTTP V3
-----------------
Status: 200
0.030552708001778228 s
SQL query:
--sql
SELECT lat, lon, tid, time
FROM telemetry
ORDER BY time DESC
LIMIT 200
Calling FlightRPC
-----------------
1.0316940000011527 s
Calling HTTP V3
-----------------
Status: 200
0.010956583999359282 s
SQL query:
--sql
SELECT AVG(lat)
FROM telemetry
GROUP BY tid
Calling FlightRPC
-----------------
0.9687469999989844 s
Calling HTTP V3
-----------------
Status: 200
0.0137646670009417 s
SQL query:
--sql
SELECT
DATE_BIN(INTERVAL '10 minutes', time) AS time,
tid,
selector_max(lat, time)['value'] AS 'max lat',
selector_min(lat, time)['value'] AS 'min lat',
avg(lat) AS 'average lat'
FROM telemetry
GROUP BY 1, tid
ORDER BY tid, 1
Calling FlightRPC
-----------------
1.0136529580013303 s
Calling HTTP V3
-----------------
Status: 200
0.010635374997946201 s
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:23.012372595+00:00 partitions=8 parquet_files=30 deduplicated_partitions=6 deduplicated_parquet_files=28 plan_duration_secs=0.011326125 permit_duration_secs=0.000628417 execute_duration_secs=0.040893375 end2end_duration_secs=0.05298875 compute_duration_secs=0.050799921 max_memory=42873040 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:23.067608428+00:00 partitions=8 parquet_files=30 deduplicated_partitions=6 deduplicated_parquet_files=28 plan_duration_secs=0.008610792 permit_duration_secs=6.3917e-5 execute_duration_secs=0.000350458 end2end_duration_secs=0.009155084 compute_duration_secs=0.000436696 max_memory=0 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:23.126399512+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.006819791 permit_duration_secs=0.000935 execute_duration_secs=1.022507751 end2end_duration_secs=1.030344 compute_duration_secs=3.243217819 max_memory=1409936128 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:24.158995429+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.007156542 permit_duration_secs=5.7166e-5 execute_duration_secs=0.001554 end2end_duration_secs=0.008937625 compute_duration_secs=0.001594963 max_memory=2972808 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:25.166081888+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.006815625 permit_duration_secs=0.000289875 execute_duration_secs=0.960482625 end2end_duration_secs=0.967681 compute_duration_secs=3.563979224 max_memory=1256736474 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:26.135655222+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.0061555 permit_duration_secs=5.5875e-5 execute_duration_secs=0.000161916 end2end_duration_secs=0.006390375 compute_duration_secs=2.3629e-5 max_memory=0 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:26.933600930+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.008665917 permit_duration_secs=0.000477208 execute_duration_secs=0.993381945 end2end_duration_secs=1.002627861 compute_duration_secs=3.706687025 max_memory=1263053695 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3 | query_params=Params { } issue_time=2025-04-27T22:09:27.940562291+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.008619459 permit_duration_secs=4.7041e-5 execute_duration_secs=0.000263667 end2end_duration_secs=0.008
Especially on the aggregate queries, performance is 100x worse when using Flight.
@jules-ch - thanks for your detailed explanation of the setup. Would it be possible for you to run the same test against 3.2.1 (most recent release)?
I ran a quick test against 3.2.1 and I couldn't reproduce the difference: both HTTP and FlightSQL are roughly on par, and much of the time FlightSQL is quicker. I'm running in a non-containerized env on Linux though. I can switch to the exact 3.0.0 revision 5aed0d9d2cb1cba948f77869ff506230b2d0d668 running within Docker and try again, but it's probably beneficial to run against the most recent release anyway to see if you are still experiencing the perf difference.
@jules-ch - I've run your script (instead of issuing queries directly) and I've managed to reproduce the issue. I just wanted to clarify whether this is something you've done intentionally: in your HTTP call you seem to print the time as soon as you get the status, but without awaiting the JSON response.
async def v3_http_call(query: str):
influxdb_url = "http://127.0.0.1:8181"
headers = {
"Authorization": f"Bearer {TOKEN}",
"accept": "application/json",
"Content-Type": "application/json",
}
content = json.dumps(
{
"db": "location",
"q": query,
"format": "json",
},
)
async with aiohttp.ClientSession(headers=headers) as session:
start_time = time.perf_counter()
async with session.post(
influxdb_url + "/api/v3/query_sql", data=content
) as response:
print("Status:", response.status)
print(time.perf_counter() - start_time, "s") # <---- this print seems to be before you get the json body
response = await response.json()
When you compare that with the Flight call, you're measuring the time after table = reader.read_all(). I'm not sure it's a fair comparison if one is measured after reading all the data and the other before reading the response body. If I move the timer print in the v3_http_call function to after the response has been awaited (sketched below), I get the results that follow:
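A minimal sketch of the adjusted function (identical to the original apart from where the elapsed time is printed):
import asyncio
import json
import time

import aiohttp

TOKEN = "<INFLUXDB_TOKEN>"


async def v3_http_call(query: str):
    influxdb_url = "http://127.0.0.1:8181"
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "accept": "application/json",
        "Content-Type": "application/json",
    }
    content = json.dumps({"db": "location", "q": query, "format": "json"})
    async with aiohttp.ClientSession(headers=headers) as session:
        start_time = time.perf_counter()
        async with session.post(
            influxdb_url + "/api/v3/query_sql", data=content
        ) as response:
            print("Status:", response.status)
            body = await response.json()  # read and decode the full body first
            print(time.perf_counter() - start_time, "s")  # <-- timer now after the body


# e.g. asyncio.run(v3_http_call(query_1))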
SQL query:
--sql
SELECT DISTINCT ON (tid) lat, lon, tid, time
FROM telemetry
WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
ORDER BY tid ASC, time DESC;
Calling FlightRPC
-----------------
0.2590100329834968 s
Calling HTTP V3
-----------------
Status: 200
0.0907010689843446 s
SQL query:
--sql
SELECT lat, lon, tid, time
FROM telemetry
ORDER BY time DESC
LIMIT 200
Calling FlightRPC
-----------------
0.01946758001577109 s
Calling HTTP V3
-----------------
Status: 200
0.020495977951213717 s
SQL query:
--sql
SELECT AVG(lat)
FROM telemetry
GROUP BY tid
Calling FlightRPC
-----------------
3.2022568699903786 s
Calling HTTP V3
-----------------
Status: 200
6.923285491997376 s
SQL query:
--sql
SELECT
DATE_BIN(INTERVAL '10 minutes', time) AS time,
tid,
selector_max(lat, time)['value'] AS 'max lat',
selector_min(lat, time)['value'] AS 'min lat',
avg(lat) AS 'average lat'
FROM telemetry
GROUP BY 1, tid
ORDER BY tid, 1
Calling FlightRPC
-----------------
4.450999940978363 s
Calling HTTP V3
-----------------
Status: 200
5.365592924063094 s
Now the first query is slightly slower than the HTTP response (which I'm looking into atm), but flight seems to be quicker for all other queries. I'll continue to investigate to see if there are any other issues.
@praveen-influx, a small off-topic question: does the Arrow Flight server split the response into chunks (multiple record batches) and send it through a stream chunk by chunk, or does it simply put the whole data into a single batch and send it right away?
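For reference, one way to observe this from the client side would be to consume the stream chunk by chunk instead of read_all(); a sketch using pyarrow's read_chunk(), with the same ticket construction as the script above:
import json

from pyarrow.flight import FlightCallOptions, FlightClient, Ticket

TOKEN = "<INFLUXDB_TOKEN>"
client = FlightClient(location="grpc+tcp://127.0.0.1:8181")
headers = [(b"authorization", f"Bearer {TOKEN}".encode("utf-8"))]

ticket = Ticket(json.dumps({
    "database": "location",
    "sql_query": "SELECT AVG(lat) FROM telemetry GROUP BY tid",
    "query_type": "sql",
}).encode("utf-8"))

reader = client.do_get(ticket, FlightCallOptions(timeout=10, headers=headers))
batches = rows = 0
while True:
    try:
        chunk = reader.read_chunk()  # FlightStreamChunk; .data is a RecordBatch
    except StopIteration:
        break
    if chunk.data is not None:
        batches += 1
        rows += chunk.data.num_rows
print(f"{batches} record batches, {rows} rows")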
I was comparing the end2end_duration_secs in the InfluxDB server logs; the compute step is higher in the FlightRPC case.
I did the print before streaming on purpose, to get the time to first byte.
In the server logs you can see the compute duration is around 1e-5 s for the HTTP query and ~3 s for gRPC. But maybe it's not measuring the same things.
It's difficult to compare clearly though, since there is the download and decoding part when using HTTP.
I expected gRPC to have better performance, though.
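To get a Flight-side number closer to the HTTP time-to-status measurement, here is a sketch of a variant of flight_rpc_query that prints the elapsed time when the first record batch arrives and again after the full read (the _ttfb name is just illustrative):
import json
import time

from pyarrow.flight import FlightCallOptions, FlightClient, Ticket

TOKEN = "<INFLUXDB_TOKEN>"
token = (b"authorization", f"Bearer {TOKEN}".encode("utf-8"))


def flight_rpc_query_ttfb(query_sql: str, flight_client: FlightClient):
    # Same ticket construction as flight_rpc_query above.
    ticket = Ticket(json.dumps({
        "database": "location",
        "sql_query": query_sql,
        "query_type": "sql",
    }).encode("utf-8"))
    options = FlightCallOptions(timeout=10, headers=[token])
    start_time = time.perf_counter()
    reader = flight_client.do_get(ticket, options)
    first = reader.read_chunk()  # blocks until the first record batch arrives
    print("first batch:", time.perf_counter() - start_time, "s")
    table = reader.read_all()    # remaining batches, fully decoded
    print("full read:  ", time.perf_counter() - start_time, "s")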
@jules-ch btw, maybe it makes sense to configure keep-alive for your client and test a sequence of requests. Maybe this performance gap is TCP connection initialization?
From local testing (keep-alive configured):
Time elapsed: 344.2032ms
Time elapsed: 173.1554ms
Time elapsed: 172.2722ms
Time elapsed: 172.6376ms
Time elapsed: 172.3643ms
Time elapsed: 179.2679ms
Time elapsed: 172.8454ms
Time elapsed: 172.3532ms
Time elapsed: 173.8715ms
Time elapsed: 173.2267ms
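To test the same thing with the Python script above, a sketch that fires the same query several times over the already-created flight_client, so only the first call pays the gRPC channel setup:
# Warm-connection check: reuse the flight_client created above, so only the
# first call pays connection setup; later calls show steady-state latency.
for _ in range(10):
    flight_rpc_query(query_3, flight_client)
On the HTTP side, keeping a single aiohttp.ClientSession open across requests gives you connection reuse (keep-alive) as well.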