opensergo-go-sdk

Initial version of OpenSergo Go SDK

Open jnan806 opened this issue 3 years ago • 3 comments

Initial version of opensergo-go-sdk. It can be used for joint debugging with opensergo-control-plane.

Joint debugging steps: see README.md or main.go for details.


OpenSergo Go SDK

How to use

Scenario 1: subscribe to config data

package main

func main() {
    // add console logger (optional)
    //logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat)
    // add file logger (optional)
    //logging.NewFileLogger("/logs/opensergo/opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat)
    
    // instantiate OpenSergoClient
    openSergoClient := client.NewOpenSergoClient("33.1.33.1", 10246)
    
    // register SubscribeInfo of FaultToleranceRule
    // 1. instantiate SubscribeKey
    faultToleranceSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefFaultToleranceRule{})
    // 2. construct SubscribeInfo
    faultToleranceSubscribeInfo := client.NewSubscribeInfo(faultToleranceSubscribeKey)
    // 3. register
    openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
    
    // start OpenSergoClient
    openSergoClient.Start()
    
    // register after OpenSergoClient started
    // register SubscribeInfo of RateLimitStrategy
    rateLimitSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefRateLimitStrategy{})
    rateLimitSubscribeInfo := client.NewSubscribeInfo(rateLimitSubscribeKey)
    openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)

    select {}
}

Scenario 2: subscribe to config data with custom logic that runs when the config data changes.

Add a subscriber by implementing the function defined in subscribe.Subscriber.
There are some samples in the sample directory: sample/main/sample_faulttolerance_rule_subscriber.go and sample_ratelimit_strategy_subscriber.go

    type SampleFaultToleranceRuleSubscriber struct {
    }
    
    func (sampleFaultToleranceRuleSubscriber SampleFaultToleranceRuleSubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, dataSlice []protoreflect.ProtoMessage) bool {
        // TODO add custom-logic when config-data change
        // ......
        return true
    }

Then register it with subscribe.SubscriberRegistry.
Demo code is in the sample/main directory.

package main

func main() {
    // add console logger (optional)
    //logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat)
    // add file logger (optional)
    //logging.NewFileLogger("/logs/opensergo/opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat)
    
    // instantiate OpenSergoClient
    openSergoClient := client.NewOpenSergoClient("33.1.33.1", 10246)
    
    // register SubscribeInfo of FaultToleranceRule
    // 1. instantiate SubscribeKey
    faultToleranceSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefFaultToleranceRule{})
    // 2. instantiate Subscriber
    sampleFaultToleranceRuleSubscriber := new(SampleFaultToleranceRuleSubscriber)
    // 3. construct SubscribeInfo
    faultToleranceSubscribeInfo := client.NewSubscribeInfo(faultToleranceSubscribeKey)
    faultToleranceSubscribeInfo.AppendSubscriber(sampleFaultToleranceRuleSubscriber)
    // 4. register
    openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
    
    // register SubscribeInfo of RateLimitStrategy
    rateLimitSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefRateLimitStrategy{})
    sampleRateLimitStrategySubscriber := new(SampleRateLimitStrategySubscriber)
    rateLimitSubscribeInfo := client.NewSubscribeInfo(rateLimitSubscribeKey)
    rateLimitSubscribeInfo.AppendSubscriber(sampleRateLimitStrategySubscriber)
    openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
    
    // start OpenSergoClient
    openSergoClient.Start()
    
    // register after OpenSergoClient started
    faultToleranceSubscribeInfo.AppendSubscriber(new(subscribe.DefaultSubscriber))
    openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
    
    // register after OpenSergoClient started
    rateLimitSubscribeInfo.AppendSubscriber(new(subscribe.DefaultSubscriber))
    openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
    
    select {}
}
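
The subscriber callback from scenario 2 can be sketched without the SDK. This is a minimal, self-contained illustration, not the SDK's implementation: `SubscribeKey`, `Subscriber`, and `CachingSubscriber` are hypothetical stand-ins that only mirror the callback shape shown above, the data is passed as plain strings instead of `[]protoreflect.ProtoMessage`, and the returned `bool` is assumed to mean "update handled".

```go
package main

import (
	"fmt"
	"sync"
)

// SubscribeKey is a hypothetical stand-in for subscribe.SubscribeKey.
type SubscribeKey struct {
	Namespace string
	App       string
	Kind      string
}

// Subscriber is a hypothetical stand-in that mirrors the callback shape above.
// The real SDK passes []protoreflect.ProtoMessage; strings keep this sketch dependency-free.
type Subscriber interface {
	OnSubscribeDataUpdate(key SubscribeKey, data []string) bool
}

// CachingSubscriber keeps the latest data per key behind a mutex,
// because the callback may run on a different goroutine than the readers.
type CachingSubscriber struct {
	mu     sync.RWMutex
	latest map[SubscribeKey][]string
}

func NewCachingSubscriber() *CachingSubscriber {
	return &CachingSubscriber{latest: make(map[SubscribeKey][]string)}
}

// OnSubscribeDataUpdate stores a copy of the data so that later changes
// made by the caller cannot affect the cached slice.
func (s *CachingSubscriber) OnSubscribeDataUpdate(key SubscribeKey, data []string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.latest[key] = append([]string(nil), data...)
	return true
}

// Latest returns a copy of the most recent data for the key.
func (s *CachingSubscriber) Latest(key SubscribeKey) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]string(nil), s.latest[key]...)
}

func main() {
	var sub Subscriber = NewCachingSubscriber()
	key := SubscribeKey{Namespace: "default", App: "foo-app", Kind: "RateLimitStrategy"}
	ok := sub.OnSubscribeDataUpdate(key, []string{"rule-a", "rule-b"})
	fmt.Println(ok, sub.(*CachingSubscriber).Latest(key))
}
```

Guarding the cache with a mutex is a design choice for this sketch; whether the SDK invokes subscribers concurrently is not stated here.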

jnan806 avatar Sep 30 '22 14:09 jnan806

Could you please reformat your code with goimports?

sczyh30 avatar Oct 09 '22 03:10 sczyh30

Fixed! Thanks for your review ~

jnan806 avatar Oct 09 '22 15:10 jnan806

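Could you explain the design principles behind the project structure?

Project structure:

  • api: proto files and stubs
  • cmd: CLI tools, if any (installable via go install)
  • docs: documentation
  • internal: code that should not be exposed to SDK users

For code that can be exposed, I suggest dropping pkg and exposing it directly, for example client, transport ...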

iamharvey avatar Oct 13 '22 03:10 iamharvey

Please resolve the data race:

==================
WARNING: DATA RACE
Read at 0x00c000025d70 by goroutine 12:
  github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceSubscribeConfigClient).Send()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:56 +0x33
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:205 +0x44f
  main.ss()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
  main.main.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c

Previous write at 0x00c000025d70 by goroutine 9:
  github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceClient).SubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:41 +0x106
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:126 +0x219
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:104 +0xae
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start.func2()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x39

Goroutine 12 (running) created at:
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x36f

Goroutine 9 (running) created at:
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x179
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x299
==================

sczyh30 avatar Nov 21 '22 08:11 sczyh30

==================
WARNING: DATA RACE
Read at 0x00c0000a1070 by goroutine 12:
  google.golang.org/grpc.(*clientStream).SendMsg()
      /Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:781 +0x132
  github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceSubscribeConfigClient).Send()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:56 +0x54
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:205 +0x44f
  main.ss()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
  main.main.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c

Previous write at 0x00c0000a1070 by goroutine 9:
  google.golang.org/grpc.newClientStreamWithParams()
      /Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:278 +0xc16
  google.golang.org/grpc.newClientStream.func2()
      /Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:184 +0x199
  google.golang.org/grpc.newClientStream()
      /Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:212 +0x713
  google.golang.org/grpc.(*ClientConn).NewStream()
      /Users/sczyh30/go/pkg/mod/google.golang.org/[email protected]/stream.go:158 +0x2d6
  github.com/opensergo/opensergo-go/pkg/proto/transport/v1.(*openSergoUniversalTransportServiceClient).SubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/proto/transport/v1/protocol_grpc.pb.go:37 +0xe1
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:126 +0x219
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:104 +0xae
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).keepAlive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:107 +0xc7
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start.func2()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x39

Goroutine 12 (running) created at:
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x36f

Goroutine 9 (running) created at:
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:121 +0x179
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x299
==================

sczyh30 avatar Nov 21 '22 08:11 sczyh30

==================
WARNING: DATA RACE
Read at 0x00c000260f00 by goroutine 53:
  runtime.mapaccess2()
      /usr/local/opt/go/libexec/src/runtime/map.go:456 +0x0
  sync.(*Map).Load()
      /usr/local/opt/go/libexec/src/sync/map.go:112 +0x98
  github.com/opensergo/opensergo-go/pkg/transport/subscribe.SubscriberRegistry.GetSubscribersOf()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/transport/subscribe/subscriber_registry.go:51 +0xf3
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).onSubscribeDataNotify()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:220 +0x524
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandleReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:163 +0x48d
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).handleReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:147 +0x144
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:109 +0x105
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func2()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:57 +0x39

Previous write at 0x00c000260f00 by goroutine 12:
  runtime.mapdelete()
      /usr/local/opt/go/libexec/src/runtime/map.go:695 +0x0
  sync.(*Map).LoadAndDelete()
      /usr/local/opt/go/libexec/src/sync/map.go:285 +0x1a4
  sync.(*Map).Delete()
      /usr/local/opt/go/libexec/src/sync/map.go:301 +0x504
  github.com/opensergo/opensergo-go/pkg/transport/subscribe.(*SubscriberRegistry).RemoveSubscribers()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/transport/subscribe/subscriber_registry.go:61 +0x4bd
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:212 +0x4bf
  main.ss()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
  main.main.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c

sczyh30 avatar Nov 21 '22 08:11 sczyh30
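
One possible reading of the trace above is an inference from the frame names, not something confirmed in this thread. `GetSubscribersOf` is printed with a value receiver (`subscribe.SubscriberRegistry.GetSubscribersOf`), while `RemoveSubscribers` is printed with a pointer receiver. If the registry holds a `sync.Map` by value, each value-receiver call copies the map together with its internal mutex, so the copy and the original can touch the same storage without sharing a lock. The sketch below uses a hypothetical `Registry` type with pointer receivers throughout to show the safe shape.

```go
package main

import (
	"fmt"
	"sync"
)

// Registry is a hypothetical stand-in for a subscriber registry backed by sync.Map.
type Registry struct {
	subscribers sync.Map // key: string, value: []string
}

// Every method uses a pointer receiver, so all calls share one sync.Map
// and therefore one internal mutex. A value receiver would copy the lock,
// which `go vet` reports through its copylocks check.
func (r *Registry) Register(key string, names []string) {
	r.subscribers.Store(key, names)
}

func (r *Registry) Get(key string) ([]string, bool) {
	v, ok := r.subscribers.Load(key)
	if !ok {
		return nil, false
	}
	return v.([]string), true
}

func (r *Registry) Remove(key string) {
	r.subscribers.Delete(key)
}

func main() {
	reg := &Registry{}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("key-%d", i%2)
			reg.Register(key, []string{"sub"})
			reg.Get(key)
			reg.Remove(key)
		}(i)
	}
	wg.Wait()
	_, ok := reg.Get("key-0")
	fmt.Println("present after removal:", ok)
}
```

Running a program like this with `go run -race` should report nothing, since every goroutine goes through the same `*Registry`.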

Please pay attention to the data race problem. You might need to run the demo and test cases with the -race arg.

sczyh30 avatar Nov 21 '22 08:11 sczyh30

Some unit tests are probably needed for better quality. (PS: the Java SDK also lacks test cases, which is not good :( )

sczyh30 avatar Nov 21 '22 08:11 sczyh30

==================
WARNING: DATA RACE
Read at 0x00c000260de0 by goroutine 53:
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).onSubscribeDataNotify()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:220 +0x49b
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandleReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:163 +0x48d
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).handleReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:147 +0x144
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:109 +0x105
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).doHandle()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:111 +0x112
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func2()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:57 +0x39

Previous write at 0x00c000260de0 by goroutine 12:
  sync/atomic.AddInt32()
      /usr/local/opt/go/libexec/src/runtime/race_amd64.s:281 +0xb
  sync/atomic.AddInt32()
      <autogenerated>:1 +0x1a
  sync.(*Map).LoadAndDelete()
      /usr/local/opt/go/libexec/src/sync/map.go:291 +0x1cb
  sync.(*Map).Delete()
      /usr/local/opt/go/libexec/src/sync/map.go:301 +0x504
  github.com/opensergo/opensergo-go/pkg/transport/subscribe.(*SubscriberRegistry).RemoveSubscribers()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/transport/subscribe/subscriber_registry.go:61 +0x4bd
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).UnsubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:212 +0x4bf
  main.ss()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:75 +0xb4
  main.main.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:66 +0x8c

sczyh30 avatar Nov 21 '22 09:11 sczyh30

Panic occurred (additional context to be updated):

{"logLevel":"ERROR","timestamp":"2022-11-21 17:16:41.683","caller":"logger.go:163","msg":"[OpenSergo SDK] [subscribeResponseId:] panic occurred when invoking doHandleReceive() for subscribe.","error":"runtime error: invalid memory address or nil pointer dereference"}

sczyh30 avatar Nov 21 '22 09:11 sczyh30

==================
WARNING: DATA RACE
Read at 0x00c00008c6f0 by goroutine 17:
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).SubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:161 +0x8b
  main.main.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:72 +0x144

Previous write at 0x00c00008c6f0 by goroutine 14:
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:91 +0x275
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:54 +0x39

Goroutine 17 (running) created at:
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:68 +0x3d6

Goroutine 14 (finished) created at:
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:54 +0xe4
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:137 +0x3b5
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x2a5
==================

sczyh30 avatar Nov 21 '22 13:11 sczyh30

As discussed and debugged with @jnan806, these weird race conditions were mainly caused by non-thread-safe access to subscribeConfigStream in OpenSergoClient.

How to reproduce it: once a client has established a connection to the control plane, restart the control plane and, in the meantime, issue Subscribe or Unsubscribe commands concurrently.

sczyh30 avatar Nov 21 '22 13:11 sczyh30
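
The shape of this race can be sketched in isolation: one goroutine replaces the stream on reconnect while another goroutine sends on it. The example below is a minimal illustration under stated assumptions, not the SDK's code. `stream` and `Client` are hypothetical stand-ins, and `atomic.Value` is used only to make the swap and the read of the stream reference safe.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// stream is a hypothetical stand-in for the gRPC subscribe stream.
type stream struct{ id int }

// Send pretends to write a message; the real stream would do network I/O.
func (s *stream) Send(msg string) error { return nil }

// Client keeps the current stream in an atomic.Value so that a reconnect
// goroutine can swap it while other goroutines read it.
type Client struct {
	current atomic.Value // always stores *stream
}

// reconnect publishes a fresh stream, as would happen after the
// control plane restarts.
func (c *Client) reconnect(id int) {
	c.current.Store(&stream{id: id})
}

// Send loads the current stream and reports an error instead of
// dereferencing nil when no stream has been published yet.
func (c *Client) Send(msg string) error {
	s, ok := c.current.Load().(*stream)
	if !ok || s == nil {
		return errors.New("stream not ready")
	}
	return s.Send(msg)
}

func main() {
	c := &Client{}
	fmt.Println(c.Send("early"))

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 1; i <= 100; i++ {
			c.reconnect(i)
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			_ = c.Send("subscribe")
		}
	}()
	wg.Wait()
	fmt.Println(c.Send("late"))
}
```

Note that this only protects the reference to the stream. grpc-go documents that calling `SendMsg` on the same stream from different goroutines is not safe, so concurrent senders would still need their own serialization (for example a mutex or a single sender goroutine).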

Yeah. I have resolved the concurrency-safety issues by using several atomic.Value fields and by replacing recursion with a for loop or for-select.

jnan806 avatar Nov 22 '22 04:11 jnan806

Nice work. Since this is a large PR containing the initial implementation of the SDK, I'll create a merge commit for this PR.

sczyh30 avatar Nov 26 '22 13:11 sczyh30

Fabulous! Thanks for contributing!

PS: We may improve the SDK implementation later :)

sczyh30 avatar Nov 26 '22 13:11 sczyh30

It's my pleasure ~ And I will keep following and improving it. 😃

jnan806 avatar Nov 26 '22 14:11 jnan806