w3bstream icon indicating copy to clipboard operation
w3bstream copied to clipboard

`FlatMap` op

Open hunshenshi opened this issue 3 years ago • 0 comments

Example code:

// rxgo.FromChannel(ch).FlatMap(flatMap)

// flatMap is the callback passed to rxgo's FlatMap operator. It serializes the
// incoming models.SourceCustomer to JSON, hands the bytes to the wasm handler
// "flatMapBySpace", and returns an Observable built from the []models.Customer
// decoded out of the handler's result resource.
//
// Every failure is logged and reported as an error Observable (rxgo.Thrown)
// rather than nil: FlatMap calls Observe on whatever this function returns, so
// a nil Observable would panic the whole pipeline. Callers that want to skip
// bad items instead of stopping can configure the stream's error strategy.
func flatMap(item rxgo.Item) rxgo.Observable {
	// An upstream error item carries no value; forward the error as-is
	// instead of tripping over the type assertion below.
	if item.E != nil {
		return rxgo.Thrown(item.E)
	}

	// Checked assertion: an unexpected payload becomes a stream error
	// instead of a runtime panic.
	src, ok := item.V.(models.SourceCustomer)
	if !ok {
		err := fmt.Errorf("flatMapBySpace: unexpected item type %T", item.V)
		l.Error(err)
		return rxgo.Thrown(err)
	}

	// 1.Serialize
	b, err := json.Marshal(src)
	if err != nil {
		// Do not invoke the wasm handler with an invalid payload.
		l.Error(err)
		return rxgo.Thrown(err)
	}

	// 2.Invoke wasm code
	code := ins.HandleEvent(ctx, "flatMapBySpace", b).Code
	l.Info(fmt.Sprintf("flatMapBySpace wasm code %d", code))

	// 3.Get & parse data
	// A negative code is treated as a handler failure; otherwise the code is
	// used as the id of the result resource.
	if code < 0 {
		err = fmt.Errorf("%v %s error.", src, "flatMapBySpace")
		l.Error(err)
		return rxgo.Thrown(err)
	}

	resID := uint32(code)
	rb, ok := ins.GetResource(resID)
	// Registered before the ok check (as in the original) so the removal is
	// attempted on every exit path from here on.
	defer ins.RmvResource(ctx, resID)
	if !ok {
		err = errors.New("flatMapBySpace result not found")
		l.Error(err)
		return rxgo.Thrown(err)
	}

	// []models.Customer is configured in yaml
	var cus []models.Customer
	if err = json.Unmarshal(rb, &cus); err != nil {
		// Report the real decode failure; the original logged a misleading
		// "result not found" message and dropped the underlying error.
		err = fmt.Errorf("flatMapBySpace result unmarshal: %w", err)
		l.Error(err)
		return rxgo.Thrown(err)
	}

	return rxgo.Just(cus)()
}

hunshenshi avatar Feb 27 '23 05:02 hunshenshi