Tests related to the explain statement
1. Documentation fix
Original wording in the docs: - The synchronous query API allows users to submit a query, wait for it to complete, and obtain the query result in a single RPC. This interface is suitable for fast queries, such as DDL, DCL, and simple DQL. However, if a query runs for too long, it may time out. Therefore, it is recommended to use the synchronous query API to run complex queries.
After the fix: + The synchronous query API allows users to submit a query, wait for it to complete, and obtain the query result in a single RPC. This interface is suitable for fast queries, such as DDL, DCL, and simple DQL. However, if a query runs for too long, it may time out. Therefore, it is recommended to use the asynchronous query API to run complex queries.
The corresponding change was also made in api/scdb_api.proto.
Do the final test results look normal? Did everything pass? Also, if you ran setup.sh (which regenerates the public/private keys), it is recommended to run docker compose down first to clean up the leftover mysql container, since it still holds the old user info.
2. Tests
{
"queries": [
{
"name": "explain percent select ",
"query": "explain select tbl_1.plain_int_0,tbl_1.plain_datetime_0,tbl_1.plain_timestamp_0 from alice.tbl_1;",
"mysql_query": ""
}
]
}
2025-09-28 19:42:13.9287 INFO interpreter.go:219 subgraph of alice: nodes:{key:"0" value:{node_name:"runsql.0" op_type:"RunSQL" outputs:{key:"Out" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_datetime_0.1" elem_type:DATETIME option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_timestamp_0.2" elem_type:TIMESTAMP option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} attributes:{key:"sql" value:{t:{name:".0" elem_type:STRING string_data:"select tbl_1.plain_int_0,tbl_1.plain_datetime_0,tbl_1.plain_timestamp_0 from alice.tbl_1;"}}} attributes:{key:"table_refs" value:{t:{name:".0" shape:{dim:{dim_value:1}} elem_type:STRING string_data:"alice.tbl_1"}}}}} nodes:{key:"1" value:{node_name:"publish.1" op_type:"Publish" inputs:{key:"In" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_datetime_0.1" elem_type:DATETIME option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_timestamp_0.2" elem_type:TIMESTAMP option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} outputs:{key:"Out" value:{tensors:{name:"plain_int_0.3" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_int_0"} tensors:{name:"plain_datetime_0.4" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_datetime_0"} tensors:{name:"plain_timestamp_0.5" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_timestamp_0"}}}}} policy:{worker_num:1 pipelines:{subdags:{jobs:{node_ids:"0"}} subdags:{jobs:{node_ids:"1"}}}}
2025-09-28 19:42:13.9287 INFO submit_and_get_handler.go:83 |RequestID:|SessionID:2d28ec02-9c60-11f0-96b7-eaad763c7245|ActionName:SCDBSubmitAndGetHandler@/public/submit_and_get|CostTime:11.614892ms|Reason:|ErrorMsg:|Request:user:{user:{account_system_type:NATIVE_USER native_user:{name:"alice"}}} query:"explain select plain_int_0, plain_datetime_0, plain_timestamp_0 from alice_tbl_1;" db_name:"scdb"|ClientIP:172.18.0.1
2025-09-28 19:42:13.9287 INFO server.go:158 |GIN|status=200|method=POST|path=/public/submit_and_get|ip=172.18.0.1|latency=11.674445ms|
{
"queries": [
{
"name": "rank ",
"query": "explain select ta.plain_int_0,percent_rank() over (partition by ta.plain_int_0 order by ta.plain_float_0) as num from alice.tbl_0 as ta;",
"mysql_query": ""
}
]
}
2025-09-28 19:52:36.9287 INFO interpreter.go:219 subgraph of alice: nodes:{key:"0" value:{node_name:"runsql.0" op_type:"RunSQL" outputs:{key:"Out" value:{tensors:{name:"scdb.alice_tbl_0.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"Column#124.1" elem_type:FLOAT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} attributes:{key:"sql" value:{t:{name:".0" elem_type:STRING string_data:"select ta.plain_int_0,percent_rank() over (partition by ta.plain_int_0 order by ta.plain_float_0) as num from alice.tbl_0 as ta;"}}} attributes:{key:"table_refs" value:{t:{name:".0" shape:{dim:{dim_value:1}} elem_type:STRING string_data:"alice.tbl_0"}}}}} nodes:{key:"1" value:{node_name:"publish.1" op_type:"Publish" inputs:{key:"In" value:{tensors:{name:"scdb.alice_tbl_0.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"Column#124.1" elem_type:FLOAT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} outputs:{key:"Out" value:{tensors:{name:"plain_int_0.2" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_int_0"} tensors:{name:"num.3" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"num"}}}}} policy:{worker_num:1 pipelines:{subdags:{jobs:{node_ids:"0"}} subdags:{jobs:{node_ids:"1"}}}}
2025-09-28 19:52:36.9287 INFO submit_and_get_handler.go:83 |RequestID:|SessionID:a05e3a2b-9c61-11f0-96b7-eaad763c7245|ActionName:SCDBSubmitAndGetHandler@/public/submit_and_get|CostTime:11.130669ms|Reason:|ErrorMsg:|Request:user:{user:{account_system_type:NATIVE_USER native_user:{name:"alice"}}} query:"explain select ta.plain_int_0, percent_rank() over(partition by ta.plain_int_0 order by ta.plain_float_0) as num from alice_tbl_0 as ta;" db_name:"scdb"|ClientIP:172.18.0.1
2025-09-28 19:52:36.9287 INFO server.go:158 |GIN|status=200|method=POST|path=/public/submit_and_get|ip=172.18.0.1|latency=11.16236ms|
{
"queries": [
{
"name": "explain left join",
"query": "explain select ta.plain_int_0 from alice_tbl_1 as ta left join alice_tbl_2 as tb on ta.plain_int_0 = tb.plain_int_0;",
"mysql_query": ""
}
]
}
2025-09-28 19:56:05.9287 INFO interpreter.go:219 subgraph of alice: nodes:{key:"0" value:{node_name:"runsql.0" op_type:"RunSQL" outputs:{key:"Out" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} attributes:{key:"sql" value:{t:{name:".0" elem_type:STRING string_data:"select ta.plain_int_0 from alice.tbl_1 as ta left join alice.tbl_2 as tb on ta.plain_int_0=tb.plain_int_0;"}}} attributes:{key:"table_refs" value:{t:{name:".0" shape:{dim:{dim_value:2}} elem_type:STRING string_data:"alice.tbl_1" string_data:"alice.tbl_2"}}}}} nodes:{key:"1" value:{node_name:"publish.1" op_type:"Publish" inputs:{key:"In" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} outputs:{key:"Out" value:{tensors:{name:"plain_int_0.1" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_int_0"}}}}} policy:{worker_num:1 pipelines:{subdags:{jobs:{node_ids:"0"}} subdags:{jobs:{node_ids:"1"}}}}
2025-09-28 19:56:05.9287 INFO submit_and_get_handler.go:83 |RequestID:|SessionID:1cd65ed3-9c62-11f0-96b7-eaad763c7245|ActionName:SCDBSubmitAndGetHandler@/public/submit_and_get|CostTime:11.117878ms|Reason:|ErrorMsg:|Request:user:{user:{account_system_type:NATIVE_USER native_user:{name:"alice"}}} query:"explain select ta.plain_int_0 from alice_tbl_1 as ta left join alice_tbl_2 as tb on ta.plain_int_0 = tb.plain_int_0;" db_name:"scdb"|ClientIP:172.18.0.1
2025-09-28 19:56:05.9287 INFO server.go:158 |GIN|status=200|method=POST|path=/public/submit_and_get|ip=172.18.0.1|latency=11.165014ms|
3. Modified runDQL so the explain result is returned in OutColumns
// If parameter async is true, the query result will be notified by engine, this function will always return nil
func (app *App) runDQL(ctx context.Context, s *session, async bool) (*scql.SCDBQueryResultResponse, error) {
compileReq, err := app.buildCompileRequest(ctx, s)
if err != nil {
return nil, err
}
intrpr := interpreter.NewInterpreter()
compiledPlan, err := intrpr.Compile(ctx, compileReq)
if err != nil {
return nil, err
}
s.GetSessionVars().AffectedByGroupThreshold = compiledPlan.Warning.GetMayAffectedByGroupThreshold()
s.GetSessionVars().GroupByThreshold = compileReq.CompileOpts.SecurityCompromise.GroupByThreshold
logrus.Infof("Execution Plan:\n%s\n", compiledPlan.GetExplain().GetExeGraphDot())
sessionStartParams := &scql.JobStartParams{
JobId: s.id,
SpuRuntimeCfg: compiledPlan.GetSpuRuntimeConf(),
TimeZone: s.GetSessionVars().GetTimeZone(),
}
var partyCodes []string
for _, p := range compiledPlan.Parties {
partyCodes = append(partyCodes, p.GetCode())
}
var partyInfo *graph.PartyInfo
{
db := s.GetSessionVars().Storage
var users []storage.User
result := db.Model(&storage.User{}).Where("party_code in ?", partyCodes).Find(&users)
if result.Error != nil {
return nil, result.Error
}
partyMap := make(map[string]*graph.Participant)
for _, u := range users {
participant := &graph.Participant{
PartyCode: u.PartyCode,
Endpoints: strings.Split(u.EngineEndpoints, ";"),
Token: u.EngineToken,
PubKey: u.EnginePubKey,
}
partyMap[u.PartyCode] = participant
}
participants := make([]*graph.Participant, 0, len(partyCodes))
for i, code := range partyCodes {
party, exists := partyMap[code]
if !exists {
return nil, fmt.Errorf("could not find info for party %s", code)
}
participants = append(participants, party)
sessionStartParams.Parties = append(sessionStartParams.Parties, &scql.JobStartParams_Party{
Code: code,
Name: code,
Host: party.Endpoints[0],
Rank: int32(i),
PublicKey: party.PubKey,
})
}
partyInfo = graph.NewPartyInfo(participants)
}
pbRequests := make(map[string]*scql.RunExecutionPlanRequest)
for party, graph := range compiledPlan.SubGraphs {
startParams, ok := proto.Clone(sessionStartParams).(*scql.JobStartParams)
if !ok {
return nil, fmt.Errorf("failed to clone session start params")
}
startParams.PartyCode = party
cbURL := url.URL{
Scheme: app.config.Protocol,
Host: app.config.SCDBHost,
Path: engineCallbackPath,
}
pbRequests[party] = &scql.RunExecutionPlanRequest{
JobParams: startParams,
Graph: graph,
Async: async,
CallbackUrl: cbURL.String(),
GraphChecksum: &scql.GraphChecksum{CheckGraphChecksum: false},
}
}
isExplain, err := isExplainQuery(s.request.GetQuery())
if err != nil {
return nil, err
}
if isExplain {
explainTensor := &scql.Tensor{
Name: "explain_result",
ElemType: scql.PrimitiveDataType_STRING,
Option: scql.TensorOptions_VALUE,
}
explainTensor.StringData = []string{compiledPlan.GetExplain().GetExeGraphDot()}
return &scql.SCDBQueryResultResponse{
Status: &scql.Status{
Code: int32(scql.Code_OK),
Message: "ok",
},
OutColumns: []*scql.Tensor{explainTensor},
ScdbSessionId: s.id,
AffectedRows: 0,
Warnings: nil,
}, nil
}
engineClient := executor.NewEngineClient(
app.config.Engine.ClientMode,
app.config.Engine.ClientTimeout,
&bcfg.TLSConf{
Mode: app.config.Engine.TLSCfg.Mode,
CertPath: app.config.Engine.TLSCfg.CertFile,
KeyPath: app.config.Engine.TLSCfg.KeyFile,
CACertPath: app.config.Engine.TLSCfg.CACertFile,
},
app.config.Engine.ContentType,
app.config.Engine.Protocol,
)
s.engineStub = executor.NewEngineStub(
s.id,
app.config.Protocol,
app.config.SCDBHost,
engineCallbackPath,
engineClient,
)
var outputNames []string
for _, col := range compiledPlan.GetSchema().GetColumns() {
outputNames = append(outputNames, col.GetName())
}
exec, err := executor.NewExecutor(pbRequests, outputNames, s.engineStub, s.id, partyInfo)
if err != nil {
return nil, err
}
s.executor = exec
resp, err := exec.RunExecutionPlan(ctx, async)
if err != nil {
return nil, err
}
if async {
// In async mode, result will be set in callback
return nil, nil
}
return resp, nil
}
https://github.com/secretflow/scql/pull/675
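With this change an explain query comes back as an ordinary result set whose single column carries the DOT text. As an illustration, here is a minimal client-side sketch of reading it back (an assumption-laden sketch, not code from the PR: the getters follow the response fields used above, and the import path is assumed from the repo layout):

package example

import (
	"fmt"

	"github.com/secretflow/scql/pkg/proto-gen/scql"
)

// printExplain pulls the DOT-format execution graph out of an explain response,
// assuming the runDQL change above: one STRING tensor named "explain_result".
func printExplain(resp *scql.SCDBQueryResultResponse) error {
	if resp.GetStatus().GetCode() != int32(scql.Code_OK) {
		return fmt.Errorf("query failed: %s", resp.GetStatus().GetMessage())
	}
	for _, t := range resp.GetOutColumns() {
		if t.GetName() == "explain_result" && len(t.GetStringData()) > 0 {
			fmt.Println(t.GetStringData()[0]) // DOT text, can be rendered with graphviz
			return nil
		}
	}
	return fmt.Errorf("no explain_result column in the response")
}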
- For details see https://blog.csdn.net/2301_79815382/article/details/151257417?fromshare=blogdetail&sharetype=blogdetail&sharerId=151257417&sharerefer=PC&sharesource=2301_79815382&sharefrom=from_link . The core processing of the runSQL function happens in the Run function, which mainly handles DDL/DCL:
// Run runs an DDL/DCL statement on SCDB
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema) ([]*scql.Tensor, error) {
// Step 1: Parsing
p := parser.New()
ast, err := p.ParseOneStmt(stmt, "", "")
if err != nil {
return nil, err
}
if err := core.Preprocess(ctx, ast, is); err != nil {
return nil, err
}
// Step 2: Planning
lp, _, err := core.BuildLogicalPlan(context.Background(), ctx, ast, is)
if err != nil {
return nil, err
}
// Step 3: Executing
eb := newExecutorBuilder(ctx, is)
exec := eb.build(lp)
if eb.err != nil {
return nil, eb.err
}
if err := exec.Open(context.Background()); err != nil {
return nil, err
}
retTypes := []*types.FieldType{}
for _, c := range lp.Schema().Columns {
retTypes = append(retTypes, c.RetType)
}
ck := chunk.New(retTypes, ResultMaxRows, ResultMaxRows)
var result []*scql.Tensor
for {
if err := exec.Next(context.Background(), ck); err != nil {
return nil, err
}
if result, err = mergeResultFromChunk(ck, lp, result); err != nil {
return nil, err
}
if ck.NumRows() == 0 {
break
}
}
return result, err
}
The runDQL function handles DQL; it is the same runDQL (with the explain change) already shown in full under point 3 above.
The Run function has no step that compiles an execution-plan graph, whereas runDQL has compiledPlan, err := intrpr.Compile(ctx, compileReq) and compiledPlan.GetExplain().GetExeGraphDot(), so it does compile the plan graph, which is exactly what explain needs.
6. The various DDL/DCL statement types embed baseExecutor to get polymorphism, and the core logic of each lives in its Next function. Take SimpleExec's Next as an example:
// Next implements the Executor Next interface.
func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if e.done {
return nil
}
tx := e.ctx.GetSessionVars().Storage.Begin()
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()
switch x := e.Statement.(type) {
case *ast.CreateUserStmt:
err = e.executeCreateUser(tx, x)
case *ast.DropUserStmt:
err = e.executeDropUser(tx, x)
case *ast.AlterUserStmt:
err = e.executeAlterUser(tx, x)
default:
err = fmt.Errorf("simpleExec.Next: unsupported ast %v", x)
}
e.done = true
return err
}
The Next function only handles errors and the done flag (whether it has already executed); nowhere in the process is there a DOT-format execution graph, so it is not a good fit for the explain statement. @tongke6 @FollyCoolly @jingshi-ant @YiAng603 Just my humble opinion, corrections welcome.
The part of runSQL that calls the interpreter's Compile to get the explain output is really only about 3 lines once you strip the trivial err != nil handling. Rather than special-casing inside runDQL, you could extract a dedicated function that handles explain.
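For illustration, here is a sketch of such a dedicated helper, a hypothetical runExplain assembled only from pieces already shown in runDQL above (the function name, receiver, and placement next to runDQL are assumptions, and it relies on runDQL's existing imports):

// runExplain compiles the query and returns the DOT-format execution graph as a
// single STRING tensor, skipping plan distribution and engine execution entirely.
func (app *App) runExplain(ctx context.Context, s *session) ([]*scql.Tensor, error) {
	compileReq, err := app.buildCompileRequest(ctx, s)
	if err != nil {
		return nil, err
	}
	compiledPlan, err := interpreter.NewInterpreter().Compile(ctx, compileReq)
	if err != nil {
		return nil, err
	}
	return []*scql.Tensor{{
		Name:       "explain_result",
		ElemType:   scql.PrimitiveDataType_STRING,
		StringData: []string{compiledPlan.GetExplain().GetExeGraphDot()},
	}}, nil
}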
7. Attempt: call the interpreter's Compile inside the Run function: check whether the statement is an explain, build a compile request, compile the plan, return the DOT-format graph from the compiled plan, and skip the rest of the flow. Tried building the compile request inside Run:
// Run runs an DDL/DCL statement on SCDB
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema) ([]*scql.Tensor, error) {
// Step 1: Parsing
p := parser.New()
ast, err := p.ParseOneStmt(stmt, "", "")
if err != nil {
return nil, err
}
if err := core.Preprocess(ctx, ast, is); err != nil {
return nil, err
}
if _, ok := ast.(*parserAst.ExplainStmt); ok {
compiledReq := &pb.CompileQueryRequest{
Query: stmt,
DbName: ctx.GetSessionVars().CurrentDB,
}
intrpr := interpreter.NewInterpreter()
compiledPlan, err := intrpr.Compile(context.Background(), compiledReq)
if err != nil {
return nil, err
}
dotGraph := compiledPlan.GetExplain().GetExeGraphDot()
return []*scql.Tensor{{
Name: "explain_result",
ElemType: scql.PrimitiveDataType_STRING,
StringData: []string{dotGraph},
Shape: &scql.TensorShape{
Dim: []*scql.TensorShape_Dimension{{
Value: &scql.TensorShape_Dimension_DimValue{DimValue: 1},
}},
},
}}, nil
	}
	// ... the rest of Run (Step 2: Planning and Step 3: Executing, unchanged)
}
After trying this, it turned out that Run's parameters carry too little information to build a complete compile request. Specifically, key fields such as Issuer and Catalog cannot be filled in.
// Context is an interface for transaction and executive args environment.
type Context interface {
GetSessionVars() *variable.SessionVars
// SetValue saves a value associated with this context for key.
SetValue(key fmt.Stringer, value interface{})
// Value returns the value associated with this context for key.
Value(key fmt.Stringer) interface{}
}
// SessionVars is to handle user-defined or global variables in the current session.
type SessionVars struct {
// systems variables, don't modify it directly, use GetSystemVar/SetSystemVar method.
systems map[string]string
// SysWarningCount is the system variable "warning_count", because it is on the hot path, so we extract it from the systems
SysWarningCount int
// SysErrorCount is the system variable "error_count", because it is on the hot path, so we extract it from the systems
SysErrorCount uint16
// StrictSQLMode indicates if the session is in strict mode.
StrictSQLMode bool
// ActiveRoles stores active roles for current user
ActiveRoles []*auth.RoleIdentity
// Status stands for the session status. e.g. in transaction or not, auto commit is on or off, and so on.
Status uint16
// User is the user identity with which the session login.
User *auth.UserIdentity
// CurrentDB is the default database of this session.
CurrentDB string
// PlanID is the unique id of logical and physical plan.
PlanID int
// StmtCtx holds variables for current executing statement.
StmtCtx *stmtctx.StatementContext
// PlanColumnID is the unique id for column when building plan.
PlanColumnID int64
// SnapshotTS is used for reading history data.
SnapshotTS uint64
// SnapshotInfoschema is used with SnapshotTS, when the schema version at snapshotTS less than current schema
// version, we load an old version schema for query.
SnapshotInfoschema interface{}
// StartTime is the start time of the last query.
StartTime time.Time
// DurationParse is the duration of parsing SQL string to AST of the last query.
DurationParse time.Duration
// DurationPlanning is the duration of compiling AST to logical plan of the last query.
DurationPlanning time.Duration
// DurationTranslating is the duration of converting logical plan to execution plan of the last query.
DurationTranslating time.Duration
// DurationExecuting is the duration of executing execution plan of the last query.
DurationExecuting time.Duration
// Per-connection time zones. Each client that connects has its own time zone setting, given by the session time_zone variable.
// See https://dev.mysql.com/doc/refman/5.7/en/time-zone-support.html
TimeZone string
// Storage
Storage *gorm.DB
// PreparedParams params for prepared statements
PreparedParams PreparedParams
SQLMode mysql.SQLMode
// AffectedByGroupThreshold is used to mark whether GroupByThreshold is applied to protect query results
AffectedByGroupThreshold bool
// GroupByThreshold applied to protect query results
GroupByThreshold uint64
}
The available function for building the request is buildCompileRequest, shown below. Note that its parameters are ctx context.Context and s *session, which again shows that Run's parameters are not sufficient to build a compile request.
func (app *App) buildCompileRequest(ctx context.Context, s *session) (*scql.CompileQueryRequest, error) {
issuer := s.GetSessionVars().User
if issuer.Username == storage.DefaultRootName && issuer.Hostname == storage.DefaultHostName {
return nil, fmt.Errorf("user root has no privilege to execute dql")
}
issuerPartyCode, err := storage.QueryUserPartyCode(s.GetSessionVars().Storage, issuer.Username, issuer.Hostname)
if err != nil {
return nil, fmt.Errorf("failed to query issuer party code: %v", err)
}
dsList, err := app.extractDataSourcesInDQL(ctx, s)
if err != nil {
return nil, err
}
// check datasource
if len(dsList) == 0 {
return nil, fmt.Errorf("no data source specified in the query")
}
dbName := dsList[0].DBName.String()
s.GetSessionVars().CurrentDB = dbName
// check if referenced tables are in the same db
if len(dsList) > 1 {
for _, ds := range dsList[1:] {
if ds.DBName.String() != dbName {
return nil, fmt.Errorf("query is not allowed to execute across multiply databases")
}
}
}
// collect referenced table schemas
tableNames := make([]string, 0, len(dsList))
for _, ds := range dsList {
tableNames = append(tableNames, ds.TableInfo().Name.String())
}
// collect all view in db
// TODO: possible optimization: Analysis AST to find all reference tables (including real data source & view)
views, err := storage.QueryAllViewsInDb(s.GetSessionVars().Storage, dbName)
if err != nil {
return nil, fmt.Errorf("failed to query all views in db `%s`: %v", dbName, err)
}
tableNames = append(tableNames, views...)
catalog, err := app.buildCatalog(s.GetSessionVars().Storage, dbName, tableNames)
if err != nil {
return nil, fmt.Errorf("failed to build catalog: %v", err)
}
intoParties, err := util.CollectIntoParties(s.request.GetQuery())
if err != nil {
return nil, fmt.Errorf("failed to collect select into parties from query: %v", err)
}
securityConfig, err := buildSecurityConfig(s.GetSessionVars().Storage, intoParties, issuerPartyCode, catalog.Tables)
if err != nil {
return nil, err
}
spuRuntimeCfg, err := config.NewSpuRuntimeCfg(app.config.Engine.SpuRuntimeCfg)
if err != nil {
return nil, err
}
groupByThreshold := constant.DefaultGroupByThreshold
if app.config.SecurityCompromise.GroupByThreshold > 0 {
groupByThreshold = app.config.SecurityCompromise.GroupByThreshold
}
req := &scql.CompileQueryRequest{
Query: s.request.GetQuery(),
DbName: dbName,
Issuer: &scql.PartyId{
Code: issuerPartyCode,
},
IssuerAsParticipant: false,
Catalog: catalog,
SecurityConf: securityConfig,
CompileOpts: &scql.CompileOptions{
SpuConf: spuRuntimeCfg,
SecurityCompromise: &scql.SecurityCompromiseConfig{
RevealGroupMark: app.config.SecurityCompromise.RevealGroupMark,
GroupByThreshold: groupByThreshold,
RevealGroupCount: app.config.SecurityCompromise.RevealGroupCount,
},
DumpExeGraph: true,
},
}
return req, nil
}
Specifically, the compiler reports: cannot use ctx.GetSessionVars().User (variable of type *"github.com/secretflow/scql/pkg/parser/auth".UserIdentity) as *scql.PartyId value
compiledReq := &pb.CompileQueryRequest{
Query: stmt,
DbName: ctx.GetSessionVars().CurrentDB,
Issuer: ctx.GetSessionVars().User,
}
Of course, the User could be used to look up the party code, implemented as below, but I think that makes the Run function far too bloated:
user := ctx.GetSessionVars().User
// check for the root user, which has no privilege to execute DQL
if user.Username == storage.DefaultRootName && user.Hostname == storage.DefaultHostName {
return nil, fmt.Errorf("user root has no privilege to execute dql")
}
// look up the party code for this user
issuerPartyCode, err := storage.QueryUserPartyCode(
ctx.GetSessionVars().Storage,
user.Username,
user.Hostname,
)
if err != nil {
return nil, fmt.Errorf("failed to query issuer party code: %v", err)
}
Issuer: &scql.PartyId{
Code: issuerPartyCode, // the real party code looked up from storage
}
The Catalog field has a similar problem. If it really had to be built inside Run, it would look something like this:
catalog := &scql.Catalog{
Tables: make([]*scql.TableEntry, 0),
}
tables := is.GetTables()
for _, table := range tables {
if table.DB().Name.L == dbName {
tableEntry := &scql.TableEntry{
TableName: table.Name.String(),
Owner: &scql.PartyId{
Code: table.GetOwnerPartyCode(),
},
DbType: table.GetDBType(),
Columns: convertColumns(table.Columns()),
}
catalog.Tables = append(catalog.Tables, tableEntry)
}
}
My view: implementing this inside Run means not only having to build the compile request and compile it there, but Run's parameters are also insufficient to build the request in the first place. In runDQL, the compile request, the interpreter, and the compiled plan already exist and only need to be reused. Corrections welcome @FollyCoolly @tongke6
(Quoting the attempt from point 7 above.) Is this approach correct?
I think that works. CC: @tongke6
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema)
What do you mean by missing parameters? buildCompileRequest should only need a session and a context, and both are provided in submitAndGet. You could add a session parameter to the runSQL function, or simply add a dedicated runExplain function and call it from submitAndGet; either works.
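A rough sketch of the call-site shape this suggests (hypothetical: it reuses the runExplain sketch from above, borrows the variable names from the snippet in point 8 below, and assumes ctx, s, and is are in scope and that the enclosing function returns ([]*scql.Tensor, error)):

	// dispatch: explain goes through the compile-only path,
	// everything else stays on the existing scdbexecutor.Run path
	isExplain, err := isExplainQuery(s.request.GetQuery())
	if err != nil {
		return nil, err
	}
	if isExplain {
		return app.runExplain(ctx, s)
	}
	return scdbexecutor.Run(s, s.request.Query, is)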
8. The main modification is as follows:
isExplain, err := isExplainQuery(s.request.GetQuery())
if err != nil {
return
}
if isExplain {
	// use distinct error names inside this block: with "compileReq, err := ..." the new
	// err would shadow the named return value, and the bare "return" below would then
	// silently return a nil error
	compileReq, reqErr := app.buildCompileRequest(ctx, s)
	if reqErr != nil {
		err = reqErr
		return
	}
	intrpr := interpreter.NewInterpreter()
	compiledPlan, compileErr := intrpr.Compile(ctx, compileReq)
	if compileErr != nil {
		err = compileErr
		return
	}
dotGraph := compiledPlan.GetExplain().GetExeGraphDot()
rt = []*scql.Tensor{{
Name: "explain_result",
ElemType: scql.PrimitiveDataType_STRING,
StringData: []string{dotGraph},
Shape: &scql.TensorShape{
Dim: []*scql.TensorShape_Dimension{{
Value: &scql.TensorShape_Dimension_DimValue{DimValue: 1},
}},
},
}}
} else {
rt, err = scdbexecutor.Run(s, s.request.Query, is)
}
Test results have been updated.