[Enhancement] Make routine load error msg more clear
Why I'm doing:
What I'm doing:
Kafka broker is down
mysql> show routine load\G
*************************** 1. row ***************************
Id: 106275
Name: label0
CreateTime: 2024-02-21 15:52:17
PauseTime: 2024-02-21 17:17:54
EndTime: NULL
DbName: kafka_test
TableName: lineorder
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"\t","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"'|'","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"wyb_test","currentKafkaPartitions":"0","brokerList":"xxx:9093"}
CustomProperties: {"group.id":"label0_1f9af280-5fef-44d1-85bf-44f41c4c7fcd"}
Statistic: {"receivedBytes":2281306,"errorRows":0,"committedTaskNum":1,"loadedRows":22683,"loadRowsRate":1000,"abortedTaskNum":0,"totalRows":22683,"unselectedRows":0,"receivedBytesRate":151000,"taskExecuteTimeMs":15040}
Progress: {"0":"501572079"}
TimestampProgress: {}
ReasonOfStateChanged: ErrorReason{errCode = 2, msg='Failed to process get kafka partition info in BE TNetworkAddress(hostname:xxx, port:8069), err: [failed to get kafka partition meta, err: Local: Broker transport failure, event: [thrd:xxx:9093/bootstrap]: xxx:9093/bootstrap: Connect to ipv4#xxx:9093 failed: Connection refused (after 0ms in state CONNECT)]'}
ErrorLogUrls:
TrackingSQL:
OtherMsg:
LatestSourcePosition: {}
1 row in set (0.01 sec)
mysql> show routine load task where jobname = "label0"\G
*************************** 1. row ***************************
TaskId: c508a547-6dcf-499c-9583-055ece8fa42b
TxnId: -1
TxnStatus: UNKNOWN
JobId: 106275
CreateTime: 2024-02-21 17:23:25
LastScheduledTime: 2024-02-21 17:23:56
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: Progress:{"0":501803411},LatestOffset:null
Message: check task ready to execute failed, err: Failed to process get kafka partition info in BE TNetworkAddress(hostname:xxx, port:8069), err: [failed to get kafka partition offset, err: Local: All broker connections are down, event: [thrd:xxx:9093/bootstrap]: xxx:9093/bootstrap: Connect to ipv4#xxx:9093 failed: Connection refused (after 0ms in state CONNECT)]
1 row in set (0.00 sec)
BE is down
mysql> show routine load task where jobname = "label0"\G
*************************** 1. row ***************************
TaskId: 5e7f2c07-fe36-4907-844b-6a833e4130c3
TxnId: -1
TxnStatus: UNKNOWN
JobId: 106275
CreateTime: 2024-02-21 17:31:45
LastScheduledTime: 2024-02-21 17:31:55
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: Progress:{"0":502285433},LatestOffset:null
Message: check task ready to execute failed, err: Failed to send get kafka partition info request to BE TNetworkAddress(hostname:xxx, port:8069), err: Unable to validate object, host: xxx
1 row in set (0.00 sec)
mysql> show routine load task where jobname = "label0"\G
*************************** 1. row ***************************
TaskId: 5e7f2c07-fe36-4907-844b-6a833e4130c3
TxnId: -1
TxnStatus: UNKNOWN
JobId: 106275
CreateTime: 2024-02-21 17:31:45
LastScheduledTime: 2024-02-21 17:32:15
ExecuteStartTime: NULL
Timeout: 60
BeId: -1
DataSourceProperties: Progress:{"0":502285433},LatestOffset:null
Message: check task ready to execute failed, err: Failed to send get kafka partition info request. err: No alive backends
1 row in set (0.01 sec)
Unknown host
mysql> show routine load\G
*************************** 1. row ***************************
Id: 107096
Name: label0
CreateTime: 2024-02-21 17:34:24
PauseTime: 2024-02-21 17:35:04
EndTime: NULL
DbName: kafka_test
TableName: lineorder
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","rowDelimiter":"\t","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","partial_update_mode":"null","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"'|'","log_rejected_record_num":"0","taskTimeoutSecond":"60","json_root":"","maxFilterRatio":"1.0","strict_mode":"false","jsonpaths":"","taskConsumeSecond":"15","desireTaskConcurrentNum":"5","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"wyb_test","currentKafkaPartitions":"0","brokerList":"test_host:9093"}
CustomProperties: {"group.id":"label0_18adab03-c776-472a-80ba-41ecca292f98"}
Statistic: {"receivedBytes":0,"errorRows":0,"committedTaskNum":0,"loadedRows":0,"loadRowsRate":0,"abortedTaskNum":0,"totalRows":0,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":1}
Progress: {"0":"OFFSET_END"}
TimestampProgress: {}
ReasonOfStateChanged: ErrorReason{errCode = 2, msg='Failed to process get kafka partition info in BE TNetworkAddress(hostname:xxx, port:8069), err: [failed to get kafka partition meta, err: Local: Broker transport failure, event: [thrd:test_host:9093/bootstrap]: test_host:9093/bootstrap: Failed to resolve 'test_host:9093': Name or service not known (after 104ms in state CONNECT)]'}
ErrorLogUrls:
TrackingSQL:
OtherMsg:
LatestSourcePosition: {}
1 row in set (0.00 sec)
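The three scenarios above can be told apart by a few substrings in the returned message. The following is a minimal client-side sketch, not part of this PR: the function name, the category labels, and the choice of substrings are assumptions made for illustration, taken only from the sample output shown here.

```python
import re

# Substrings copied from the sample messages above. The category names are
# hypothetical labels, not identifiers used by StarRocks.
# Order matters: the "Unknown host" sample also contains
# "Broker transport failure", so it is matched on its resolver text first.
_RULES = [
    ("unknown_host", re.compile(r"Failed to resolve|Name or service not known")),
    ("broker_down", re.compile(r"Connection refused|All broker connections are down")),
    ("be_down", re.compile(r"No alive backends|Unable to validate object")),
]


def classify_routine_load_error(message: str) -> str:
    """Map a ReasonOfStateChanged / task Message string to a coarse category."""
    for category, pattern in _RULES:
        if pattern.search(message):
            return category
    return "other"
```

A monitoring script could pass the `ReasonOfStateChanged` or `Message` column to this function and alert differently per category. The substring list would need updating if the message wording changes.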
Fixes #issue
What type of PR is this:
- [ ] BugFix
- [ ] Feature
- [x] Enhancement
- [ ] Refactor
- [ ] UT
- [ ] Doc
- [ ] Tool
Does this PR entail a change in behavior?
- [ ] Yes, this PR will result in a change in behavior.
- [x] No, this PR will not result in a change in behavior.
If yes, please specify the type of change:
- [ ] Interface/UI changes: syntax, type conversion, expression evaluation, display information
- [ ] Parameter changes: default values, similar parameters but with different default values
- [ ] Policy changes: use new policy to replace old one, functionality automatically enabled
- [ ] Feature removed
- [ ] Miscellaneous: upgrade & downgrade compatibility, etc.
Checklist:
- [ ] I have added test cases for my bug fix or my new feature
- [ ] This pr needs user documentation (for new or modified features or behaviors)
- [ ] I have added documentation for my new feature or new function
- [ ] This is a backport pr
Bugfix cherry-pick branch check:
- [ ] I have checked the version labels which the pr will be auto-backported to the target branch
- [ ] 3.2
- [ ] 3.1
- [ ] 3.0
- [ ] 2.5
All the errors occurred when "to get kafka partition info"? How about in other scenarios? In my opinion, the error info is mostly displayed, but the sentence is really long and redundant; it's not clear enough to let users catch the real error quickly.
- for "kafka broker is down"
  - Can you directly show the error message "Connection refused" in the err:[ ... ] block without "failed to get kafka partition meta"?
  - Can you directly put the error message from the task into the job's error message field? (The task's error message is clearer than the job's.)
  - What's the use of "after 0ms in state CONNECT"? If it's useless, can you just remove it?
- for "BE is down"
  - What's the error message in the Job?
  - What's the meaning of "Unable to validate object"? It could be "Failed to connect to BE: xxx".
  - The same about "Failed to send get kafka partition info request"?
All the errors occurred when "to get kafka partition info"? How about in other scenarios? In my opinion, the error info is mostly displayed, but the sentence is really long and redundant; it's not clear enough to let users catch the real error quickly.
for "kafka broker is down"
- Can you directly show the error message "Connection refused" in the err:[ ... ] block without "failed to get kafka partition meta"?
OK, directly return the BE error message.
- Can you directly put the error message from the task into the job's error message field? (The task's error message is clearer than the job's.)
done
- What's the use of "after 0ms in state CONNECT"? If it's useless, can you just remove it?
This comes from the Kafka error message.
for "BE is down"
- What's the error message in the Job?
done
- What's the meaning of "Unable to validate object"? It could be "Failed to connect to BE: xxx".
done
- The same about "Failed to send get kafka partition info request"?
Better to keep this so that users know where it failed.
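Since the "(after 0ms in state CONNECT)" suffix comes from the Kafka client and stays in the message, a reader who only wants the innermost cause can trim it on their side. The sketch below is a heuristic under stated assumptions: the function name is hypothetical, and it only handles messages shaped like the Kafka samples in this PR, where the cause follows the last ": ". It is not suitable for messages such as "Unable to validate object, host: xxx", where it would return only "xxx".

```python
import re

# Matches the librdkafka-style timing suffix seen in the samples,
# e.g. " (after 0ms in state CONNECT)".
_STATE_SUFFIX = re.compile(r"\s*\(after \d+ms in state [A-Z_]+\)")


def root_cause(message: str) -> str:
    """Return the text after the last ': ' once the timing suffix is removed."""
    trimmed = _STATE_SUFFIX.sub("", message)
    # Drop the closing brackets and quotes that wrap the nested error text.
    trimmed = trimmed.rstrip("]'} ")
    return trimmed.rsplit(": ", 1)[-1]
```

Applied to the task message from the "Kafka broker is down" case, this leaves "Connection refused"; applied to the "Unknown host" job message, it leaves "Name or service not known".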
What will happen if the given kafka broker address is unreachable, like 192.168.0.123?
What will happen if the given kafka broker address is unreachable, like 192.168.0.123?
This is the "Kafka broker is down" case.
Quality Gate passed
- Issues: 0 New issues, 0 Accepted issues
- Measures: 0 Security Hotspots, 0.0% Coverage on New Code, 1.0% Duplication on New Code
[FE Incremental Coverage Report]
:white_check_mark: pass : 41 / 48 (85.42%)
file detail
| | path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|---|
| :large_blue_circle: | com/starrocks/load/routineload/RoutineLoadTaskScheduler.java | 0 | 2 | 00.00% | [186, 187] |
| :large_blue_circle: | com/starrocks/load/routineload/KafkaRoutineLoadJob.java | 0 | 1 | 00.00% | [500] |
| :large_blue_circle: | com/starrocks/common/util/KafkaUtil.java | 35 | 39 | 89.74% | [200, 223, 241, 243] |
| :large_blue_circle: | com/starrocks/load/routineload/RoutineLoadJob.java | 2 | 2 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/load/routineload/PulsarTaskInfo.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/load/routineload/KafkaTaskInfo.java | 1 | 1 | 100.00% | [] |
| :large_blue_circle: | com/starrocks/load/routineload/RoutineLoadTaskInfo.java | 2 | 2 | 100.00% | [] |
[BE Incremental Coverage Report]
:x: fail : 15 / 36 (41.67%)
file detail
| | path | covered_line | new_line | coverage | not_covered_line_detail |
|---|---|---|---|---|---|
| :large_blue_circle: | be/src/runtime/routine_load/data_consumer.cpp | 7 | 20 | 35.00% | [186, 187, 244, 245, 247, 249, 251, 363, 364, 365, 366, 368, 383] |
| :large_blue_circle: | be/src/runtime/routine_load/data_consumer.h | 7 | 15 | 46.67% | [96, 105, 106, 133, 134, 135, 136, 138] |
| :large_blue_circle: | be/src/service/internal_service.cpp | 1 | 1 | 100.00% | [] |
@Mergifyio backport branch-3.2
@Mergifyio backport branch-3.1
backport branch-3.2
✅ Backports have been created
- #42865 [Enhancement] Make routine load error msg more clear (backport #41306) has been created for branch branch-3.2 but encountered conflicts
backport branch-3.1
✅ Backports have been created
- #42866 [Enhancement] Make routine load error msg more clear (backport #41306) has been created for branch branch-3.1 but encountered conflicts