[Bug] Spark is unable to write to Doris hosted on K8s (compute-storage decoupled mode)
Search before asking
- [x] I had searched in the issues and found no similar issues.
Version
25.1.0
What's Wrong?
I am running Doris on K8s and want to write to it using Spark, but I am getting the exception below:
java.net.UnknownHostException: doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local
at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:988)
I believe this is happening because Spark contacts the FE to get a list of backend/compute nodes and, once it has obtained the IP/port of the compute nodes, contacts the compute groups directly.
The issue is that in Kubernetes, when compute groups spin up, they register themselves with the FE nodes using their internal FQDNs, which are not reachable from outside the cluster. A plain resolution check from the Spark driver host reproduces the failure, as in the sketch below.
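A minimal check (sketch; the FQDN is the one registered in show backends, shown further down), run from the Spark driver host outside the cluster:
import java.net.{InetAddress, UnknownHostException}

// The FQDN the compute node registered with the FE
val beHost = "doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local"
try {
  // Resolves fine inside the cluster; from the Spark driver host it throws
  println(InetAddress.getByName(beHost).getHostAddress)
} catch {
  case e: UnknownHostException =>
    println(s"cannot resolve $beHost: $e") // the same failure the connector hits
}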
mysql> show backends; -- key columns only
         BackendId: 1750574093851
              Host: doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local
     HeartbeatPort: 9050
            BePort: 9060
          HttpPort: 8040
          BrpcPort: 8060
ArrowFlightSqlPort: -1
     LastStartTime: 2025-06-22 06:35:21
     LastHeartbeat: 2025-06-22 06:41:07
             Alive: true
         TabletNum: 33
               Tag: {"cloud_unique_id" : "1:1030729153:dnDaSeQC", "compute_group_status" : "NORMAL", "private_endpoint" : "", "compute_group_name" : "cg1", "location" : "default", "public_endpoint" : "", "compute_group_id" : "xdReRUSR"}
           Version: doris-3.0.5-rc01-e277cfb83f
          NodeRole: mix
What You Expected?
Just as there are configuration options for doris.fenodes and doris.query.port, there should be a similar option for the backend/compute nodes.
Then, on the K8s side, the backend/compute nodes could be exposed via an ingress to solve this issue.
How to Reproduce?
Create a table in Doris
CREATE TABLE IF NOT EXISTS demo.user_data (
user_id INT,
name STRING,
age INT,
update_time DATETIME
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
);
Launch Spark shell
spark-shell --packages org.apache.doris:spark-doris-connector-spark-3.5:25.1.0,mysql:mysql-connector-java:8.0.33
Code:
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.SaveMode
val feNodes = "<fe_host>:8030" // FE HTTP address (placeholder)

// Sample batch data
val batchData = Seq(
(1, "Alice", 30, "2024-01-01 10:00:00"),
(2, "Bob", 25, "2024-01-01 11:00:00")
)
val df = batchData.toDF("user_id", "name", "age", "update_time")
df.write.format("doris")
  .option("doris.table.identifier", "demo.user_data")
  .option("doris.fenodes", feNodes)
  .option("user", "root")
  .option("password", "")
  .option("doris.write.fields", "user_id,name,age,update_time")
  .mode(SaveMode.Overwrite)
  .save()
Anything Else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Hello, I am still stuck on this issue. Any leads would be appreciated.
Hello, currently StreamLoad redirects writes to the BE server, requiring the client to have access to the BE server host. For more information, see https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual#basic-principles. So this requires configuring DNS on the client side and ensuring network connectivity with the BE server.
In the upcoming 3.1.0 release, you can configure StreamLoad to return to a public BE IP address. See this pull request: https://github.com/apache/doris/pull/53104
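To make the redirect visible, here is a minimal sketch (placeholder FE host; root user with an empty password, as in the repro above) that captures the Location header instead of following it:
import java.net.{HttpURLConnection, URL}
import java.util.Base64

val conn = new URL("http://<fe_host>:8030/api/demo/user_data/_stream_load")
  .openConnection().asInstanceOf[HttpURLConnection]
conn.setInstanceFollowRedirects(false) // keep the 307 visible instead of chasing it
conn.setRequestMethod("PUT")
conn.setDoOutput(true)
val auth = Base64.getEncoder.encodeToString("root:".getBytes("UTF-8"))
conn.setRequestProperty("Authorization", s"Basic $auth")
conn.getOutputStream.close() // empty body; we only care about the redirect
println(s"HTTP ${conn.getResponseCode}")                 // typically 307 Temporary Redirect
println(s"Location: ${conn.getHeaderField("Location")}") // the BE host the client must be able to reach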
Hello @JNSimba, the issue is that I am running Doris on K8s, so the frontend returns internal Doris backend addresses like doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local.
So Spark fails with errors like -
Suppressed: org.apache.doris.spark.exception.ConnectedFailedException: Connect to Backend{host='doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local', httpPort=-1, rpcPort=9060}failed.
How can the client resolve that?
Note: my take on this is to resolve this behaviour by providing a way to override the BE nodes, something like the following:
val orderItemsDf = spark.read.format("doris")
  .option("doris.table.identifier", "demos.order_items")
  .option("doris.fenodes", feNodes)
  .option("doris.benodes", <provide_k8s_loadbalancer_url/ip>) // Proposed parameter
  .option("user", dorisUser)
  .option("password", dorisPass)
  .load()
This load balancer IP/URL is the same one provided as public_host in be.conf.
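That is, roughly this line in be.conf (placeholder value):
# be.conf: externally reachable address advertised for this BE (placeholder)
public_host = <k8s_loadbalancer_url_or_ip>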
- Is doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local:9060 accessible on the Spark side?
- If not, it needs to be read via Arrow Flight. Refer to this link for Spark connector configuration: https://doris.apache.org/docs/dev/ecosystem/spark-doris-connector#reading-via-arrow-flight-sql. Also, fe.conf needs to be configured with public_host={nginx ip} and arrow_flight_sql_proxy_port={nginx port}. See https://doris.apache.org/docs/dev/db-connect/arrow-flight-sql-connect#multiple-bes-share-the-same-ip-accessible-from-outside-the-cluster.
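On the connector side, a read via Arrow Flight might look like this sketch (placeholder endpoints; the port value must match your Arrow Flight configuration):
val arrowDf = spark.read.format("doris")
  .option("doris.table.identifier", "demos.order_items")
  .option("doris.fenodes", "<nginx_ip>:8030")
  .option("user", "root")
  .option("password", "")
  .option("doris.read.mode", "arrow")                                // read via Arrow Flight SQL instead of Thrift
  .option("doris.read.arrow-flight-sql.port", "<arrow_flight_port>") // the FE Arrow Flight SQL (proxy) port
  .load()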
- Is doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local:9060 accessible on the Spark side? - No, because Spark is outside K8s.
- I feel that is not the issue @JNSimba. I think the issue is that the BE nodes in compute groups register themselves with the FE on launch using their internal K8s FQDNs, and when the Spark connector queries the FE for the BE addresses, that is what gets returned. Note that I have already set public_host=<my_azure_lb>, and it does not work for this connector; it works perfectly when I use the flight-sql-jdbc-core library.
As I said earlier, this issue can be solved by adding a connector config option to specify the BE addresses, like .option("doris.benodes", <provide_k8s_loadbalancer_url/ip>), so that instead of fetching the BE addresses from the FE, the connector can simply use the given BE address directly (which can be the public_host={nginx ip}).
@Nj-kol There are two issues here: reading and writing.
- For writing, configuring the external BE IP address in doris.benodes will solve the problem; this parameter is currently supported.
- For reading, as shown in the error message when accessing BE:9060, you need to switch to Arrow Flight for reading. The default Thrift method connects directly to BE:9060, but Arrow Flight provides an externally accessible path, requiring the configuration of public_host={nginx ip} and arrow_flight_sql_proxy_port={nginx port}.
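In fe.conf, that amounts to something like the following (placeholder values):
# fe.conf: externally reachable address and Arrow Flight SQL proxy port (placeholders)
public_host = {nginx ip}
arrow_flight_sql_proxy_port = {nginx port}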
@JNSimba I tried out your suggestions, and here are my findings -
For reading, after switching to Arrow Flight and setting public_host={load balancer ip}, it worked! So thanks for that : )
But the issue with writing still persists; it seems that the parameter doris.benodes has no effect. Here is what I tried -
Create a table in Doris:
CREATE TABLE IF NOT EXISTS demos.user_data (
user_id INT,
name STRING,
age INT,
update_time STRING
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
);
Launch shell -
spark-shell \
  --packages org.apache.doris:spark-doris-connector-spark-3.5:25.2.0 \
  --jars $HOME/Softwares/jars/mysql-connector-j-9.3.0.jar
Code -
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.SaveMode
val feNodes = "<load_balancer_ip>:8030"
val dorisUser = "root"
val dorisPass = ""
val beNodes = "<load_balancer_ip>:8040"
// Sample batch data
val batchData = Seq(
(1, "Alice", 30, "2024-01-01 10:00:00"),
(2, "Bob", 25, "2024-01-01 11:00:00")
)
val df = batchData.toDF("user_id", "name", "age", "update_time")
df.write.format("doris")
  .option("doris.table.identifier", "demos.user_data")
  .option("doris.fenodes", feNodes)
  .option("doris.benodes", beNodes)
  .option("user", dorisUser)
  .option("password", dorisPass)
  .option("doris.query.port", "9030")
  .option("doris.write.fields", "user_id,name,age,update_time")
  .option("doris.read.mode", "arrow")
  .option("doris.read.arrow-flight-sql.port", "8070")
  .mode(SaveMode.Overwrite)
  .save()
Error -
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.0.142 executor driver): java.net.UnknownHostException: doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1.doris.svc.cluster.local
As you can see, the BE address being used is still fetched from the FE as doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1.doris.svc.cluster.local and is not taken from doris.benodes.
@Nj-kol When writing, you also need to add this parameter: doris.sink.auto-redirect=false, so that the connector won't get the BE's address through the FE. Here's a case you can refer to: https://github.com/apache/doris-spark-connector/blob/master/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala#L102
@JNSimba I tried your recommended settings, but it did not work.
As you can see, it still fetches the BE endpoints from the FE (which are K8s FQDNs).
@Nj-kol The spelling is incorrect; it should be doris.sink.auto-redirect=false.
@JNSimba Thanks for pointing that out. I tried this, and it worked fine. I think it would be good if the documentation were updated to capture this behaviour.
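For reference, the full write configuration that worked for me looks like the following sketch (placeholder endpoints, same table and data as above):
import spark.implicits._
import org.apache.spark.sql.SaveMode

val df = Seq(
  (1, "Alice", 30, "2024-01-01 10:00:00"),
  (2, "Bob", 25, "2024-01-01 11:00:00")
).toDF("user_id", "name", "age", "update_time")

df.write.format("doris")
  .option("doris.table.identifier", "demos.user_data")
  .option("doris.fenodes", "<load_balancer_ip>:8030")
  .option("doris.benodes", "<load_balancer_ip>:8040") // external BE HTTP endpoint
  .option("doris.sink.auto-redirect", "false")        // use doris.benodes instead of FE-returned BE hosts
  .option("user", "root")
  .option("password", "")
  .option("doris.write.fields", "user_id,name,age,update_time")
  .mode(SaveMode.Overwrite)
  .save()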
One thing, though: I get this intermittent error:
org.apache.doris.spark.exception.StreamLoadException: stream load failed, txnId: -1, status: Fail, msg: [E-240]Have not get FE Master heartbeat yet
0# doris::StreamLoadExecutor::begin_txn(doris::StreamLoadContext*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:392
1# doris::StreamLoadAction::_on_header(doris::HttpRequest*, std::shared_ptr<doris::StreamLoadContext>) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:708
2# doris::StreamLoadAction::on_header(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:392
3# doris::EvHttpServer::on_header(evhttp_request*) at /home/zcp/repo_center/doris_release/doris/be/src/http/ev_http_server.cpp:255
4# ?
5# bufferevent_run_readcb_ at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/bufferevent.c:123
6# bufferevent_readcb at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/bufferevent_sock.c:227
7# event_process_active_single_queue at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/event.c:1624
8# event_process_active at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/event.c:1790