
Injected CE Client should allow for customization

Open embano1 opened this issue 2 years ago • 18 comments

Typically, eventing sources use adapterv2.MainWithContext for injecting clients, etc. into the adapter to benefit from automatic logging configuration, metrics reporting, CR status integration (in case of send errors, etc.).

However, this does not allow for customization of the injected CloudEvent client, because the NewCloudEventsClientCRStatus constructor it calls does not accept http.Option arguments, unlike NewCloudEventsClientWithOptions.

https://github.com/knative/eventing/blob/31e4e2eb26d1f1987552027ab301b09f9b11557a/pkg/adapter/v2/cloudevents.go#L56

Thus, features like rate limits and custom retry codes (instead of the defaults used in CloudEvents sdk/go) cannot be configured. The current workaround is to skip the injected CE client and use a custom one, which means either mimicking a lot of the adapter injection magic or not benefiting from it.

Some options that were discussed in a Slack thread:

  1. Don't use adapterv2.MainWithContext, mimic its behavior (if needed) and create custom CE client
  2. Default to using kncloudevents in adapterv2.MainWithContext to have standardized behavior across Knative-related projects (sources, sinks, brokers, etc.)
  3. Change adapterv2.MainWithContext to allow for customization, e.g. WithOptions() (depending on the implementation, this could be a non-breaking API change)
  4. Introduce a new env to configure the CE client, e.g. K_CE_CLIENT_CONFIG similar to how we configure logging and metrics in Kn (idea proposed by @odacremolbap)
  5. Only for retries/delivery though: Allow to pass a DeliverySpec and extend it to support retryable status codes (non-breaking API change IMHO)

Initially I liked the idea of K_CE_CLIENT_CONFIG to customize the CE client (e.g. retry config), but thinking more about it, it could certainly overlap with objects we already have (e.g. DeliverySpec) and thus could confuse the user and lead to a lot of redundant code and complexity. On the other hand, DeliverySpec does not cover all CE client options, e.g. rate limits, HTTP options, etc.
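To make option 4 a bit more concrete, here is a minimal sketch of what reading such a variable could look like. It assumes a JSON payload; the field names (`retryStatusCodes`, `maxRetries`) and the `loadClientConfig` helper are hypothetical, since the thread only names the `K_CE_CLIENT_CONFIG` variable and does not define its contents.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// ClientConfig is a hypothetical schema for K_CE_CLIENT_CONFIG.
// The thread names the variable but does not specify any fields.
type ClientConfig struct {
	RetryStatusCodes []int `json:"retryStatusCodes,omitempty"`
	MaxRetries       int   `json:"maxRetries,omitempty"`
}

// loadClientConfig decodes the variable using the supplied lookup function.
// An unset or empty variable yields the zero config, so adapters that do
// not set it would keep their current defaults.
func loadClientConfig(getenv func(string) string) (ClientConfig, error) {
	var cfg ClientConfig
	raw := getenv("K_CE_CLIENT_CONFIG")
	if raw == "" {
		return cfg, nil
	}
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return cfg, fmt.Errorf("parsing K_CE_CLIENT_CONFIG: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg, err := loadClientConfig(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

The overlap concern is visible even in this small sketch: a retry count here would compete with the retry settings a DeliverySpec already carries.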

cc/ @salaboy @evankanderson @benmoss @odacremolbap

embano1 avatar Nov 10 '21 09:11 embano1

Just to make sure, can you elaborate on the behavior for the source you are building?

The DeliverySpec can be configured with a DeadLetterSink which I don't think is in the scope of CloudEvents HTTP options. Also the DeliverySpec at the InMemoryChannel afaik does not leverage CloudEvents options to retry and backoff.

On the retries, there is a default retriable function that matches some error status codes. Is that something you wish to customize? Regarding the rate limiter, I think it should make a lot more sense to use it at a sink rather than at a source.

odacremolbap avatar Nov 10 '21 15:11 odacremolbap

A concrete example is this source. Across several of the Tanzu-related Knative projects (brokers, sources, etc.) we're trying to standardize on kncloudevents to keep client logic similar. In this particular case it's the retry codes that require me to customize the injected CE client (without completely swapping the client for now).

I agree that DeliverySpec might not be the right way to make generic modifications of the CE client. Alternatively to this proposal, we might think about adding retryable status codes to the DeliverySpec so users can define this behavior.

On the retries, there is a default retriable function that matches some error status codes. Is that something you wish to customize?

Exactly, wrapping this into an http.Option.

As more and more systems offer webhook integration, I foresee more use cases around sources implementing a webhook endpoint. In fact, this is something I wanted to build and contribute as a sandbox project, I just haven't had time for it. Hence the need for inbound rate limiting as another option (I recently added this to the CE Go SDK, driven by this use case): https://github.com/cloudevents/sdk-go/pull/722

embano1 avatar Nov 10 '21 19:11 embano1

I think I'd lean most towards 2 or 3. Injecting the client via context seems like a pattern we use all over the rest of the place, why not here too?

benmoss avatar Nov 11 '21 21:11 benmoss

@embano1 the rate limiter at the CE producer client confused me (I still think that is not a valid scenario) but customizing HTTP options for custom retrying when sending sounds good.

Focusing on that send function, the context that gets evaluated when configuring the retry strategy is passed from the caller.

It does not seem to be configured at the CE client but at each call, so adding this before each call would do it:

import cecontext "github.com/cloudevents/sdk-go/v2/context"

...

ctx = cecontext.WithRetriesExponentialBackoff(ctx, time.Second * seconds, retries)
result := h.ceClient.Send(ctx, yourEvent)

If you want to customize the isRetriable function (maybe to add a RetryAlways function that does not depend on status code), then that would be the case where injecting by context would be needed, although I think it would be nice to do that at the CloudEvents SDK to allow HTTP options via context, then using it from here.

The env var that I mentioned would be intended in case we want to provide a generic way of configuring how CE are produced so that admins can configure the source adapters with their own settings, but that is a step further.

Would the per request context configuration solve your needs or would you still need customization of the adapter's CE client when created?

odacremolbap avatar Nov 17 '21 10:11 odacremolbap

I think I'd lean most towards 2 or 3. Injecting the client via context seems like a pattern we use all over the rest of the place, why not here too?

In order to keep the goodies of the factory, e.g. event recorder and tracing, how could that be achieved if I only want to customize certain options instead of supplying/creating my own CE client? (more thinking about UX here)

embano1 avatar Nov 19 '21 09:11 embano1

the rate limiter at the CE producer client confused me (I still think that is not a valid scenario) but customizing HTTP options for custom retrying when sending sounds good.

Any form of bulk-heading, including rate limiting/quotas, to protect a service from overload is considered good practice so I think this is still a valid scenario to add inbound rate limiting to a CE client/receiver. Should I explain this further or does this resiliency aspect make sense?
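As a rough illustration of inbound rate limiting on a receiver, here is a minimal sketch using only the standard library. It is not the implementation from the sdk-go PR linked earlier; the `tokenBucket` type, the `limit` middleware, and the `probe` helper are hypothetical names for this example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// tokenBucket is a minimal limiter: it holds at most capacity tokens and
// refills them at rate tokens per second.
type tokenBucket struct {
	mu       sync.Mutex
	tokens   float64
	capacity float64
	rate     float64
	last     time.Time
}

func newTokenBucket(ratePerSecond float64, burst int) *tokenBucket {
	return &tokenBucket{
		tokens:   float64(burst),
		capacity: float64(burst),
		rate:     ratePerSecond,
		last:     time.Now(),
	}
}

// allow refills the bucket for the elapsed time and takes one token if one
// is available.
func (b *tokenBucket) allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	now := time.Now()
	b.tokens += now.Sub(b.last).Seconds() * b.rate
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// limit rejects requests with 429 Too Many Requests once the bucket is empty.
func limit(b *tokenBucket, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !b.allow() {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// receiver stands in for the webhook source's event handler.
var receiver http.Handler = http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
	w.WriteHeader(http.StatusAccepted)
})

// probe sends one synthetic POST through h and returns the response status.
func probe(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
	return rec.Code
}

func main() {
	// One token per second with a burst of two.
	h := limit(newTokenBucket(1, 2), receiver)
	for i := 0; i < 3; i++ {
		fmt.Println(probe(h))
	}
}
```

The point of the sketch is that the protection sits in front of the receiver, so the source sheds load before doing any event processing.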

It does not seem to be configured at the CE client but at each call

Yup, but currently that doesn't allow to customize retry behavior (codes) since this is hardcoded in the client if not passed via Option.

then that would be the case where injecting by context would be needed

Not following this one. Are you referring to what Ben said in terms of injecting a full client in the ctx or adding a new retry strategy (status codes) to the cecontext?

although I think it would be nice to do that at the CloudEvents SDK to allow HTTP options via context, then using it from here

Sounds like a breaking change to the SDK which might be unlikely to happen?

Would the per request context configuration solve your needs or would you still need customization of the adapter's CE client when created?

Totally, since we already use WithRetriesExponentialBackoff etc. So are you suggesting a new field in RetryParams that allows custom status codes or a callback to override the default retry status codes (isRetriableFunc)? This would be a non-breaking change in https://github.com/cloudevents/sdk-go/blob/82f2b61ecde41fd0577969cc58a7c2a18eeda250/v2/protocol/http/protocol_retry.go#L51
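A minimal sketch of that idea, assuming a predicate field on the retry parameters. The `RetryParams` type below is a reduced stand-in and not the SDK's struct, and the `IsRetriable` field, `retryOn`, and `shouldRetry` are hypothetical names; the status codes are illustrative rather than the SDK's defaults.

```go
package main

import (
	"fmt"
	"net/http"
)

// IsRetriable decides whether a response status code warrants another attempt.
type IsRetriable func(statusCode int) bool

// retryOn builds a predicate from an explicit list of status codes.
func retryOn(codes ...int) IsRetriable {
	set := make(map[int]struct{}, len(codes))
	for _, c := range codes {
		set[c] = struct{}{}
	}
	return func(statusCode int) bool {
		_, ok := set[statusCode]
		return ok
	}
}

// RetryParams is a stand-in for the per-call retry parameters. The
// IsRetriable field is the hypothetical addition discussed in the thread.
type RetryParams struct {
	MaxTries    int
	IsRetriable IsRetriable
}

// shouldRetry uses the custom predicate when one is set and the client's
// built-in fallback otherwise, which keeps existing callers unchanged.
func (p RetryParams) shouldRetry(statusCode int, fallback IsRetriable) bool {
	if p.IsRetriable != nil {
		return p.IsRetriable(statusCode)
	}
	return fallback(statusCode)
}

func main() {
	// Illustrative fallback codes, not the SDK's actual defaults.
	fallback := retryOn(http.StatusTooManyRequests, http.StatusServiceUnavailable)

	custom := RetryParams{MaxTries: 3, IsRetriable: retryOn(http.StatusConflict)}
	fmt.Println(custom.shouldRetry(http.StatusConflict, fallback)) // true

	plain := RetryParams{MaxTries: 3}
	fmt.Println(plain.shouldRetry(http.StatusConflict, fallback)) // false
}
```

Because a nil predicate falls back to the existing behavior, adding such a field would not break current callers.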

Update:

Created an issue to discuss in sdk-go: https://github.com/cloudevents/sdk-go/issues/740 Challenge is that cecontext is supposed to be protocol agnostic so we need to come up with a generic IsRetriable implementation.

embano1 avatar Nov 19 '21 09:11 embano1

Any form of bulk-heading, including rate limiting/quotas, to protect a service from overload is considered good practice so I think this is still a valid scenario to add inbound rate limiting to a CE client/receiver.

The receiver case is clear (and people like me who have been developing sinks using the adapter would benefit from it, I'm all in for this), I am still not sure about the CE producer scenario. Do you mean to buffer or discard events at sources when the configured sink returns 429?

currently that doesn't allow to customize retry behavior (codes) since this is hardcoded

:+1: so you want to customize the isRetriable function. Then you are right and can forget my suggestion about adding that WithRetries... call in the comment above.

Challenge is that cecontext is supposed to be protocol agnostic so we need to come up with a generic IsRetriable implementation.

Nice, that is in the direction of what I meant. And as you say, it will be very hard to make it generic, so what if we can create context options for HTTP CE Client? The equivalent of this file here:

https://github.com/cloudevents/sdk-go/blob/main/v2/context/context.go

but at this package:

https://github.com/cloudevents/sdk-go/tree/main/v2/protocol/http

Since it would be a context-configured option just for the HTTP protocol, that would not be a breaking change.

odacremolbap avatar Nov 19 '21 10:11 odacremolbap

The receiver case is clear (and people like me who have been developing sinks using the adapter would benefit from it, I'm all in for this), I am still not sure about the CE producer scenario. Do you mean to buffer or discard events at sources when the configured sink returns 429?

Oh, I was only talking about the receiver case here (as a webhook source will receive incoming events and hence needs to protect itself). Evan made a comment to also have rate limiting for Send(), e.g. to a broker or other sink, since this is also highly desirable when the downstream has a hardcoded limit one should adhere to (and avoid dealing with 429s right away). Naturally, this gets you into buffering issues which the client needs to handle (either buffer or drop at a certain threshold). Since the exact use case/API is not clear, we did not add it to the SDK yet.
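To illustrate the "buffer or drop at a certain threshold" trade-off on the send side, here is a minimal sketch. The `sendQueue` type is hypothetical and, as noted above, nothing like it exists in the SDK.

```go
package main

import "fmt"

// sendQueue buffers outgoing events up to a fixed capacity and drops
// anything beyond it. The dropped counter is not synchronized, so this
// sketch assumes a single producer goroutine.
type sendQueue struct {
	events  chan string
	dropped int
}

func newSendQueue(capacity int) *sendQueue {
	return &sendQueue{events: make(chan string, capacity)}
}

// offer enqueues without blocking. It reports false and counts a drop when
// the buffer is already full.
func (q *sendQueue) offer(event string) bool {
	select {
	case q.events <- event:
		return true
	default:
		q.dropped++
		return false
	}
}

func main() {
	q := newSendQueue(2)
	for _, ev := range []string{"a", "b", "c"} {
		fmt.Println(ev, q.offer(ev))
	}
	fmt.Println("dropped:", q.dropped)
}
```

A sender goroutine draining `events` at the downstream's permitted rate would complete the picture; the open API question is who decides the capacity and what happens to dropped events.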

so what if we can create context options for HTTP CE Client

Certainly an option to discuss in the linked issue (feel free to propose), but not sure what the maintainers will say in terms of API/UX and code duplication.

embano1 avatar Nov 19 '21 10:11 embano1

Will do, thanks for pushing this @embano1 !!

odacremolbap avatar Nov 19 '21 10:11 odacremolbap

After a great chat with @odacremolbap, here's a proposal for allowing custom CE client []http.Option values in the adapter injection phase without breaking the API:

  1. Add a new ctx key for passing []http.Option
  2. Pass the modified ctx in adapter.MainWithContext as usual
  3. Create a new public constructor adapter.NewCloudEventsClientCRStatusWithOptions with the same signature as adapter.NewCloudEventsClientCRStatus plus a variadic parameter opts ...http.Option (to comply with how other constructors with options are named)
  4. In adapter.MainWithInformers, check for the presence of the ctx key
  5. If the key is present, call the new constructor; otherwise keep the behavior as is

// NewCloudEventsClientCRStatusWithOptions returns a client CR status with provided options
func NewCloudEventsClientCRStatusWithOptions(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) {
	return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient, opts...)
}
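The context part of the proposal (steps 1, 2, 4 and 5) could be sketched as follows. To keep the example free of external modules, `Option` and `protocol` are stand-ins for the SDK's http.Option and HTTP protocol, and `WithHTTPOptions`, `HTTPOptionsFrom`, `withRetryCodes` and `rootCtx` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Option stands in for the SDK's http.Option so the sketch compiles on its own.
type Option func(*protocol) error

// protocol stands in for the HTTP protocol that the options configure.
type protocol struct{ retryCodes []int }

// optionsKey is an unexported key type, which avoids collisions with
// context values set by other packages.
type optionsKey struct{}

// WithHTTPOptions (step 1) stores the options in the context that is later
// passed to MainWithContext (step 2).
func WithHTTPOptions(ctx context.Context, opts ...Option) context.Context {
	return context.WithValue(ctx, optionsKey{}, opts)
}

// HTTPOptionsFrom reports the stored options and whether the key was set.
func HTTPOptionsFrom(ctx context.Context) ([]Option, bool) {
	opts, ok := ctx.Value(optionsKey{}).([]Option)
	return opts, ok
}

func newProtocol(opts ...Option) (*protocol, error) {
	p := &protocol{}
	for _, opt := range opts {
		if err := opt(p); err != nil {
			return nil, err
		}
	}
	return p, nil
}

// mainWithInformers mimics steps 4 and 5: use the options when the key is
// present, otherwise keep the existing behavior.
func mainWithInformers(ctx context.Context) (*protocol, error) {
	if opts, ok := HTTPOptionsFrom(ctx); ok {
		return newProtocol(opts...)
	}
	return newProtocol()
}

// withRetryCodes is an example option for this sketch only.
func withRetryCodes(codes ...int) Option {
	return func(p *protocol) error {
		p.retryCodes = append(p.retryCodes, codes...)
		return nil
	}
}

// rootCtx stands in for the context the adapter's main function creates.
var rootCtx = context.Background()

func main() {
	ctx := WithHTTPOptions(rootCtx, withRetryCodes(409, 503))
	p, err := mainWithInformers(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.retryCodes)
}
```

Adapters that never call `WithHTTPOptions` take the unchanged path, which is what makes the change non-breaking.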

embano1 avatar Nov 19 '21 11:11 embano1

@embano1 I am chewing on this, and thinking that there are way too many constructors while we are providing one more. What about creating a CE Config structure:

type CloudEventsConfig struct {
	CloudEventsHTTPOptions []http.Option
	CloudEventsOptions []cloudeventsclient.Option
	CRStatusEventClient *crstatusevent.CRStatusEventClient
	...	
}

providing a default configuration

func defaultCloudEventsConfig() *CloudEventsConfig {
	return &CloudEventsConfig{
		CRStatusEventClient: crstatusevent.GetDefaultClient(),
		...
	}
}

And then building the client with all provided options

func NewCloudEventsClient(opts ...CloudEventsClientOption) (cloudevents.Client, error) {

	cfg := defaultCloudEventsConfig()
	for _, opt := range opts {
		opt(cfg)
	}

	ceClient, err := newClientHTTPObserved(cfg.CloudEventsHTTPOptions, cfg.CloudEventsOptions)

	...

The configuration options would be provided as:

func WithCloudEventsHTTPOptions(opts []http.Option) CloudEventsClientOption { ... }
func WithCloudEventsOptions(opts []cloudeventsclient.Option) CloudEventsClientOption { ... }
func WithCloudEventsFromEnvAccessorOptions(env EnvConfigAccessor) CloudEventsClientOption { ... }

Building the client would be

NewCloudEventsClient(
    WithCloudEventsFromEnvAccessorOptions(env),
    WithCloudEventsHTTPOptions(yourOptions),
)
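For readers less familiar with the pattern, here is a self-contained sketch of how the elided `{ ... }` bodies might be filled in. `HTTPOption`, `ClientOption`, `EnvConfigAccessor` and `staticEnv` are simplified stand-ins rather than the real SDK or Knative types, the sketch returns the resolved config instead of a real client, and the sink check is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
)

// HTTPOption and ClientOption are placeholders for http.Option and
// cloudeventsclient.Option so the sketch has no external dependencies.
type HTTPOption string
type ClientOption string

// EnvConfigAccessor is a reduced stand-in exposing only the sink.
type EnvConfigAccessor interface{ GetSink() string }

// staticEnv is a trivial accessor used for demonstration.
type staticEnv struct{ sink string }

func (e staticEnv) GetSink() string { return e.sink }

// CloudEventsConfig collects everything the constructor needs.
type CloudEventsConfig struct {
	Sink                   string
	CloudEventsHTTPOptions []HTTPOption
	CloudEventsOptions     []ClientOption
}

// CloudEventsClientOption mutates the config before the client is built.
type CloudEventsClientOption func(*CloudEventsConfig)

func WithCloudEventsHTTPOptions(opts []HTTPOption) CloudEventsClientOption {
	return func(cfg *CloudEventsConfig) {
		cfg.CloudEventsHTTPOptions = append(cfg.CloudEventsHTTPOptions, opts...)
	}
}

func WithCloudEventsOptions(opts []ClientOption) CloudEventsClientOption {
	return func(cfg *CloudEventsConfig) {
		cfg.CloudEventsOptions = append(cfg.CloudEventsOptions, opts...)
	}
}

func WithCloudEventsFromEnvAccessorOptions(env EnvConfigAccessor) CloudEventsClientOption {
	return func(cfg *CloudEventsConfig) { cfg.Sink = env.GetSink() }
}

// NewCloudEventsClient applies the options in order. Requiring a sink is an
// assumption of this sketch, not something the thread specifies.
func NewCloudEventsClient(opts ...CloudEventsClientOption) (*CloudEventsConfig, error) {
	cfg := &CloudEventsConfig{}
	for _, opt := range opts {
		opt(cfg)
	}
	if cfg.Sink == "" {
		return nil, errors.New("no sink configured")
	}
	return cfg, nil
}

func main() {
	cfg, err := NewCloudEventsClient(
		WithCloudEventsFromEnvAccessorOptions(staticEnv{sink: "http://sink.example"}),
		WithCloudEventsHTTPOptions([]HTTPOption{"retry-on-409"}),
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *cfg)
}
```

One constructor plus composable options replaces the growing family of constructors, and new settings can be added later without changing the signature.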

odacremolbap avatar Nov 22 '21 08:11 odacremolbap

and thinking that there are way too many constructors

Agree.

And then building the client with all provided options

I'm a big fan of the options pattern, so 100% ack.

Are you still proposing a change inside MainWithInformers, i.e. non-breaking to controller users and basically only modifying this path?

https://github.com/vmware-tanzu/sources-for-knative/blob/eb93df2f1492ddaef6e9b931549cd574b0776529/vendor/knative.dev/eventing/pkg/adapter/v2/main.go#L147-L165

embano1 avatar Nov 22 '21 09:11 embano1

Btw: something to also bring up in our discussion is whether it's safe to assume that this CE client will be replaced by pkg/kncloudevents anytime soon, rendering some of these discussions obsolete.

embano1 avatar Nov 22 '21 09:11 embano1

Are you still proposing a change inside MainWithInformers

Yes. Using a new CloudEvents client constructor we could keep backwards compatibility but inject by context any option.

	NewCloudEventsHTTPClient(
		WithCloudEventsFromEnvAccessorOptions(env),
		WithCRStatusEventClient(crStatusEventClient),
		WithSourceStatsReporter(reporter),
		WithCloudEventsClientContextOptions(ctx),
	)

Regarding pkg/kncloudevents, that one seems to be using net/http directly, with the CloudEvents SDK only used to populate requests with additional headers.

I'm guessing that the reason why there are CloudEvents-related packages under both pkg/adapter/v2 and pkg/kncloudevents is that the former started for sources that only produce CEs, and the latter was focused on receiving and propagating, and along the way providing KProbes with health info.

My gut feeling is that there should be a curated CE receiver and a CE sender provided by eventing for any external source or sink that uses them. Things might get a bit blurred because the CE Client interface includes both sender and receiver in a single interface, plus the kncloudevents package does not only receive CEs but also sends them, even implementing its own retries without relying on CE SDK code.

We could open a different discussion to make clear which one to use in which cases, whether kncloudevents should be using CE SDK retries instead of implementing its own, and whether the client implementation under pkg/adapter/v2 should be moved to pkg/kncloudevents.

odacremolbap avatar Nov 22 '21 12:11 odacremolbap

Using a new CloudEvents client constructor we could keep backwards compatibility but inject by context any option.

Nice.

We could open a different discussion to make clear which one to use in which cases

Let's bring this up in the next eventing call too.

embano1 avatar Nov 23 '21 08:11 embano1

In order to keep the goodies of the factory, e.g. event recorder and tracing, how could that be achieved if I only want to customize certain options instead of supplying/creating my own CE client? (more thinking about UX here)

I think those would just have to be lifted into some constructor (ideally the same one that the factory uses) that also exposes configuring additional options.

I think I prefer this because the user has control this way over the final object (the cloudevents client) rather than exposing piecemeal some options. If they want to use something completely different that satisfies the CE client interface, they're free to do so, so we have unlimited flexibility. Injecting options just means that we are stuck with the implementation that NewCloudEventsHTTPClient gives us, the flexibility is solely in whatever WithCloudEventsClientContextOptions gives you within that configuration space.
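The alternative described here, injecting a whole client rather than options, could be sketched like this. `Client` is a reduced stand-in for the CloudEvents client interface (the real `Send` takes an event and returns a result type), and `WithClient`, `ClientFrom`, `namedClient` and `rootCtx` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Client is a reduced stand-in for the CloudEvents client interface.
type Client interface {
	Send(ctx context.Context, event string) error
}

// clientKey is an unexported context key type.
type clientKey struct{}

// WithClient stores a fully constructed client in the context.
func WithClient(ctx context.Context, c Client) context.Context {
	return context.WithValue(ctx, clientKey{}, c)
}

// ClientFrom returns the injected client, or fallback when none was set.
func ClientFrom(ctx context.Context, fallback Client) Client {
	if c, ok := ctx.Value(clientKey{}).(Client); ok {
		return c
	}
	return fallback
}

// namedClient is a toy implementation used to show which client was picked.
type namedClient struct{ name string }

func (c namedClient) Send(context.Context, string) error {
	fmt.Println("sent via", c.name)
	return nil
}

// rootCtx stands in for the context the adapter's main function creates.
var rootCtx = context.Background()

func main() {
	def := namedClient{name: "default"}

	// No client injected: the adapter keeps using its own.
	_ = ClientFrom(rootCtx, def).Send(rootCtx, "event")

	// Client injected: anything satisfying the interface is accepted.
	ctx := WithClient(rootCtx, namedClient{name: "custom"})
	_ = ClientFrom(ctx, def).Send(ctx, "event")
}
```

The trade-off compared with injecting options is that the caller becomes responsible for wiring in whatever the factory would otherwise add, such as the event recorder and tracing mentioned earlier.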

benmoss avatar Dec 01 '21 14:12 benmoss

/assign

odacremolbap avatar Jan 31 '22 09:01 odacremolbap

cc @creydr

pierDipi avatar Jul 06 '23 15:07 pierDipi

Closing this for now as there hasn't been any recent update/interest about this issue

pierDipi avatar Mar 27 '24 18:03 pierDipi