w3bstream
w3bstream copied to clipboard
`FlatMap` op
example code:
// rxgo.FromChannel(ch).FlatMap(flatMap)
func flatMap(item rxgo.Item) rxgo.Observable {
// 1.Serialize
b, err := json.Marshal(item.V.(models.SourceCustomer))
if err != nil {
l.Error(err)
}
// 2.Invoke wasm code
code := ins.HandleEvent(ctx, "flatMapBySpace", b).Code
l.Info(fmt.Sprintf("flatMapBySpace wasm code %d", code))
// 3.Get & parse data
if code < 0 {
l.Error(errors.New(fmt.Sprintf("%v %s error.", item.V.(models.SourceCustomer), "flatMapBySpace")))
//TODO handle exceptions
return nil
}
rb, ok := ins.GetResource(uint32(code))
defer ins.RmvResource(ctx, uint32(code))
if !ok {
l.Error(errors.New("flatMapBySpace result not found"))
//TODO handle exceptions
return nil
}
// []models.Customer is configured in yaml
var cus []models.Customer
err = json.Unmarshal(rb, &cus)
if err != nil {
l.Error(errors.New("flatMapBySpace result not found"))
//TODO handle exceptions
return nil
}
obs := rxgo.Just(cus)()
return obs
}