eventing
Injected CE Client should allow for customization
Typically, eventing sources use adapterv2.MainWithContext for injecting clients, etc. into the adapter to benefit from automatic logging configuration, metrics reporting, CR status integration (in case of send errors, etc.).
However, this does not allow for customization of the injected CloudEvent client since the called NewCloudEventsClientCRStatus does not accept http.Option compared to NewCloudEventsClientWithOptions.
https://github.com/knative/eventing/blob/31e4e2eb26d1f1987552027ab301b09f9b11557a/pkg/adapter/v2/cloudevents.go#L56
Thus, features like rate limits and custom retry codes (instead of the defaults used in CloudEvents sdk/go) cannot be configured. The current workaround is to not use the injected CE client and use a custom one, which means mimicking a lot of the adapter injection magic - or not benefiting from it.
Some options that were discussed in a Slack thread:
- Don't use adapterv2.MainWithContext, mimic its behavior (if needed) and create a custom CE client
- Default to using kncloudevents in adapterv2.MainWithContext to have standardized behavior across Knative-related projects (sources, sinks, brokers, etc.)
- Change adapterv2.MainWithContext to allow for customization, e.g. WithOptions() (depending on the implementation, this could be a non-breaking API change)
- Introduce a new env to configure the CE client, e.g. K_CE_CLIENT_CONFIG similar to how we configure logging and metrics in Kn (idea proposed by @odacremolbap)
- Only for retries/delivery though: Allow to pass a DeliverySpec and extend it to support retryable status codes (non-breaking API change IMHO)
Initially I liked the idea of K_CE_CLIENT_CONFIG to customize the CE client (e.g. retry config), but thinking more about it, it could certainly overlap with objects we already have (e.g. DeliverySpec) and thus could confuse the user and lead to a lot of redundant code and complexity. On the other hand, DeliverySpec does not cover all CE client options, e.g. rate limits, HTTP options, etc.
cc/ @salaboy @evankanderson @benmoss @odacremolbap
Just to make sure, can you elaborate on the behavior for the source you are building?
The DeliverySpec can be configured with a DeadLetterSink which I don't think is in the scope of CloudEvents HTTP options.
Also the DeliverySpec at the InMemoryChannel afaik does not leverage CloudEvents options to retry and backoff.
On the retries, there is a default retriable function that matches some error status codes. Is that something you wish to customize? Regarding the rate limiter, I think it should make a lot more sense to use it at a sink rather than at a source.
A concrete example is this source. Across several of the Tanzu related Knative projects (brokers, sources, etc.) we're trying to standardize on kncloudevents to keep client logic similar. In this particular case it's the retry codes that require me to customize the injected CE client (without completely swapping the client for now).
I agree that DeliverySpec might not be the right way to make generic modifications of the CE client.
Alternatively to this proposal, we might think about adding retryable status codes to the DeliverySpec so users can define this behavior.
On the retries, there is a default retriable function that matches some error status codes. Is that something you wish to customize?
Exactly, wrapping this into an http.Option.
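To make the idea of custom retry codes concrete, here is a minimal sketch of a status-code based retry predicate. The IsRetriable type and the retryOn helper are hypothetical and declared locally so the example compiles on its own; how such a predicate would be wrapped into an http.Option and handed to the injected client is exactly what this issue leaves open.

```go
package main

import "fmt"

// IsRetriable is a hypothetical, locally declared predicate type that
// decides from an HTTP status code whether a send should be retried.
type IsRetriable func(statusCode int) bool

// retryOn returns a predicate that reports true only for the given codes.
func retryOn(codes ...int) IsRetriable {
	set := make(map[int]struct{}, len(codes))
	for _, c := range codes {
		set[c] = struct{}{}
	}
	return func(statusCode int) bool {
		_, ok := set[statusCode]
		return ok
	}
}

func main() {
	// Assumed policy for illustration: retry on 429 and 503 only.
	retriable := retryOn(429, 503)
	for _, code := range []int{404, 429, 503} {
		fmt.Printf("%d retriable: %v\n", code, retriable(code))
	}
}
```

The set of codes is an assumption for illustration only; a source author would pick the codes that match the sink they talk to.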
As more and more systems offer webhook integration, I foresee more use cases around sources implementing a webhook endpoint. In fact, this is something I wanted to build and contribute as a sandbox project, I just haven't had time for it. Hence the need for inbound rate limiting as another option (I recently added this to CE Go SDK driven by this use case): https://github.com/cloudevents/sdk-go/pull/722
I think I'd lean most towards 2 or 3. Injecting the client via context seems like a pattern we use all over the rest of the place, why not here too?
@embano1 the rate limiter at the CE producer client confused me (I still think that is not a valid scenario) but customizing HTTP options for custom retrying when sending sounds good.
Focusing on that send function, the context that gets evaluated when configuring the retry strategy is passed from the caller.
It does not seem to be configured at the CE client but at each call, so adding this before each call would do it:
import cecontext "github.com/cloudevents/sdk-go/v2/context"
...
ctx = cecontext.WithRetriesExponentialBackoff(ctx, time.Second * seconds, retries)
result := h.ceClient.Send(ctx, yourEvent)
If you want to customize the isRetriable function (maybe to add a RetryAlways function that does not depend on status code), then that would be the case where injecting by context would be needed, although I think it would be nice to do that at the CloudEvents SDK to allow HTTP options via context, then using it from here.
The env var that I mentioned would be intended in case we want to provide a generic way of configuring how CE are produced so that admins can configure the source adapters with their own settings, but that is a step further.
Would the per request context configuration solve your needs or would you still need customization of the adapter's CE client when created?
I think I'd lean most towards 2 or 3. Injecting the client via context seems like a pattern we use all over the rest of the place, why not here too?
In order to keep the goodies of the factory, e.g. event recorder and tracing, how could that be achieved if I only want to customize certain options instead of supplying/creating my own CE client? (more thinking about UX here)
the rate limiter at the CE producer client confused me (I still think that is not a valid scenario) but customizing HTTP options for custom retrying when sending sounds good.
Any form of bulk-heading, including rate limiting/quotas, to protect a service from overload is considered good practice so I think this is still a valid scenario to add inbound rate limiting to a CE client/receiver. Should I explain this further or does this resiliency aspect make sense?
It does not seem to be configured at the CE client but at each call
Yup, but currently that doesn't allow customizing retry behavior (codes) since this is hardcoded in the client if not passed via Option.
then that would be the case where injecting by context would be needed
Not following this one. Are you referring to what Ben said in terms of injecting a full client in the ctx or adding a new retry strategy (status codes) to the cecontext?
although I think it would be nice to do that at the CloudEvents SDK to allow HTTP options via context, then using it from here
Sounds like a breaking change to the SDK which might be unlikely to happen?
Would the per request context configuration solve your needs or would you still need customization of the adapter's CE client when created?
Totally, since we already use WithRetriesExponentialBackoff etc. So are you suggesting a new field here in RetryParams and allow for custom status codes/callback to overwrite the default retry status codes (isRetriableFunc)? This would be a non-breaking change in https://github.com/cloudevents/sdk-go/blob/82f2b61ecde41fd0577969cc58a7c2a18eeda250/v2/protocol/http/protocol_retry.go#L51
Update:
Created an issue to discuss in sdk-go: https://github.com/cloudevents/sdk-go/issues/740
Challenge is that cecontext is supposed to be protocol agnostic so we need to come up with a generic IsRetriable implementation.
Any form of bulk-heading, including rate limiting/quotas, to protect a service from overload is considered good practice so I think this is still a valid scenario to add inbound rate limiting to a CE client/receiver.
The receiver case is clear (and people like me who have been developing sinks using the adapter would benefit from it, I'm all in for this), I am still not sure about the CE producer scenario. Do you mean to buffer or discard events at sources when the configured sink returns 429?
currently that doesn't allow to customize retry behavior (codes) since this is hardcoded
:+1: so you want to customize the isRetriable, then you are right and can forget my suggestion about adding that WithRetries... at the comment above.
Challenge is that cecontext is supposed to be protocol agnostic so we need to come up with a generic IsRetriable implementation.
Nice, that is in the direction of what I meant. And as you say, it will be very hard to make it generic, so what if we can create context options for HTTP CE Client? the equivalent of this file here:
https://github.com/cloudevents/sdk-go/blob/main/v2/context/context.go
but at this package:
https://github.com/cloudevents/sdk-go/tree/main/v2/protocol/http
Being a context configured option just for HTTP protocol that would not be a breaking change.
The receiver case is clear (and people like me who have been developing sinks using the adapter would benefit from it, I'm all in for this), I am still not sure about the CE producer scenario. Do you mean to buffer or discard events at sources when the configured sink returns 429?
Oh, I was only talking about the receiver case here (as a webhook source will receive incoming events and hence need to protect itself). Evan made a comment to also have rate limiting for Send(), e.g. to a broker or other sink since this is also highly desirable when the downstream has a hardcoded limit one should adhere to (and avoid dealing with 429s right away). Naturally, this gets you into buffering issues which the client needs to handle (either buffer or drop at a certain threshold). Since the exact use case/API is not clear we did not add it yet to the SDK.
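For readers unfamiliar with the receiver-side case, the following is a generic sketch of inbound rate limiting for a webhook-style endpoint using only the Go standard library. It is not the API added to the CE Go SDK in the PR linked earlier; the windowLimiter type, the perSecond helper and the fixed-window strategy are all assumptions chosen to keep the example short.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
	"time"
)

// windowLimiter admits at most limit requests per fixed time window.
type windowLimiter struct {
	mu     sync.Mutex
	limit  int
	window time.Duration
	start  time.Time
	count  int
}

// perSecond returns a limiter that admits at most limit requests per second.
func perSecond(limit int) *windowLimiter {
	return &windowLimiter{limit: limit, window: time.Second}
}

// allow reports whether one more request fits into the current window.
func (l *windowLimiter) allow() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	now := time.Now()
	if now.Sub(l.start) >= l.window {
		// The previous window has expired: start a new one.
		l.start, l.count = now, 0
	}
	if l.count >= l.limit {
		return false
	}
	l.count++
	return true
}

// rateLimited rejects requests over the limit with 429 Too Many Requests.
func rateLimited(l *windowLimiter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !l.allow() {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	// Hypothetical receiver that accepts every event it is given.
	receiver := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	h := rateLimited(perSecond(2), receiver)
	for i := 0; i < 3; i++ {
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/", nil))
		fmt.Println(rec.Code)
	}
}
```

The sender-side variant discussed above is deliberately left out, since it needs a buffering or dropping policy that this thread does not settle.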
so what if we can create context options for HTTP CE Client
Certainly an option to discuss in the linked issue (feel free to propose), but not sure what the maintainers will say in terms of API/UX and code duplication.
Will do, thanks for pushing this @embano1 !!
After a great chat with @odacremolbap here's a proposal how to allow for custom CE client []http.Option in the adapter injection phase without breaking the API:
- Add a new ctx key for passing []http.Option
- Pass the modified ctx in adapter.MainWithContext as usual
- Create a new public constructor adapter.NewCloudEventsClientCRStatusWithOptions with same signature as adapter.NewCloudEventsClientCRStatus but variadic parameter opts ...http.Option (to comply with how other constructors with options are named)
- In adapter.MainWithInformers check for the presence of the ctx key
- If key is present call the new constructor, otherwise keep behavior as is
// NewCloudEventsClientCRStatusWithOptions returns a client CR status with provided options
func NewCloudEventsClientCRStatusWithOptions(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) {
	// opts is variadic, so it is expanded with "..." when forwarded.
	return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient, opts...)
}
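The ctx-key part of the proposal could look roughly like the sketch below. Everything in it is a stand-in: Option replaces the SDK's http.Option, and WithHTTPOptions, HTTPOptionsFrom, adapterContext and pickConstructor are hypothetical names that do not exist in the adapter package. It only shows the mechanics of storing options in a context and branching on their presence.

```go
package main

import (
	"context"
	"fmt"
)

// settings is a hypothetical stand-in for the state an http.Option mutates.
type settings struct{ maxRetries int }

// Option stands in for the SDK's http.Option (placeholder type).
type Option func(*settings)

// httpOptionsKey is the unexported ctx key under which options are stored.
type httpOptionsKey struct{}

// WithHTTPOptions returns a ctx carrying opts for the adapter to pick up.
func WithHTTPOptions(ctx context.Context, opts ...Option) context.Context {
	return context.WithValue(ctx, httpOptionsKey{}, opts)
}

// HTTPOptionsFrom returns the options stored in ctx and whether the key was set.
func HTTPOptionsFrom(ctx context.Context) ([]Option, bool) {
	opts, ok := ctx.Value(httpOptionsKey{}).([]Option)
	return opts, ok
}

// adapterContext shows what a source's main() might do before handing
// the ctx to adapter.MainWithContext.
func adapterContext(opts ...Option) context.Context {
	ctx := context.Background()
	if len(opts) > 0 {
		ctx = WithHTTPOptions(ctx, opts...)
	}
	return ctx
}

// pickConstructor mimics the proposed check: the new constructor is used
// only when the key is present, otherwise behavior stays as is.
func pickConstructor(ctx context.Context) string {
	if opts, ok := HTTPOptionsFrom(ctx); ok {
		return fmt.Sprintf("NewCloudEventsClientCRStatusWithOptions(%d opts)", len(opts))
	}
	return "NewCloudEventsClientCRStatus"
}

func main() {
	withRetries := func(s *settings) { s.maxRetries = 5 }
	fmt.Println(pickConstructor(adapterContext()))
	fmt.Println(pickConstructor(adapterContext(withRetries)))
}
```

Because existing callers never set the key, they would keep hitting the current constructor, which is what makes the change non-breaking.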
@embano1 I am chewing on this, and thinking that there are way too many constructors while we are providing one more. What about creating a CE Config structure:
type CloudEventsConfig struct {
	CloudEventsHTTPOptions []http.Option
	CloudEventsOptions     []cloudeventsclient.Option
	CRStatusEventClient    *crstatusevent.CRStatusEventClient
	...
}
providing a default configuration
func defaultCloudEventsConfig() *CloudEventsConfig {
	return &CloudEventsConfig{
		CRStatusEventClient: crstatusevent.GetDefaultClient(),
		...
	}
}
And then building the client with all provided options
func NewCloudEventsClient(opts ...CloudEventsClientOption) (cloudevents.Client, error) {
	cfg := defaultCloudEventsConfig()
	for _, opt := range opts {
		opt(cfg)
	}
	ceClient, err := newClientHTTPObserved(cfg.CloudEventsHTTPOptions, cfg.CloudEventsOptions)
	...
The configuration options would be provided by functions such as:
func WithCloudEventsHTTPOptions(opts []http.Option) CloudEventsClientOption { ... }
func WithCloudEventsOptions(opts []cloudeventsclient.Option) CloudEventsClientOption { ... }
func WithCloudEventsFromEnvAccessorOptions(env EnvConfigAccessor) CloudEventsClientOption { ... }
Building the client would be
NewCloudEventsClient(
	WithCloudEventsFromEnvAccessorOptions(env),
	WithCloudEventsHTTPOptions(your-options),
)
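As a self-contained illustration of the options pattern outlined above, the sketch below compiles without the Knative or CloudEvents packages. HTTPOption, the SinkURI field, WithSinkURI and resolveConfig are hypothetical stand-ins introduced only so the example runs; the real proposal would carry []http.Option and build an actual client instead of returning the config.

```go
package main

import "fmt"

// HTTPOption stands in for the SDK's http.Option (placeholder type).
type HTTPOption string

// CloudEventsConfig collects everything needed to build the client.
type CloudEventsConfig struct {
	CloudEventsHTTPOptions []HTTPOption
	SinkURI                string // hypothetical value an env accessor might supply
}

// CloudEventsClientOption mutates the config before the client is built.
type CloudEventsClientOption func(*CloudEventsConfig)

// defaultCloudEventsConfig returns the baseline used when no option is given.
func defaultCloudEventsConfig() *CloudEventsConfig {
	return &CloudEventsConfig{SinkURI: "http://localhost:8080"}
}

// WithCloudEventsHTTPOptions appends HTTP-level options to the config.
func WithCloudEventsHTTPOptions(opts []HTTPOption) CloudEventsClientOption {
	return func(c *CloudEventsConfig) {
		c.CloudEventsHTTPOptions = append(c.CloudEventsHTTPOptions, opts...)
	}
}

// WithSinkURI overrides the default sink address.
func WithSinkURI(uri string) CloudEventsClientOption {
	return func(c *CloudEventsConfig) { c.SinkURI = uri }
}

// resolveConfig applies the options in order on top of the defaults.
// A real constructor would go on to build the client from this config.
func resolveConfig(opts ...CloudEventsClientOption) *CloudEventsConfig {
	cfg := defaultCloudEventsConfig()
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := resolveConfig(
		WithSinkURI("http://sink.example"),
		WithCloudEventsHTTPOptions([]HTTPOption{"retry", "timeout"}),
	)
	fmt.Println(cfg.SinkURI, len(cfg.CloudEventsHTTPOptions))
}
```

Options are applied in call order, so a later option can override what an earlier one set; that ordering is a design choice of this sketch rather than something the thread specifies.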
and thinking that there are way too many constructors
Agree.
And then building the client with all provided options
I'm a big fan of the options pattern, so 100% ack.
Are you still proposing a change inside MainWithInformers, i.e. non-breaking to controller users and basically only modify this path?
https://github.com/vmware-tanzu/sources-for-knative/blob/eb93df2f1492ddaef6e9b931549cd574b0776529/vendor/knative.dev/eventing/pkg/adapter/v2/main.go#L147-L165
Btw: something to also bring up in our discussion is whether it's safe to assume that this CE client will be replaced by pkg/kncloudevents anytime soon, rendering some of these discussions obsolete.
Are you still proposing a change inside MainWithInformers
Yes. Using a new CloudEvents client constructor we could keep backwards compatibility but inject by context any option.
NewCloudEventsHTTPClient(
	WithCloudEventsFromEnvAccessorOptions(env),
	WithCRStatusEventClient(crStatusEventClient),
	WithSourceStatsReporter(reporter),
	WithCloudEventsClientContextOptions(ctx),
)
Regarding pkg/kncloudevents, that one seems to be using net/http, with the CloudEvents SDK only used to populate requests with additional headers.
I'm guessing that the reason why there is a CloudEvents related package under both pkg/adapter/v2 and pkg/kncloudevents is because the former started for sources that produce CE only, and the latter was focused on receiving and propagating, and along the way providing KProbes with health info.
My gut feeling is that there should be a curated CE receiver and a CE sender provided by eventing for any external source or sink that uses them. Things might get a bit blurred because the CE Client interface includes both sender and receiver in a single interface, plus the kncloudevents package does not only receive CEs but sends them, even implementing its own retries, without relying on CE SDK code.
We could open a different discussion to make clear which one to use in which cases, if kncloudevents should be using CE SDK retries instead of implementing its own, and if the client implementation under pkg/adapter/v2 should be moved to pkg/kncloudevents.
Using a new CloudEvents client constructor we could keep backwards compatibility but inject by context any option.
Nice.
We could open a different discussion to make clear which one to use in which cases
Let's bring this up in the next eventing call too.
In order to keep the goodies of the factory, e.g. event recorder and tracing, how could that be achieved if I only want to customize certain options instead of supplying/creating my own CE client? (more thinking about UX here)
I think those would just have to be lifted into some constructor (ideally the same one that the factory uses) that also exposes configuring additional options
I think I prefer this because the user has control this way over the final object (the cloudevents client) rather than exposing some options piecemeal. If they want to use something completely different that satisfies the CE client interface, they're free to do so, so we have unlimited flexibility. Injecting options just means that we are stuck with the implementation that NewCloudEventsHTTPClient gives us; the flexibility is solely in whatever WithCloudEventsClientContextOptions gives you within that configuration space.
/assign
cc @creydr
Closing this for now as there hasn't been any recent update/interest about this issue