
[Bug] Spark is unable to write to Doris hosted on K8s ( Compute Storage Decoupled mode )

Open Nj-kol opened this issue 6 months ago • 2 comments

Search before asking

  • [x] I had searched in the issues and found no similar issues.

Version

25.1.0

What's Wrong?

I am running Doris in K8s and want to write to it using Spark, but I get the exception below:

java.net.UnknownHostException: doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local
        at java.base/java.net.InetAddress$CachedLookup.get(InetAddress.java:988)

I believe this happens because Spark contacts the FE to get a list of backend/compute nodes, and once the IP/port of the compute nodes is obtained, it contacts the compute groups directly.

The issue is that in Kubernetes, when compute groups spin up, they register themselves with the FE nodes using their internal FQDNs, which are not reachable from outside the cluster.

mysql> show backends;
+---------------+-------------------------------------------------------------------------------------------+---------------+--------+----------+----------+--------------------+---------------------+---------------------+-------+----------------------+-----------+------------------+-------------------+---------------+---------------+---------+----------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------+----------+---------+
| BackendId     | Host                                                                                      | HeartbeatPort | BePort | HttpPort | BrpcPort | ArrowFlightSqlPort | LastStartTime       | LastHeartbeat       | Alive | SystemDecommissioned | TabletNum | DataUsedCapacity | TrashUsedCapacity | AvailCapacity | TotalCapacity | UsedPct | MaxDiskUsedPct | RemoteUsedCapacity | Tag                                                                                                                                                                                                                      | ErrMsg | Version                     | Status                                                                                                                                                                                      | HeartbeatFailureCounter | NodeRole | CpuCores | Memory  |
+---------------+-------------------------------------------------------------------------------------------+---------------+--------+----------+----------+--------------------+---------------------+---------------------+-------+----------------------+-----------+------------------+-------------------+---------------+---------------+---------+----------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------+----------+---------+
| 1750574093851 | doris-disaggregated-cluster-cg1-0.doris-disaggregated-cluster-cg1.doris.svc.cluster.local | 9050          | 9060   | 8040     | 8060     | -1                 | 2025-06-22 06:35:21 | 2025-06-22 06:41:07 | true  | false                | 33        | 0.000            | 0.000             | 1.000 B       | 0.000         | 0.00 %  | 0.00 %         | 0.000              | {"cloud_unique_id" : "1:1030729153:dnDaSeQC", "compute_group_status" : "NORMAL", "private_endpoint" : "", "compute_group_name" : "cg1", "location" : "default", "public_endpoint" : "", "compute_group_id" : "xdReRUSR"} |        | doris-3.0.5-rc01-e277cfb83f | {"lastSuccessReportTabletsTime":"N/A","lastStreamLoadTime":-1,"isQueryDisabled":false,"isLoadDisabled":false,"isActive":true,"currentFragmentNum":0,"lastFragmentUpdateTime":1750574458773} | 0                       | mix      | 32       | 4.00 GB |
+---------------+-------------------------------------------------------------------------------------------+---------------+--------+----------+----------+--------------------+---------------------+---------------------+-------+----------------------+-----------+------------------+-------------------+---------------+---------------+---------+----------------+--------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+----------+----------+---------+
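The failure can be reproduced independently of the connector: from any host outside the cluster, the service FQDN that the FE reports simply does not resolve. A minimal Python sketch of that check (the hostname is the one from the `show backends` output above):

```python
import socket

def resolvable(host: str) -> bool:
    """Return True if the host resolves via the local resolver, else False."""
    try:
        socket.getaddrinfo(host, 9060)
        return True
    except socket.gaierror:
        return False

# The in-cluster service name the FE hands out; it only resolves inside
# the cluster's own DNS zone.
internal_fqdn = ("doris-disaggregated-cluster-cg1-0"
                 ".doris-disaggregated-cluster-cg1.doris.svc.cluster.local")

print(resolvable(internal_fqdn))  # from outside the cluster: False
```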

What You Expected?

Just as there are configuration options for doris.fenodes and doris.query.port, there should be a similar option for backend/compute nodes.

Then, on the K8s side, the backend/compute nodes can be exposed via an Ingress to solve this issue.

How to Reproduce?

Create a table in Doris

CREATE TABLE IF NOT EXISTS demo.user_data (
    user_id INT,
    name STRING,
    age INT,
    update_time DATETIME
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1",
    "enable_unique_key_merge_on_write" = "true"
);

Launch Spark shell

spark-shell --packages org.apache.doris:spark-doris-connector-spark-3.5:25.1.0,mysql:mysql-connector-java:8.0.33 

Code :

import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.SaveMode

// FE HTTP endpoint; placeholder, replace with your FE host and http_port
val feNodes = "<fe_host>:8030"

// Sample batch data
val batchData = Seq(
  (1, "Alice", 30, "2024-01-01 10:00:00"),
  (2, "Bob", 25, "2024-01-01 11:00:00")
)

val df = batchData.toDF("user_id", "name", "age", "update_time")

df.write.format("doris")
  .option("doris.table.identifier", "demo.user_data")
  .option("doris.fenodes", feNodes)
  .option("user", "root")
  .option("password", "")
  .option("doris.write.fields", "user_id,name,age,update_time")
  .mode(SaveMode.Overwrite)
  .save()

Anything Else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

Nj-kol avatar Jun 22 '25 06:06 Nj-kol

Hello, still stuck on this issue. Any leads would be appreciated on this

Nj-kol avatar Jun 29 '25 06:06 Nj-kol

Hello, currently StreamLoad redirects writes to the BE server, which requires the client to be able to reach the BE host. For more information, see https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual#basic-principles. So this requires configuring DNS on the client side and ensuring network connectivity with the BE server.
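The redirect flow described here can be sketched in Python (a simplified model for illustration, not the connector's actual code): the client can reach the FE, but it fails as soon as it has to follow the 307 redirect to an in-cluster BE host.

```python
import socket
from urllib.parse import urlparse

def follow_stream_load_redirect(location: str) -> str:
    """Simulate the client's side of a Stream Load 307 redirect: it must
    open a new connection to whatever BE host the FE put in Location."""
    be = urlparse(location)
    try:
        socket.getaddrinfo(be.hostname, be.port or 8040)
    except socket.gaierror as exc:
        raise RuntimeError(f"cannot resolve BE host {be.hostname!r}") from exc
    return f"{be.hostname}:{be.port or 8040}"

# A redirect to an in-cluster FQDN fails before any data is sent:
loc = ("http://doris-disaggregated-cluster-cg1-0"
       ".doris-disaggregated-cluster-cg1.doris.svc.cluster.local:8040"
       "/api/demo/user_data/_stream_load")
try:
    follow_stream_load_redirect(loc)
except RuntimeError as e:
    print(e)
```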

In the upcoming 3.1.0 release, you can configure StreamLoad to return to a public BE IP address. See this pull request: https://github.com/apache/doris/pull/53104

JNSimba avatar Jul 31 '25 02:07 JNSimba

Hello @JNSimba, the issue is that I am running Doris on K8s, so the frontend returns internal Doris backend hostnames like doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local.

So Spark fails with errors like -

    Suppressed: org.apache.doris.spark.exception.ConnectedFailedException: Connect to Backend{host='doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local', httpPort=-1, rpcPort=9060}failed.

How can the client resolve that?

Note: my take on this is to resolve the behaviour by providing an option to override the BE nodes, something like the following:

val orderItemsDf = spark.read.format("doris")
  .option("doris.table.identifier", "demos.order_items")
  .option("doris.fenodes", feNodes)
  .option("doris.benodes", "<k8s_loadbalancer_url_or_ip>") // proposed parameter
  .option("user", dorisUser)
  .option("password", dorisPass)
  .load()

This load balancer IP/URL is the same one provided as public_host in be.conf.

Nj-kol avatar Nov 22 '25 06:11 Nj-kol

  1. Is doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local:9060 accessible on the Spark side?

  2. If not, it needs to be read via arrow-flight. Refer to this link for the Spark connector configuration: https://doris.apache.org/docs/dev/ecosystem/spark-doris-connector#reading-via-arrow-flight-sql. Also, fe.conf needs to be configured with public_host={nginx ip} and arrow_flight_sql_proxy_port={nginx port}. See https://doris.apache.org/docs/dev/db-connect/arrow-flight-sql-connect#multiple-bes-share-the-same-ip-accessible-from-outside-the-cluster.
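For reference, the fe.conf fragment implied here would look like the following (both values are placeholders for your proxy's externally reachable address and port):

```
public_host = {nginx ip}
arrow_flight_sql_proxy_port = {nginx port}
```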

JNSimba avatar Nov 24 '25 02:11 JNSimba

  1. Is doris-toolkit-dev-cg1-1.doris-toolkit-dev-cg1.doris.svc.cluster.local:9060 accessible on the Spark side? No, because Spark runs outside K8s.
  2. I don't think that is the issue, @JNSimba. The BE nodes in the compute groups register themselves with the FE on launch using their internal K8s FQDNs, and when this Spark connector queries the FE for the BE addresses, that is what gets returned. Note that I have already set public_host=<my_azure_lb>, and it does not work with this connector, although it works perfectly when I use the flight-sql-jdbc-core library.

As I said earlier, this could be solved by adding a connector option to specify the BE addresses, like .option("doris.benodes", <provide_k8s_loadbalancer_url/ip>), so that instead of fetching the BE addresses from the FE, the connector can use the configured address directly (which can be the public_host={nginx ip}).

Nj-kol avatar Nov 24 '25 03:11 Nj-kol

@Nj-kol There are two issues here: reading and writing.

  1. For writing, configuring the external BE IP address on doris.benodes will solve the problem; this parameter is currently supported.

  2. For reading, as shown in the error message when accessing BE:9060, you need to switch to ArrowFlight for reading. The default Thrift method connects directly to BE:9060, but ArrowFlight provides an externally accessible method, requiring the configuration of public_host={nginx ip} and arrow_flight_sql_proxy_port={nginx port}.

JNSimba avatar Nov 24 '25 07:11 JNSimba

@JNSimba I tried out your suggestions; here are my findings.

For reading, after switching to ArrowFlight and setting public_host={load balancer ip}, it worked! Thanks for that :)

But the issue with writing still persists; the parameter doris.benodes seems to have no effect. Here is what I tried:

Create a table in Doris:

CREATE TABLE IF NOT EXISTS demos.user_data (
    user_id INT,
    name STRING,
    age INT,
    update_time STRING
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1",
    "enable_unique_key_merge_on_write" = "true"
);

Launch shell -

spark-shell \
--packages org.apache.doris:spark-doris-connector-spark-3.5:25.2.0 \
--jars /$HOME/Softwares/jars/mysql-connector-j-9.3.0.jar

Code -

import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.SaveMode

val feNodes = "<load_balancer_ip>:8030"
val dorisUser = "root"
val dorisPass = ""
val beNodes = "<load_balancer_ip>:8040"

// Sample batch data
val batchData = Seq(
  (1, "Alice", 30, "2024-01-01 10:00:00"),
  (2, "Bob", 25, "2024-01-01 11:00:00")
)

val df = batchData.toDF("user_id", "name", "age", "update_time")

df.write.format("doris")
  .option("doris.table.identifier", "demos.user_data")
  .option("doris.fenodes", feNodes)
  .option("doris.benodes", beNodes)
  .option("user", dorisUser)
  .option("password",dorisPass)
  .option("doris.query.port", "9030")
  .option("doris.write.fields", "user_id,name,age,update_time")
  .option("doris.read.mode", "arrow")
  .option("doris.read.arrow-flight-sql.port", "8070")
  .mode(SaveMode.Overwrite)
  .save()

Error -

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.0.142 executor driver): java.net.UnknownHostException: doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1.doris.svc.cluster.local

As you can see, the BE address being used is still fetched from the FE as doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1.doris.svc.cluster.local and not taken from doris.benodes.

Nj-kol avatar Nov 29 '25 06:11 Nj-kol

@Nj-kol When writing, you also need to add this parameter: doris.sink.auto-redirect=false, so that it won't get the BE's IP address through FE, here's a case study you can refer to: https://github.com/apache/doris-spark-connector/blob/master/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala#L102

JNSimba avatar Dec 01 '25 02:12 JNSimba
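The interaction between the two options discussed in this thread can be summarized in a small Python sketch (an illustration of the described behaviour, not the connector's implementation): BE endpoints come from the FE unless doris.sink.auto-redirect is disabled and doris.benodes is set.

```python
def pick_write_endpoints(options, fe_reported_backends):
    """Sketch: with doris.sink.auto-redirect left at its default "true",
    writes go through the FE and land on FE-reported BE hosts; with it
    "false" and doris.benodes set, the configured endpoints are used."""
    auto_redirect = options.get("doris.sink.auto-redirect", "true").lower() == "true"
    benodes = options.get("doris.benodes", "")
    if not auto_redirect and benodes:
        return [n.strip() for n in benodes.split(",")]
    return fe_reported_backends

fe_backends = ["doris-toolkit-dev-cg1-0.doris-toolkit-dev-cg1"
               ".doris.svc.cluster.local:8040"]

# Default: BE hosts still come from the FE (the failing case in this thread),
# even when doris.benodes is set.
print(pick_write_endpoints({"doris.benodes": "<lb_ip>:8040"}, fe_backends))

# With auto-redirect disabled, the configured endpoint wins.
opts = {"doris.benodes": "<lb_ip>:8040", "doris.sink.auto-redirect": "false"}
print(pick_write_endpoints(opts, fe_backends))  # → ['<lb_ip>:8040']
```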

@JNSimba I tried your recommended settings, but it did not work.

[screenshot]

As you can see, it still fetches the BE endpoints from the FE (which are K8s FQDNs).

Nj-kol avatar Dec 01 '25 07:12 Nj-kol

@Nj-kol The spelling is incorrect; it should be doris.sink.auto-redirect=false.

JNSimba avatar Dec 04 '25 13:12 JNSimba

@JNSimba Thanks for pointing that out. I tried it, and it worked fine. It would be good to update the documentation to capture this behaviour.

One thing, though: I still get this intermittent error:

org.apache.doris.spark.exception.StreamLoadException: stream load failed, txnId: -1, status: Fail, msg: [E-240]Have not get FE Master heartbeat yet

	0#  doris::StreamLoadExecutor::begin_txn(doris::StreamLoadContext*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:392
	1#  doris::StreamLoadAction::_on_header(doris::HttpRequest*, std::shared_ptr<doris::StreamLoadContext>) at /var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:708
	2#  doris::StreamLoadAction::on_header(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:392
	3#  doris::EvHttpServer::on_header(evhttp_request*) at /home/zcp/repo_center/doris_release/doris/be/src/http/ev_http_server.cpp:255
	4#  ?
	5#  bufferevent_run_readcb_ at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/bufferevent.c:123
	6#  bufferevent_readcb at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/bufferevent_sock.c:227
	7#  event_process_active_single_queue at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/event.c:1624
	8#  event_process_active at /home/zcp/repo_center/doris_branch-3.1/doris/thirdparty/src/doris-thirdparty-libevent-2.1.12.1/event.c:1790

Nj-kol avatar Dec 05 '25 05:12 Nj-kol