sdk-go icon indicating copy to clipboard operation
sdk-go copied to clipboard

Add support for visiting all payloads at once in proxy visitor interceptor

Open cretz opened this issue 2 years ago • 0 comments

Describe the solution you'd like

This is for the https://github.com/temporalio/api-go project but it doesn't accept issues.

Today the payload visitor in https://pkg.go.dev/go.temporal.io/api/proxy visits sets of payloads as they are seen one set at a time. Users want to collect these and then visit them together all at once. Here's how they collect them all in user-land:

func CollectAllPayloads(msg proto.Message) (payloads []*common.Payload, err error) {
	err = proxy.VisitPayloads(context.Background(), msg, proxy.VisitPayloadsOptions{
		Visitor: func(ctx *proxy.VisitPayloadsContext, p []*common.Payload) ([]*common.Payload, error) {
			payloads = append(payloads, p...)
			return p, nil
		},
		SkipSearchAttributes: true,
	})
	return
}

And a utility to apply something to all payloads:

func ApplyToAllPayloads(msg proto.Message, apply ApplyFunc) error {
	payloads, err := CollectAllPayloads(msg)
	if err != nil {
		return err
	}
	newPayloads, err := apply(payloads)
	if err != nil {
		return err
	}
	if len(newPayloads) != len(payloads) {
		return fmt.Errorf("expected %v payload(s), got %v", len(payloads), len(newPayloads))
	}
	// Clear and merge new payloads onto old ones
	// this mutates the payloads var
	for i, payload := range payloads {
		payload.Reset()
		proto.Merge(payload, newPayloads[i])
	}
	return nil
}

Then how they may invoke them all at once in the interceptor:

type AllPayloadVisitorInterceptorOptions struct {
	OutboundApplyAll func([]*common.Payload) ([]*common.Payload, error)
	InboundApplyAll  func([]*common.Payload) ([]*common.Payload, error)
}

func NewAllPayloadVisitorInterceptor(options AllPayloadVisitorInterceptorOptions) (grpc.UnaryClientInterceptor, error) {
	return func(ctx context.Context, method string, req, response interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		if reqMsg, ok := req.(proto.Message); ok && options.OutboundApplyAll != nil {
			err := ApplyToAllPayloads(reqMsg, options.OutboundApplyAll)
			if err != nil {
				return err
			}
		}

		err := invoker(ctx, method, req, response, cc, opts...)
		if err != nil {
			return err
		}

		if resMsg, ok := response.(proto.Message); ok && options.InboundApplyAll != nil {
			return ApplyToAllPayloads(resMsg, options.InboundApplyAll)
		}

		return nil
	}, nil
}

Basically we need this functionality opt-in. The best solution is probably some kind of AllAtOnce bool option on VisitPayloadsOptions, with documentation saying that every output payload must be 1:1 with input, and that the payloads still need to be able to be handled individually on in/out (i.e. you can't encrypt them together as one blob, because they need to be isolated), and that VisitPayloadsContext.Parent will be nil for the single visit call we will make.

cretz avatar Jun 08 '23 22:06 cretz