Initial version of OpenSergo Go SDK
初版 opensergo-go-sdk, 可配合 opensergo-control-plane 进行联调.
联调步骤: 详细可参考 README.md 或 main.go
OpenSergo Go SDK
How to use
scene 1 : subscribe config-data
package main
func main() {
// add console logger (optional)
//logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat)
// add file logger (optional)
//logging.NewFileLogger("/logs/opensergo/opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat)
// instant OpenSergoClient
openSergoClient := client.NewOpenSergoClient("33.1.33.1",10246)
// register SubscribeInfo of FaultToleranceRule
// 1. instant SubscribeKey
faultToleranceSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefFaultToleranceRule{})
// 2. construct SubscribeInfo
faultToleranceSubscribeInfo := client.NewSubscribeInfo(faultToleranceSubscribeKey)
// 3. register
openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
// start OpensergoClient
openSergoClient.Start()
// register after OpenSergoClient started
// register SubscribeInfo of RateLimitStrategy
rateLimitSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefRateLimitStrategy{})
rateLimitSubscribeInfo := client.NewSubscribeInfo(*rateLimitSubscribeKey)
openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
select {}
}
scene 2 : subscribe config-data with custom-logic when config-data changed.
Add a subscriber by implements the function in subscribe.Subscriber.
There are some samples in sample directory : sample/main/sample_faulttolerance_rule_subscriber.go and sample_ratelimit_strategy_subscriber.go
type SampleFaultToleranceRuleSubscriber struct {
}
func (sampleFaultToleranceRuleSubscriber SampleFaultToleranceRuleSubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, dataSlice []protoreflect.ProtoMessage) bool {
// TODO add custom-logic when config-data change
// ......
return true
}
And then register it to subscriber.SubscriberRegistry.
Demo-code is in sample/main directory
package main
func main() {
// add console logger (optional)
//logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat)
// add file logger (optional)
//logging.NewFileLogger("/logs/opensergo/opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat)
// instant OpenSergoClient
openSergoClient := client.NewOpenSergoClient("33.1.33.1",10246)
// registry SubscribeInfo of FaultToleranceRule
// 1. instant SubscribeKey
faultToleranceSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefFaultToleranceRule{})
// 2. instant Subscriber
sampleFaultToleranceRuleSubscriber := new(SampleFaultToleranceRuleSubscriber)
// 3. construct SubscribeInfo
faultToleranceSubscribeInfo := client.NewSubscribeInfo(faultToleranceSubscribeKey)
faultToleranceSubscribeInfo.AppendSubscriber(sampleFaultToleranceRuleSubscriber)
// 4. register
openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
// register SubscribeInfo of RateLimitStrategy
rateLimitSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefRateLimitStrategy{})
sampleRateLimitStrategySubscriber := new(SampleRateLimitStrategySubscriber)
rateLimitSubscribeInfo := client.NewSubscribeInfo(rateLimitSubscribeKey)
rateLimitSubscribeInfo.AppendSubscriber(sampleRateLimitStrategySubscriber)
openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
// start OpensergoClient
openSergoClient.Start()
// register after OpenSergoClient started
faultToleranceSubscribeInfo.AppendSubscriber(new(subscribe.DefaultSubscriber))
openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
// register after OpenSergoClient started
rateLimitSubscribeInfo.AppendSubscriber(new(subscribe.DefaultSubscriber))
openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
select {}
}
Could you please reformat your code with goimports?
Could you please reformat your code with goimports?
fixed ! thanks for your review ~
是否可以对项目结构的设计原则做一下说明?
项目结构:
- api: proto 文件及存根
- cmd: 如果有cli工具(通过go install可以安装的)
- docs: 文档
- internal: 不想暴露给SDK用户的代码
对于可暴露的代码,建议取消pkg,直接暴露出来,比如 client、transport ...
Please resolve the data race:
==================
WARNING: DATA RACE
Read at 0x00c000025d70 by goroutine 12:
github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceSubscribeConfigClient).Send()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:56 +0x33
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:205 +0x44f
main.ss()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
main.main.func1()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c
Previous write at 0x00c000025d70 by goroutine 9:
github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceClient).SubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:41 +0x106
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:126 +0x219
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:104 +0xae
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start.func2()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x39
Goroutine 12 (running) created at:
main.main()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x36f
Goroutine 9 (running) created at:
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x179
main.main()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x299
==================
==================
WARNING: DATA RACE
Read at 0x00c0000a1070 by goroutine 12:
google.golang.org/grpc.(*clientStream).SendMsg()
/Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:781 +0x132
github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceSubscribeConfigClient).Send()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:56 +0x54
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:205 +0x44f
main.ss()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
main.main.func1()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c
Previous write at 0x00c0000a1070 by goroutine 9:
google.golang.org/grpc.newClientStreamWithParams()
/Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:278 +0xc16
google.golang.org/grpc.newClientStream.func2()
/Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:184 +0x199
google.golang.org/grpc.newClientStream()
/Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:212 +0x713
google.golang.org/grpc.(*ClientConn).NewStream()
/Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:158 +0x2d6
github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceClient).SubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:37 +0xe1
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:126 +0x219
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:104 +0xae
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start.func2()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x39
Goroutine 12 (running) created at:
main.main()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x36f
Goroutine 9 (running) created at:
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x179
main.main()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x299
==================
==================
WARNING: DATA RACE
Read at 0x00c000260f00 by goroutine 53:
runtime.mapaccess2()
/usr/local/opt/go/libexec/src/runtime/map.go:456 +0x0
sync.(*Map).Load()
/usr/local/opt/go/libexec/src/sync/map.go:112 +0x98
github.com/opensergo/opensergo-go/pkg/transport/subscribe.SubscriberRegistry.GetSubscribersOf()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/transport/subscribe/subscriber_registry.go:51 +0xf3
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).onSubscribeDataNotify()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:220 +0x524
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandleReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:163 +0x48d
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).handleReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:147 +0x144
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:109 +0x105
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func2()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:57 +0x39
Previous write at 0x00c000260f00 by goroutine 12:
runtime.mapdelete()
/usr/local/opt/go/libexec/src/runtime/map.go:695 +0x0
sync.(*Map).LoadAndDelete()
/usr/local/opt/go/libexec/src/sync/map.go:285 +0x1a4
sync.(*Map).Delete()
/usr/local/opt/go/libexec/src/sync/map.go:301 +0x504
github.com/opensergo/opensergo-go/pkg/transport/subscribe.(*SubscriberRegistry).RemoveSubscribers()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/transport/subscribe/subscriber_registry.go:61 +0x4bd
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:212 +0x4bf
main.ss()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
main.main.func1()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c
Please pay attention to the data race problem. You might need to run the demo and test cases with the -race arg.
Adding some unit tests might be needed for better quality (PS: Java SDK also lacks test cases, which is a bad case :(
==================
WARNING: DATA RACE
Read at 0x00c000260de0 by goroutine 53:
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).onSubscribeDataNotify()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:220 +0x49b
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandleReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:163 +0x48d
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).handleReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:147 +0x144
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:109 +0x105
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func2()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:57 +0x39
Previous write at 0x00c000260de0 by goroutine 12:
sync/atomic.AddInt32()
/usr/local/opt/go/libexec/src/runtime/race_amd64.s:281 +0xb
sync/atomic.AddInt32()
<autogenerated>:1 +0x1a
sync.(*Map).LoadAndDelete()
/usr/local/opt/go/libexec/src/sync/map.go:291 +0x1cb
sync.(*Map).Delete()
/usr/local/opt/go/libexec/src/sync/map.go:301 +0x504
github.com/opensergo/opensergo-go/pkg/transport/subscribe.(*SubscriberRegistry).RemoveSubscribers()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/transport/subscribe/subscriber_registry.go:61 +0x4bd
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:212 +0x4bf
main.ss()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
main.main.func1()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c
Panic occurred (additional context to be updated):
{"logLevel":"ERROR","timestamp":"2022-11-21 17:16:41.683","caller":"logger.go:163","msg":"[OpenSergo SDK] [subscribeResponseId:] panic occurred when invoking doHandleReceive() for subscribe.","error":"runtime error: invalid memory address or nil pointer dereference"}
==================
WARNING: DATA RACE
Read at 0x00c00008c6f0 by goroutine 17:
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).SubscribeConfig()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:161 +0x8b
main.main.func1()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:72 +0x144
Previous write at 0x00c00008c6f0 by goroutine 14:
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:91 +0x275
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func1()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:54 +0x39
Goroutine 17 (running) created at:
main.main()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:68 +0x3d6
Goroutine 14 (finished) created at:
github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:54 +0xe4
github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:137 +0x3b5
main.main()
/Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x2a5
==================
As discussed and debugged with @jnan806, these weird race conditions were mainly caused by non-thread-safe access of subscribeConfigStream of OpenSergoClient.
How to reproduce it: when a client has established a connection to the control plane, we restart the control plane; in the meantime we initiate Subscribe or Unsubscribe command concurrently.
As discussed and debugged with @jnan806, these weird race conditions were mainly caused by non-thread-safe access of
subscribeConfigStreamof OpenSergoClient.How to reproduce it: when a client has established a connection to the control plane, we restart the control plane; in the meantime we initiate Subscribe or Unsubscribe command concurrently.
Yeah, and has resolve concurrent-safety with several atomic.Value, and replacing recursion by for-loop or for-select.
Nice work. Since this is a large PR containing the initial implementation of the SDK, I'll create a merge commit for this PR.
Fabulous! Thanks for contributing!
PS: We may improve the SDK implementation later :)
Fabulous! Thanks for contributing!
PS: We may improve the SDK implementation later :)
It's my pleasure ~ And I will keep following and improving it. 😃