scql

EXPLAIN statement: related tests

Open zhuyanhuazhuyanhua opened this issue 3 months ago • 11 comments

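1. In the official documentation, the original text read:
- The synchronous query interface lets users submit a query, wait for it to complete, and fetch the query result in a single RPC. It is suited to fast queries such as DDL, DCL, and simple DQL. However, a query that runs too long may time out. Therefore, it is recommended to use the synchronous query API to run complex queries.
After the change:
+ The synchronous query interface lets users submit a query, wait for it to complete, and fetch the query result in a single RPC. It is suited to fast queries such as DDL, DCL, and simple DQL. However, a query that runs too long may time out. Therefore, it is recommended to use the asynchronous query API to run complex queries.
The same change was also made in api\scdb_api.proto.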

zhuyanhuazhuyanhua, Sep 01 '25 08:09

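Do the final test results look normal? Did everything pass? Also, if you ran setup.sh (which regenerates the public/private key pairs), run docker compose down first to clean up the leftover MySQL container, since it still holds the old user information.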

jingshi-ant, Sep 02 '25 02:09

2. Tests

{
  "queries": [
    {
      "name": "explain percent select ",
      "query": "explain select tbl_1.plain_int_0,tbl_1.plain_datetime_0,tbl_1.plain_timestamp_0 from alice.tbl_1;",
      "mysql_query": ""
    }
  ]
}
2025-09-28 19:42:13.9287 INFO interpreter.go:219 subgraph of alice: nodes:{key:"0" value:{node_name:"runsql.0" op_type:"RunSQL" outputs:{key:"Out" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_datetime_0.1" elem_type:DATETIME option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_timestamp_0.2" elem_type:TIMESTAMP option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} attributes:{key:"sql" value:{t:{name:".0" elem_type:STRING string_data:"select tbl_1.plain_int_0,tbl_1.plain_datetime_0,tbl_1.plain_timestamp_0 from alice.tbl_1;"}}} attributes:{key:"table_refs" value:{t:{name:".0" shape:{dim:{dim_value:1}} elem_type:STRING string_data:"alice.tbl_1"}}}}} nodes:{key:"1" value:{node_name:"publish.1" op_type:"Publish" inputs:{key:"In" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_datetime_0.1" elem_type:DATETIME option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"scdb.alice_tbl_1.plain_timestamp_0.2" elem_type:TIMESTAMP option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} outputs:{key:"Out" value:{tensors:{name:"plain_int_0.3" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_int_0"} tensors:{name:"plain_datetime_0.4" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_datetime_0"} tensors:{name:"plain_timestamp_0.5" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_timestamp_0"}}}}} policy:{worker_num:1 pipelines:{subdags:{jobs:{node_ids:"0"}} subdags:{jobs:{node_ids:"1"}}}}

2025-09-28 19:42:13.9287 INFO submit_and_get_handler.go:83 |RequestID:|SessionID:2d28ec02-9c60-11f0-96b7-eaad763c7245|ActionName:SCDBSubmitAndGetHandler@/public/submit_and_get|CostTime:11.614892ms|Reason:|ErrorMsg:|Request:user:{user:{account_system_type:NATIVE_USER native_user:{name:"alice"}}} query:"explain select plain_int_0, plain_datetime_0, plain_timestamp_0 from alice_tbl_1;" db_name:"scdb"|ClientIP:172.18.0.1

2025-09-28 19:42:13.9287 INFO server.go:158 |GIN|status=200|method=POST|path=/public/submit_and_get|ip=172.18.0.1|latency=11.674445ms|
{
  "queries": [
    {
      "name": "rank ",
      "query": "explain select ta.plain_int_0,percent_rank() over (partition by ta.plain_int_0 order by ta.plain_float_0) as num from alice.tbl_0 as ta;",
      "mysql_query": ""
    }
  ]
}
2025-09-28 19:52:36.9287 INFO interpreter.go:219 subgraph of alice: nodes:{key:"0" value:{node_name:"runsql.0" op_type:"RunSQL" outputs:{key:"Out" value:{tensors:{name:"scdb.alice_tbl_0.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"Column#124.1" elem_type:FLOAT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} attributes:{key:"sql" value:{t:{name:".0" elem_type:STRING string_data:"select ta.plain_int_0,percent_rank() over (partition by ta.plain_int_0 order by ta.plain_float_0) as num from alice.tbl_0 as ta;"}}} attributes:{key:"table_refs" value:{t:{name:".0" shape:{dim:{dim_value:1}} elem_type:STRING string_data:"alice.tbl_0"}}}}} nodes:{key:"1" value:{node_name:"publish.1" op_type:"Publish" inputs:{key:"In" value:{tensors:{name:"scdb.alice_tbl_0.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1} tensors:{name:"Column#124.1" elem_type:FLOAT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} outputs:{key:"Out" value:{tensors:{name:"plain_int_0.2" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_int_0"} tensors:{name:"num.3" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"num"}}}}} policy:{worker_num:1 pipelines:{subdags:{jobs:{node_ids:"0"}} subdags:{jobs:{node_ids:"1"}}}}

2025-09-28 19:52:36.9287 INFO submit_and_get_handler.go:83 |RequestID:|SessionID:a05e3a2b-9c61-11f0-96b7-eaad763c7245|ActionName:SCDBSubmitAndGetHandler@/public/submit_and_get|CostTime:11.130669ms|Reason:|ErrorMsg:|Request:user:{user:{account_system_type:NATIVE_USER native_user:{name:"alice"}}} query:"explain select ta.plain_int_0, percent_rank() over(partition by ta.plain_int_0 order by ta.plain_float_0) as num from alice_tbl_0 as ta;" db_name:"scdb"|ClientIP:172.18.0.1

2025-09-28 19:52:36.9287 INFO server.go:158 |GIN|status=200|method=POST|path=/public/submit_and_get|ip=172.18.0.1|latency=11.16236ms|
{
  "queries": [
    {
      "name": "explain left  join",
      "query": "explain select ta.plain_int_0 from alice_tbl_1 as ta left join alice_tbl_2 as tb on ta.plain_int_0 = tb.plain_int_0;",
      "mysql_query": ""
    }
  ]
}
2025-09-28 19:56:05.9287 INFO interpreter.go:219 subgraph of alice: nodes:{key:"0" value:{node_name:"runsql.0" op_type:"RunSQL" outputs:{key:"Out" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} attributes:{key:"sql" value:{t:{name:".0" elem_type:STRING string_data:"select ta.plain_int_0 from alice.tbl_1 as ta left join alice.tbl_2 as tb on ta.plain_int_0=tb.plain_int_0;"}}} attributes:{key:"table_refs" value:{t:{name:".0" shape:{dim:{dim_value:2}} elem_type:STRING string_data:"alice.tbl_1" string_data:"alice.tbl_2"}}}}} nodes:{key:"1" value:{node_name:"publish.1" op_type:"Publish" inputs:{key:"In" value:{tensors:{name:"scdb.alice_tbl_1.plain_int_0.0" elem_type:INT64 option:REFERENCE annotation:{status:TENSORSTATUS_PRIVATE} ref_num:1}}} outputs:{key:"Out" value:{tensors:{name:"plain_int_0.1" elem_type:STRING annotation:{status:TENSORSTATUS_PRIVATE} string_data:"plain_int_0"}}}}} policy:{worker_num:1 pipelines:{subdags:{jobs:{node_ids:"0"}} subdags:{jobs:{node_ids:"1"}}}}

2025-09-28 19:56:05.9287 INFO submit_and_get_handler.go:83 |RequestID:|SessionID:1cd65ed3-9c62-11f0-96b7-eaad763c7245|ActionName:SCDBSubmitAndGetHandler@/public/submit_and_get|CostTime:11.117878ms|Reason:|ErrorMsg:|Request:user:{user:{account_system_type:NATIVE_USER native_user:{name:"alice"}}} query:"explain select ta.plain_int_0 from alice_tbl_1 as ta left join alice_tbl_2 as tb on ta.plain_int_0 = tb.plain_int_0;" db_name:"scdb"|ClientIP:172.18.0.1

2025-09-28 19:56:05.9287 INFO server.go:158 |GIN|status=200|method=POST|path=/public/submit_and_get|ip=172.18.0.1|latency=11.165014ms|
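The test cases above all share one small JSON shape: a `queries` array whose entries carry `name`, `query` and `mysql_query`. A minimal Go sketch for loading such a file follows; the Go type and function names are hypothetical, and only the JSON keys are taken from the tests.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// testQuery mirrors one entry of the "queries" array (hypothetical type name).
type testQuery struct {
	Name       string `json:"name"`
	Query      string `json:"query"`
	MySQLQuery string `json:"mysql_query"`
}

// testSuite mirrors the top-level object of a test file.
type testSuite struct {
	Queries []testQuery `json:"queries"`
}

// parseSuite decodes a test file's contents into a testSuite.
func parseSuite(data []byte) (*testSuite, error) {
	var s testSuite
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	raw := []byte(`{
  "queries": [
    {
      "name": "explain left  join",
      "query": "explain select ta.plain_int_0 from alice_tbl_1 as ta left join alice_tbl_2 as tb on ta.plain_int_0 = tb.plain_int_0;",
      "mysql_query": ""
    }
  ]
}`)
	suite, err := parseSuite(raw)
	if err != nil {
		panic(err)
	}
	for _, q := range suite.Queries {
		fmt.Printf("%q: %s\n", q.Name, q.Query)
	}
}
```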

zhuyanhuazhuyanhua, Sep 08 '25 16:09

3. Changed runDQL to return the EXPLAIN result in OutColumns:

// If parameter async is true, the query result will be notified by engine, this function will always return nil
func (app *App) runDQL(ctx context.Context, s *session, async bool) (*scql.SCDBQueryResultResponse, error) {
	compileReq, err := app.buildCompileRequest(ctx, s)
	if err != nil {
		return nil, err
	}
	intrpr := interpreter.NewInterpreter()
	compiledPlan, err := intrpr.Compile(ctx, compileReq)

	if err != nil {
		return nil, err
	}

	s.GetSessionVars().AffectedByGroupThreshold = compiledPlan.Warning.GetMayAffectedByGroupThreshold()
	s.GetSessionVars().GroupByThreshold = compileReq.CompileOpts.SecurityCompromise.GroupByThreshold
	logrus.Infof("Execution Plan:\n%s\n", compiledPlan.GetExplain().GetExeGraphDot())

	sessionStartParams := &scql.JobStartParams{
		JobId:         s.id,
		SpuRuntimeCfg: compiledPlan.GetSpuRuntimeConf(),
		TimeZone:      s.GetSessionVars().GetTimeZone(),
	}
	var partyCodes []string
	for _, p := range compiledPlan.Parties {
		partyCodes = append(partyCodes, p.GetCode())
	}

	var partyInfo *graph.PartyInfo
	{
		db := s.GetSessionVars().Storage
		var users []storage.User
		result := db.Model(&storage.User{}).Where("party_code in ?", partyCodes).Find(&users)
		if result.Error != nil {
			return nil, result.Error
		}
		partyMap := make(map[string]*graph.Participant)
		for _, u := range users {
			participant := &graph.Participant{
				PartyCode: u.PartyCode,
				Endpoints: strings.Split(u.EngineEndpoints, ";"),
				Token:     u.EngineToken,
				PubKey:    u.EnginePubKey,
			}
			partyMap[u.PartyCode] = participant
		}
		participants := make([]*graph.Participant, 0, len(partyCodes))
		for i, code := range partyCodes {
			party, exists := partyMap[code]
			if !exists {
				return nil, fmt.Errorf("could not find info for party %s", code)
			}
			participants = append(participants, party)
			sessionStartParams.Parties = append(sessionStartParams.Parties, &scql.JobStartParams_Party{
				Code:      code,
				Name:      code,
				Host:      party.Endpoints[0],
				Rank:      int32(i),
				PublicKey: party.PubKey,
			})
		}
		partyInfo = graph.NewPartyInfo(participants)
	}

	pbRequests := make(map[string]*scql.RunExecutionPlanRequest)
	for party, graph := range compiledPlan.SubGraphs {
		startParams, ok := proto.Clone(sessionStartParams).(*scql.JobStartParams)
		if !ok {
			return nil, fmt.Errorf("failed to clone session start params")
		}
		startParams.PartyCode = party
		cbURL := url.URL{
			Scheme: app.config.Protocol,
			Host:   app.config.SCDBHost,
			Path:   engineCallbackPath,
		}
		pbRequests[party] = &scql.RunExecutionPlanRequest{
			JobParams:     startParams,
			Graph:         graph,
			Async:         async,
			CallbackUrl:   cbURL.String(),
			GraphChecksum: &scql.GraphChecksum{CheckGraphChecksum: false},
		}
	}

	isExplain, err := isExplainQuery(s.request.GetQuery())
	if err != nil {
		return nil, err
	}

	if isExplain {
		explainTensor := &scql.Tensor{
			Name:     "explain_result",
			ElemType: scql.PrimitiveDataType_STRING,
			Option:   scql.TensorOptions_VALUE,
		}

		explainTensor.StringData = []string{compiledPlan.GetExplain().GetExeGraphDot()}

		return &scql.SCDBQueryResultResponse{
			Status: &scql.Status{
				Code:    int32(scql.Code_OK),
				Message: "ok",
			},
			OutColumns:    []*scql.Tensor{explainTensor},
			ScdbSessionId: s.id,
			AffectedRows:  0,
			Warnings:      nil,
		}, nil
	}

	engineClient := executor.NewEngineClient(
		app.config.Engine.ClientMode,
		app.config.Engine.ClientTimeout,
		&bcfg.TLSConf{
			Mode:       app.config.Engine.TLSCfg.Mode,
			CertPath:   app.config.Engine.TLSCfg.CertFile,
			KeyPath:    app.config.Engine.TLSCfg.KeyFile,
			CACertPath: app.config.Engine.TLSCfg.CACertFile,
		},
		app.config.Engine.ContentType,
		app.config.Engine.Protocol,
	)
	s.engineStub = executor.NewEngineStub(
		s.id,
		app.config.Protocol,
		app.config.SCDBHost,
		engineCallbackPath,
		engineClient,
	)

	var outputNames []string
	for _, col := range compiledPlan.GetSchema().GetColumns() {
		outputNames = append(outputNames, col.GetName())
	}

	exec, err := executor.NewExecutor(pbRequests, outputNames, s.engineStub, s.id, partyInfo)
	if err != nil {
		return nil, err
	}
	s.executor = exec
	resp, err := exec.RunExecutionPlan(ctx, async)
	if err != nil {
		return nil, err
	}

	if async {
		// In async mode, result will be set in callback
		return nil, nil
	}
	return resp, nil
}
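runDQL relies on isExplainQuery, whose implementation is not shown in this thread. A minimal, hypothetical keyword-based version could look like the sketch below; the name isExplainQuerySketch and its bool-only return are assumptions (the real helper returns `(bool, error)` and may well use the SQL parser instead).

```go
package main

import (
	"fmt"
	"strings"
)

// isExplainQuerySketch is a hypothetical stand-in for isExplainQuery. It only
// checks whether the first whitespace-separated word is EXPLAIN (any case), so
// unlike a parser-based check it does not skip leading SQL comments.
func isExplainQuerySketch(query string) bool {
	fields := strings.Fields(query)
	return len(fields) > 0 && strings.EqualFold(fields[0], "explain")
}

func main() {
	for _, q := range []string{
		"explain select plain_int_0 from alice_tbl_1;",
		"  EXPLAIN\tselect 1;",
		"select 'explain' from alice_tbl_1;",
	} {
		fmt.Printf("%q -> %v\n", q, isExplainQuerySketch(q))
	}
}
```

A keyword check is cheap but brittle; reusing the parser (as the later `ast.(*parserAst.ExplainStmt)` attempt in this thread does) avoids false negatives on commented or unusual input.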

zhuyanhuazhuyanhua, Sep 09 '25 01:09

https://github.com/secretflow/scql/pull/675

zhuyanhuazhuyanhua, Sep 09 '25 01:09

1. For details, see https://blog.csdn.net/2301_79815382/article/details/151257417?fromshare=blogdetail&sharetype=blogdetail&sharerId=151257417&sharerefer=PC&sharesource=2301_79815382&sharefrom=from_link
The core of runSQL's processing is in the Run function, which mainly handles DDL/DCL:
// Run runs a DDL/DCL statement on SCDB
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema) ([]*scql.Tensor, error) {
	// Step 1: Parsing
	p := parser.New()
	ast, err := p.ParseOneStmt(stmt, "", "")
	if err != nil {
		return nil, err
	}
	if err := core.Preprocess(ctx, ast, is); err != nil {
		return nil, err
	}

	// Step 2: Planning
	lp, _, err := core.BuildLogicalPlan(context.Background(), ctx, ast, is)
	if err != nil {
		return nil, err
	}

	// Step 3: Executing
	eb := newExecutorBuilder(ctx, is)
	exec := eb.build(lp)
	if eb.err != nil {
		return nil, eb.err
	}
	if err := exec.Open(context.Background()); err != nil {
		return nil, err
	}

	retTypes := []*types.FieldType{}
	for _, c := range lp.Schema().Columns {
		retTypes = append(retTypes, c.RetType)
	}

	ck := chunk.New(retTypes, ResultMaxRows, ResultMaxRows)
	var result []*scql.Tensor
	for {
		if err := exec.Next(context.Background(), ck); err != nil {
			return nil, err
		}
		if result, err = mergeResultFromChunk(ck, lp, result); err != nil {
			return nil, err
		}
		if ck.NumRows() == 0 {
			break
		}
	}
	return result, err
}

The runDQL function handles DQL; its code is identical to the runDQL listing quoted in item 3 above.

Run has no step that compiles a plan graph, whereas runDQL calls compiledPlan, err := intrpr.Compile(ctx, compileReq) and then compiledPlan.GetExplain().GetExeGraphDot(). That plan-graph compilation is exactly what EXPLAIN needs.

zhuyanhuazhuyanhua, Sep 09 '25 06:09

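6. The various DDL/DCL executor types embed baseExecutor to achieve polymorphism, and their core logic lives in the Next method. Take SimpleExec's Next as an example: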

// Next implements the Executor Next interface.
func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	if e.done {
		return nil
	}
	tx := e.ctx.GetSessionVars().Storage.Begin()
	defer func() {
		if err != nil {
			tx.Rollback()
		} else {
			tx.Commit()
		}
	}()

	switch x := e.Statement.(type) {
	case *ast.CreateUserStmt:
		err = e.executeCreateUser(tx, x)
	case *ast.DropUserStmt:
		err = e.executeDropUser(tx, x)
	case *ast.AlterUserStmt:
		err = e.executeAlterUser(tx, x)
	default:
		err = fmt.Errorf("simpleExec.Next: unsupported ast %v", x)
	}
	e.done = true
	return err
}

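Next only checks for errors, guards against running more than once (the done flag), and performs the operation inside a transaction; nothing along the way produces a DOT-format execution graph, so it is not suitable for EXPLAIN statements. @tongke6 @FollyCoolly @jingshi-ant @YiAng603 Just my humble opinion; corrections are welcome.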

zhuyanhuazhuyanhua, Sep 13 '25 06:09


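In RunSQL, the part that calls Interpreter Compile to get the explain output is really only 3 lines once trivial code such as the err != nil checks is left out. Rather than special-casing EXPLAIN inside runDQL, you could extract a dedicated function that handles it.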

FollyCoolly, Sep 15 '25 02:09

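7. Attempt to call Interpreter Compile inside Run: check whether the statement is EXPLAIN, build a compile request, compile the plan, return the DOT-format graph from the compiled plan, and skip the rest of the flow. Here is the attempt to build the compile request inside Run: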

// Run runs a DDL/DCL statement on SCDB
func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema) ([]*scql.Tensor, error) {
	// Step 1: Parsing
	p := parser.New()
	ast, err := p.ParseOneStmt(stmt, "", "")
	if err != nil {
		return nil, err
	}
	if err := core.Preprocess(ctx, ast, is); err != nil {
		return nil, err
	}
	if _, ok := ast.(*parserAst.ExplainStmt); ok {
		compiledReq := &pb.CompileQueryRequest{
			Query:  stmt,
			DbName: ctx.GetSessionVars().CurrentDB,
		}

		intrpr := interpreter.NewInterpreter()
		compiledPlan, err := intrpr.Compile(context.Background(), compiledReq)
		if err != nil {
			return nil, err
		}

		dotGraph := compiledPlan.GetExplain().GetExeGraphDot()
		return []*scql.Tensor{{
			Name:       "explain_result",
			ElemType:   scql.PrimitiveDataType_STRING,
			StringData: []string{dotGraph},
			Shape: &scql.TensorShape{
				Dim: []*scql.TensorShape_Dimension{{
					Value: &scql.TensorShape_Dimension_DimValue{DimValue: 1},
				}},
			},
		}}, nil
	}

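After trying this, I found that Run's parameters carry too little information to build a complete compile request: specifically, key fields such as Issuer and Catalog cannot be filled in. Run only receives the following context interface: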

// Context is an interface for transaction and executive args environment.
type Context interface {
	GetSessionVars() *variable.SessionVars

	// SetValue saves a value associated with this context for key.
	SetValue(key fmt.Stringer, value interface{})

	// Value returns the value associated with this context for key.
	Value(key fmt.Stringer) interface{}
}
// SessionVars is to handle user-defined or global variables in the current session.
type SessionVars struct {
	// systems variables, don't modify it directly, use GetSystemVar/SetSystemVar method.
	systems map[string]string
	// SysWarningCount is the system variable "warning_count", because it is on the hot path, so we extract it from the systems
	SysWarningCount int
	// SysErrorCount is the system variable "error_count", because it is on the hot path, so we extract it from the systems
	SysErrorCount uint16

	// StrictSQLMode indicates if the session is in strict mode.
	StrictSQLMode bool

	// ActiveRoles stores active roles for current user
	ActiveRoles []*auth.RoleIdentity

	// Status stands for the session status. e.g. in transaction or not, auto commit is on or off, and so on.
	Status uint16

	// User is the user identity with which the session login.
	User *auth.UserIdentity

	// CurrentDB is the default database of this session.
	CurrentDB string

	// PlanID is the unique id of logical and physical plan.
	PlanID int

	// StmtCtx holds variables for current executing statement.
	StmtCtx *stmtctx.StatementContext

	// PlanColumnID is the unique id for column when building plan.
	PlanColumnID int64

	// SnapshotTS is used for reading history data.
	SnapshotTS uint64

	// SnapshotInfoschema is used with SnapshotTS, when the schema version at snapshotTS less than current schema
	// version, we load an old version schema for query.
	SnapshotInfoschema interface{}

	// StartTime is the start time of the last query.
	StartTime time.Time

	// DurationParse is the duration of parsing SQL string to AST of the last query.
	DurationParse time.Duration

	// DurationPlanning is the duration of compiling AST to logical plan of the last query.
	DurationPlanning time.Duration

	// DurationTranslating is the duration of converting logical plan to execution plan of the last query.
	DurationTranslating time.Duration

	// DurationExecuting is the duration of executing execution plan of the last query.
	DurationExecuting time.Duration

	// Per-connection time zones. Each client that connects has its own time zone setting, given by the session time_zone variable.
	// See https://dev.mysql.com/doc/refman/5.7/en/time-zone-support.html
	TimeZone string

	// Storage
	Storage *gorm.DB

	// PreparedParams params for prepared statements
	PreparedParams PreparedParams

	SQLMode mysql.SQLMode

	// AffectedByGroupThreshold is used to mark whether GroupByThreshold is applied to protect query results
	AffectedByGroupThreshold bool

	// GroupByThreshold applied to protect query results
	GroupByThreshold uint64
}

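The existing function that builds the request is buildCompileRequest, shown below. Note that its parameters are ctx context.Context and s *session; this also shows that Run's parameters are not enough to build a compile request.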

func (app *App) buildCompileRequest(ctx context.Context, s *session) (*scql.CompileQueryRequest, error) {
	issuer := s.GetSessionVars().User
	if issuer.Username == storage.DefaultRootName && issuer.Hostname == storage.DefaultHostName {
		return nil, fmt.Errorf("user root has no privilege to execute dql")
	}
	issuerPartyCode, err := storage.QueryUserPartyCode(s.GetSessionVars().Storage, issuer.Username, issuer.Hostname)
	if err != nil {
		return nil, fmt.Errorf("failed to query issuer party code: %v", err)
	}

	dsList, err := app.extractDataSourcesInDQL(ctx, s)
	if err != nil {
		return nil, err
	}

	// check datasource
	if len(dsList) == 0 {
		return nil, fmt.Errorf("no data source specified in the query")
	}
	dbName := dsList[0].DBName.String()
	s.GetSessionVars().CurrentDB = dbName
	// check if referenced tables are in the same db
	if len(dsList) > 1 {
		for _, ds := range dsList[1:] {
			if ds.DBName.String() != dbName {
				return nil, fmt.Errorf("query is not allowed to execute across multiply databases")
			}
		}
	}

	// collect referenced table schemas
	tableNames := make([]string, 0, len(dsList))
	for _, ds := range dsList {
		tableNames = append(tableNames, ds.TableInfo().Name.String())
	}

	// collect all view in db
	// TODO: possible optimization: Analysis AST to find all reference tables (including real data source & view)
	views, err := storage.QueryAllViewsInDb(s.GetSessionVars().Storage, dbName)
	if err != nil {
		return nil, fmt.Errorf("failed to query all views in db `%s`: %v", dbName, err)
	}
	tableNames = append(tableNames, views...)

	catalog, err := app.buildCatalog(s.GetSessionVars().Storage, dbName, tableNames)
	if err != nil {
		return nil, fmt.Errorf("failed to build catalog: %v", err)
	}

	intoParties, err := util.CollectIntoParties(s.request.GetQuery())
	if err != nil {
		return nil, fmt.Errorf("failed to collect select into parties from query: %v", err)
	}
	securityConfig, err := buildSecurityConfig(s.GetSessionVars().Storage, intoParties, issuerPartyCode, catalog.Tables)
	if err != nil {
		return nil, err
	}

	spuRuntimeCfg, err := config.NewSpuRuntimeCfg(app.config.Engine.SpuRuntimeCfg)
	if err != nil {
		return nil, err
	}

	groupByThreshold := constant.DefaultGroupByThreshold
	if app.config.SecurityCompromise.GroupByThreshold > 0 {
		groupByThreshold = app.config.SecurityCompromise.GroupByThreshold
	}

	req := &scql.CompileQueryRequest{
		Query:  s.request.GetQuery(),
		DbName: dbName,
		Issuer: &scql.PartyId{
			Code: issuerPartyCode,
		},
		IssuerAsParticipant: false,
		Catalog:             catalog,
		SecurityConf:        securityConfig,
		CompileOpts: &scql.CompileOptions{
			SpuConf: spuRuntimeCfg,
			SecurityCompromise: &scql.SecurityCompromiseConfig{
				RevealGroupMark:  app.config.SecurityCompromise.RevealGroupMark,
				GroupByThreshold: groupByThreshold,
				RevealGroupCount: app.config.SecurityCompromise.RevealGroupCount,
			},
			DumpExeGraph: true,
		},
	}

	return req, nil
}

Specifically, the code below fails to compile with: cannot use ctx.GetSessionVars().User (variable of type *"github.com/secretflow/scql/pkg/parser/auth".UserIdentity) as *scql.PartyId value

		compiledReq := &pb.CompileQueryRequest{
			Query:  stmt,
			DbName: ctx.GetSessionVars().CurrentDB,
			Issuer: ctx.GetSessionVars().User,
		}

Of course, User could be used to look up the party code, as shown below, but I think that would make Run too cluttered:

user := ctx.GetSessionVars().User

// Reject the root user, which may not execute DQL
if user.Username == storage.DefaultRootName && user.Hostname == storage.DefaultHostName {
    return nil, fmt.Errorf("user root has no privilege to execute dql")
}

// Look up the party code that belongs to this user
issuerPartyCode, err := storage.QueryUserPartyCode(
    ctx.GetSessionVars().Storage,
    user.Username,
    user.Hostname,
)
if err != nil {
    return nil, fmt.Errorf("failed to query issuer party code: %v", err)
}

Issuer: &scql.PartyId{
    Code: issuerPartyCode, // the real party code queried from the database
}

The Catalog field has a similar problem. If it had to be done in Run, it would look like this:

    catalog := &scql.Catalog{
        Tables: make([]*scql.TableEntry, 0),
    }
    
    tables := is.GetTables()
    for _, table := range tables {
        if table.DB().Name.L == dbName {
            tableEntry := &scql.TableEntry{
                TableName: table.Name.String(),
                Owner: &scql.PartyId{
                    Code: table.GetOwnerPartyCode(),  
                },
                DbType: table.GetDBType(),
                Columns: convertColumns(table.Columns()),
            }
            catalog.Tables = append(catalog.Tables, tableEntry)
        }
    }

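In my view, implementing this in Run would not only require building the compile request and compiling it there; Run's parameters also lack the information needed to build the request. runDQL already has the compile request, the interpreter, the compiled plan, and so on, so they only need to be reused. Corrections welcome. @FollyCoolly @tongke6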

zhuyanhuazhuyanhua, Sep 16 '25 09:09

7. Attempt to call Interpreter Compile inside Run: check whether the statement is EXPLAIN, build the compile request, compile the plan, return the DOT-format graph from the compiled plan, and skip the rest of the flow.

if _, ok := ast.(*ast.ExplainStmt); ok {
	compiledReq := &pb.CompileQueryRequest{
		Query: stmt,
	}

	intrpr := interpreter.NewInterpreter()
	compiledPlan, err := intrpr.Compile(context.Background(), compiledReq)
	if err != nil {
		return nil, err
	}
	dotGraph := compiledPlan.GetExplain().GetExeGraphDot()
	return []*scql.Tensor{{
		Name:       "explain_result",
		ElemType:   scql.PrimitiveDataType_STRING,
		StringData: []string{dotGraph},
		Shape: &scql.TensorShape{
			Dim: []*scql.TensorShape_Dimension{{
				Value: &scql.TensorShape_Dimension_DimValue{DimValue: 1},
			}},
		},
	}}, nil
}

Is this approach correct?

I think that works. CC: @tongke6

FollyCoolly, Sep 17 '25 02:09

func Run(ctx sessionctx.Context, stmt string, is infoschema.InfoSchema)

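What do you mean by missing parameters? buildCompileRequest should only need the session and the context, and both are available in submitAndGet. Either add a session parameter to runSQL, or simply add a runExplain function and call it from submitAndGet.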

FollyCoolly, Sep 26 '25 07:09

8. The main changes are as follows:

	isExplain, err := isExplainQuery(s.request.GetQuery())
	if err != nil {
		return
	}
	if isExplain {
		// Use fresh names for the inner errors and assign them to err explicitly,
		// so err is never redeclared (shadowed) inside this block.
		compileReq, buildErr := app.buildCompileRequest(ctx, s)
		if buildErr != nil {
			err = buildErr
			return
		}
		intrpr := interpreter.NewInterpreter()
		compiledPlan, compileErr := intrpr.Compile(ctx, compileReq)
		if compileErr != nil {
			err = compileErr
			return
		}

		dotGraph := compiledPlan.GetExplain().GetExeGraphDot()
		rt = []*scql.Tensor{{
			Name:       "explain_result",
			ElemType:   scql.PrimitiveDataType_STRING,
			StringData: []string{dotGraph},
			Shape: &scql.TensorShape{
				Dim: []*scql.TensorShape_Dimension{{
					Value: &scql.TensorShape_Dimension_DimValue{DimValue: 1},
				}},
			},
		}}
	} else {
		rt, err = scdbexecutor.Run(s, s.request.Query, is)
	}

The test results have been updated.
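One Go detail worth checking in a branch like the one above: rt and err are set by assignment and the branch exits with bare return statements, so the enclosing function's signature (not shown here) decides what happens if err is redeclared with := inside the if block. With named results, the compiler rejects a bare return while the result is shadowed; with plain locals read by a deferred handler, the error is silently lost. The self-contained sketch below shows the second case; buildRequest, shadowed and assigned are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// buildRequest is a hypothetical failing step standing in for buildCompileRequest.
func buildRequest() (string, error) {
	return "", errors.New("no data source specified in the query")
}

// shadowed redeclares err with := inside the if block, so the deferred
// reporter only sees the outer err, which is still nil.
func shadowed(isExplain bool) (report string) {
	var err error
	defer func() { report = fmt.Sprintf("err=%v", err) }()
	if isExplain {
		req, err := buildRequest()
		if err != nil {
			return
		}
		_ = req
	}
	return
}

// assigned uses = so the failure reaches the outer err seen by the reporter.
func assigned(isExplain bool) (report string) {
	var err error
	defer func() { report = fmt.Sprintf("err=%v", err) }()
	if isExplain {
		var req string
		req, err = buildRequest()
		if err != nil {
			return
		}
		_ = req
	}
	return
}

func main() {
	fmt.Println(shadowed(true)) // err=<nil>
	fmt.Println(assigned(true)) // err=no data source specified in the query
}
```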

zhuyanhuazhuyanhua, Sep 28 '25 11:09