Kyma subscription and NATS infra are not in sync

Open raypinto opened this issue 2 years ago • 7 comments

Description

Kyma subscription and NATS infra are not in sync when we have a subscription that is not ready.

Expected result

Kyma subscription and NATS subscription are in sync

Actual result

Kyma subscription and NATS subscription are not in sync

Steps to reproduce

  • Install Kyma.
  • Create a subscription with 3 filters. This creates 3 subscriptions on NATS, which you can verify by listing the consumers on JetStream with the nats CLI.
  • Edit the subscription: delete 2 filters and set an invalid sink name.
  • The subscription now has only one cleanEventType, but it was not synced with the NATS infrastructure (there are still 3 consumers instead of 1).

Troubleshooting

The reason is that we run the sinkValidation before syncing the subscription with the NATS infra, so an invalid sink stops the reconciliation before the sync happens. We should sync regardless of an invalid sink.
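The reordering proposed above could look roughly like the following. This is a minimal sketch, not the Eventing-Controller code: `Subscription`, `Backend`, `validateSink`, and `reconcile` are hypothetical names, and the sink check is a placeholder.

```go
package main

import (
	"errors"
	"strings"
)

// Subscription is a hypothetical, simplified stand-in for the Kyma Subscription CR.
type Subscription struct {
	Sink       string
	EventTypes []string
	Ready      bool
}

// Backend is a hypothetical stand-in for the NATS JetStream backend;
// it only records which subjects currently have a consumer.
type Backend struct {
	Consumers map[string]bool
}

// Sync makes the set of consumers match the given event types exactly.
func (b *Backend) Sync(eventTypes []string) {
	wanted := make(map[string]bool, len(eventTypes))
	for _, et := range eventTypes {
		wanted[et] = true
	}
	b.Consumers = wanted
}

// validateSink is a placeholder check (assumption): it only requires an http(s) URL.
func validateSink(sink string) error {
	if !strings.HasPrefix(sink, "http://") && !strings.HasPrefix(sink, "https://") {
		return errors.New("invalid sink: " + sink)
	}
	return nil
}

// reconcile syncs the backend first and validates the sink afterwards,
// so an invalid sink marks the subscription as not ready without
// leaving stale consumers behind.
func reconcile(sub *Subscription, b *Backend) error {
	b.Sync(sub.EventTypes)
	if err := validateSink(sub.Sink); err != nil {
		sub.Ready = false
		return err
	}
	sub.Ready = true
	return nil
}
```

With this ordering, editing a subscription from 3 filters down to 1 while also breaking the sink leaves exactly 1 consumer in the backend, and the subscription is reported as not ready.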

raypinto May 03 '22 13:05

Although this bug was resolved, syncing the subscription with the NATS infra is still a problem and needs to be addressed as part of this issue.

If we follow the steps to reproduce mentioned here and change the filter value in the spec to an invalid event type or an invalid prefix, we see that the cleanEventTypes are empty, but the subscriptions (or consumers) on the server are still present.
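To make the mismatch concrete, here is a small sketch that computes which consumers are out of sync. The function name `staleConsumers` and the plain string slices are assumptions for illustration; the real controller works on richer types.

```go
package main

import "sort"

// staleConsumers returns the subjects that still have a consumer on the
// server although they are no longer listed in the cleaned event types.
func staleConsumers(cleanEventTypes, existingConsumers []string) []string {
	wanted := make(map[string]struct{}, len(cleanEventTypes))
	for _, et := range cleanEventTypes {
		wanted[et] = struct{}{}
	}
	var stale []string
	for _, subject := range existingConsumers {
		if _, ok := wanted[subject]; !ok {
			stale = append(stale, subject)
		}
	}
	sort.Strings(stale)
	return stale
}
```

In the scenario described above, cleanEventTypes is empty while the consumers still exist, so every existing consumer is reported as stale.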

raypinto May 12 '22 08:05

@k15r Q: Say we have a subscription with 3 valid filters and the respective subscriptions are created on the infra. The user adds the fourth filter to the same subscription which is invalid. Should we delete all the 3 existing subscriptions on the infra and show cleanEventTypes as empty OR should we leave the cleanEventTypes to contain the 3 valid subscriptions that are still present on the infra but not create the fourth invalid filter? In both cases the subscription is still not ready.
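The second option in the question (keep the valid event types, skip the invalid one, stay not ready) could be sketched as follows. The prefix-only check and the names `splitEventTypes` and `isReady` are assumptions made for this sketch; the actual cleaning logic checks more than the prefix.

```go
package main

import "strings"

// splitEventTypes separates filters into those that carry the expected
// prefix and those that do not. Checking only the prefix is an assumption
// made for this sketch.
func splitEventTypes(prefix string, filters []string) (clean, invalid []string) {
	for _, f := range filters {
		if strings.HasPrefix(f, prefix+".") {
			clean = append(clean, f)
		} else {
			invalid = append(invalid, f)
		}
	}
	return clean, invalid
}

// isReady reports readiness: a single invalid filter is enough to make
// the subscription not ready, even though the clean list is kept.
func isReady(invalid []string) bool {
	return len(invalid) == 0
}
```

With 3 valid filters and 1 invalid filter, `clean` holds the 3 valid event types and `isReady` returns false, which matches the "leave the 3 valid subscriptions in place" variant.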

raypinto May 12 '22 14:05

I think whatever is defined as an eventing subscription filter should result in a NATS consumer. Whether the sink is invalid should only matter to the dispatcher. The dispatcher should then not try to dispatch and should fail fast, IMHO.
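A fail-fast dispatcher along these lines might look like the sketch below. `Subscription.SinkValid`, `Dispatcher.Send`, and `ErrSinkInvalid` are hypothetical names; the sketch assumes the controller has already decided whether the sink is valid.

```go
package main

import "errors"

// ErrSinkInvalid is returned without any delivery attempt.
var ErrSinkInvalid = errors.New("sink is invalid, event not dispatched")

// Subscription is a hypothetical, reduced view of what the dispatcher
// needs; SinkValid is assumed to be maintained by the controller.
type Subscription struct {
	Sink      string
	SinkValid bool
}

// Dispatcher forwards events using Send, a hypothetical transport hook.
type Dispatcher struct {
	Send func(sink string, event []byte) error
}

// Dispatch fails fast for subscriptions with an invalid sink and only
// calls Send when the sink is known to be valid.
func (d *Dispatcher) Dispatch(sub Subscription, event []byte) error {
	if !sub.SinkValid {
		return ErrSinkInvalid
	}
	return d.Send(sub.Sink, event)
}
```

The consumer exists in either case; only the delivery step is skipped while the sink is invalid.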

nachtmaar Jun 29 '22 18:06

@k15r Q: Say we have a subscription with 3 valid filters and the respective subscriptions are created on the infra. The user adds the fourth filter to the same subscription which is invalid. Should we delete all the 3 existing subscriptions on the infra and show cleanEventTypes as empty OR should we leave the cleanEventTypes to contain the 3 valid subscriptions that are still present on the infra but not create the fourth invalid filter? In both cases the subscription is still not ready.

Two tests were written to check this behaviour. They can be found in this file or just below:

NATS subscriptions are not in sync when adding a filter with invalid event type

TL;DR: one invalid event-type leads to the problem that NO clean event-type is shown (nil list)

{
		// Ensure subscriptions on NATS are not changed when a filter with an invalid event type is added.
		// issue: https://github.com/kyma-project/kyma/issues/12979
		name: "NATS subscriptions are not in sync then adding a filter with invalid event type",
		givenSubscriptionOpts: []reconcilertesting.SubscriptionOpt{
			reconcilertesting.WithFilter(reconcilertesting.EventSource, fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix)),
			// valid sink
			reconcilertesting.WithSinkURLFromSvc(ens.SubscriberSvc),
		},
		wantBefore: utils.Want{
			K8sSubscription: []gomegatypes.GomegaMatcher{
				reconcilertesting.HaveSubscriptionReady(),
				// for each filter we want to have a clean event type
				reconcilertesting.HaveCleanEventTypes([]string{
					fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix),
				}),
			},
			K8sEvents: nil,
			// ensure that each filter results in a NATS consumer
			NatsSubscriptions: map[string][]gomegatypes.GomegaMatcher{
				fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix): {
					natstesting.BeExistingSubscription(),
					natstesting.BeValidSubscription(),
					natstesting.BeJetStreamSubscriptionWithSubject(fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix)),
				},
			},
		},
		changeSubscription: func(subscription *eventingv1alpha1.Subscription) {
			subscription.Spec.Filter.Filters = nil
			// keep the filter
			reconcilertesting.AddFilter(reconcilertesting.EventSource, fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix), subscription)

			// on purpose, we do use an invalid eventTypePrefix
			invalidEventTypePrefix := "sap.kyma.custom.ups"
			// add a new filter
			reconcilertesting.AddFilter(reconcilertesting.EventSource, fmt.Sprintf("%s.nonexistingapp.order.created.v1", invalidEventTypePrefix), subscription)
		},
		wantAfter: utils.Want{
			K8sSubscription: []gomegatypes.GomegaMatcher{
				reconcilertesting.HaveSubscriptionNotReady(),
				reconcilertesting.HaveCleanEventTypes([]string{
					// only the first event type can be cleaned, the second uses an invalid event type prefix
					fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix),
				}),
			},
			K8sEvents: nil,
			NatsSubscriptions: map[string][]gomegatypes.GomegaMatcher{
				// the first filter is still present as a NATS subscription
				fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix): {
					natstesting.BeExistingSubscription(),
					natstesting.BeValidSubscription(),
					natstesting.BeJetStreamSubscriptionWithSubject(fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix)),
				},
				// the filter with invalid event type is not created on NATS because it is using the wrong event type prefix
				"sap.kyma.custom.ups.nonexistingapp.order.created.v1": {
					gomega.Not(natstesting.BeExistingSubscription()),
				},
			},
		},
	},

NATS subscriptions are not in sync when changing the event-type to an invalid format

TL;DR: the NATS subscription IS NOT deleted if the event-type is changed to an invalid format

	{
		// Ensure subscriptions on NATS are not changed when the event type is changed to an invalid format.
		// issue: https://github.com/kyma-project/kyma/issues/12979
		name: "NATS subscriptions are not in sync then changing the event-type to an invalid format",
		givenSubscriptionOpts: []reconcilertesting.SubscriptionOpt{
			reconcilertesting.WithFilter(reconcilertesting.EventSource, fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix)),
			// valid sink
			reconcilertesting.WithSinkURLFromSvc(ens.SubscriberSvc),
		},
		wantBefore: utils.Want{
			K8sSubscription: []gomegatypes.GomegaMatcher{
				reconcilertesting.HaveSubscriptionReady(),
				// for each filter we want to have a clean event type
				reconcilertesting.HaveCleanEventTypes([]string{
					fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix),
				}),
			},
			K8sEvents: nil,
			// ensure that each filter results in a NATS consumer
			NatsSubscriptions: map[string][]gomegatypes.GomegaMatcher{
				fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix): {
					natstesting.BeExistingSubscription(),
					natstesting.BeValidSubscription(),
					natstesting.BeJetStreamSubscriptionWithSubject(fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix)),
				},
			},
		},
		changeSubscription: func(subscription *eventingv1alpha1.Subscription) {
			subscription.Spec.Filter.Filters = nil
			// on purpose, we do use an invalid eventTypePrefix
			invalidEventTypePrefix := "sap.kyma.custom.ups"
			reconcilertesting.AddFilter(reconcilertesting.EventSource, fmt.Sprintf("%s.nonexistingapp.order.created.v1", invalidEventTypePrefix), subscription)
		},
		wantAfter: utils.Want{
			K8sSubscription: []gomegatypes.GomegaMatcher{
				reconcilertesting.HaveSubscriptionNotReady(),
				// the prefix is wrong, it cannot be cleaned
				reconcilertesting.HaveCleanEventTypes(nil),
			},
			K8sEvents: nil,
			// the subscription is not ready and changing the filter in the NATS subscription does not make sense because the wrong event type prefix is used.
			NatsSubscriptions: map[string][]gomegatypes.GomegaMatcher{
				// TODO: delete the NATS subscription
				fmt.Sprintf("%s.nonexistingapp.order.created.v1", validEventTypePrefix): {
					gomega.Not(natstesting.BeExistingSubscription()),
				},
			},
		},
	},

@raypinto, @k15r, and I had a discussion on the topic, and @k15r proposed writing a validation webhook for the Subscription CR. This would mitigate the problems revealed by the 2 tests, because the webhook would reject an invalid event type.

Ticket is here: https://github.com/kyma-project/kyma/issues/14750

nachtmaar Jul 05 '22 11:07

After the last team discussion about the possible options on https://github.com/kyma-project/kyma/pull/14742#discussion_r914826639, @k15r raised the concern that this might lead to event loss.

The problem is the following:

  • If the NATS Subscription is synced independently of the sink, an invalid sink (e.g. one pointing to a svc which does not exist) can be set in the NATS Subscription.
  • The Kyma Subscription will be marked as not ready by the controller, but the sink will still be set in the NATS Subscription.
  • That means that the dispatcher (part of the Eventing-Controller) will try to send the event to a sink which is not able to receive it (because in this case no one is listening for events on the defined sink).
  • The number of retries for NATS JetStream is set to 100.
  • Each attempt to send to a non-existent sink increases the retry counter, and delivery stops as soon as the retry limit is reached.
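The retry behaviour described in the list can be modelled with a short sketch. It is a plain simulation, not the JetStream client API: `deliver` and `errSinkUnreachable` are hypothetical, and only the limit of 100 comes from the discussion above.

```go
package main

import "errors"

// maxDeliver is the retry limit mentioned above.
const maxDeliver = 100

// errSinkUnreachable models a sink that nobody is listening on.
var errSinkUnreachable = errors.New("sink unreachable")

// deliver tries to send one event up to maxDeliver times and reports
// how many attempts were used and whether the event was delivered.
func deliver(send func() error) (attempts int, delivered bool) {
	for attempts < maxDeliver {
		attempts++
		if err := send(); err == nil {
			return attempts, true
		}
	}
	// The limit is exhausted: the event is not retried again.
	return attempts, false
}
```

If the sink never becomes reachable, all 100 attempts are used up and the event is not delivered, which is the event-loss concern raised by @k15r.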

Does a Validation Webhook prevent the problem? -> NO!

Now let's imagine the situation if the sink validation were performed in a validation webhook for the Subscription CR:

  • The webhook can validate the scheme of the URL and reject sinks which don't have a valid URL scheme (like horst://horst-hausen).
  • The Eventing-Controller also checks for the presence of the Kubernetes service which is specified in the sink URL. Previously, if the service did not exist, the controller did not update the NATS Subscription. After merging the linked PR, this would no longer be the case.
    • We could also move the service check to the webhook. However, this would mean that when creating a subscription, the service of the sink has to exist already (because the subscription would be rejected by the webhook otherwise). This would no longer allow kubectl apply -f commands where the subscription and the sink are created simultaneously.
    • For this reason, I think we should only move the scheme validation to the webhook and leave the check of the sink k8s service to the eventing-controller.
    • That, however, means that the problem described in the previous paragraph remains (because you can still change the sink to a service which does not exist).
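A scheme-only check, as it could run inside such a webhook, is sketched below. The function name `validateSinkScheme` and the choice of http/https as the accepted schemes are assumptions; the sketch deliberately does not look up the Kubernetes service.

```go
package main

import (
	"fmt"
	"net/url"
)

// validateSinkScheme rejects sinks whose URL cannot be parsed, whose
// scheme is not http or https (assumed allow-list), or that have no host.
// It does not check whether the service behind the sink exists.
func validateSinkScheme(sink string) error {
	u, err := url.Parse(sink)
	if err != nil {
		return fmt.Errorf("sink %q is not a valid URL: %w", sink, err)
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return fmt.Errorf("sink %q has unsupported scheme %q", sink, u.Scheme)
	}
	if u.Host == "" {
		return fmt.Errorf("sink %q has no host", sink)
	}
	return nil
}
```

A sink such as horst://horst-hausen is rejected, while a well-formed URL pointing to a service that does not exist still passes, which is exactly the remaining gap described in the last bullet.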

The solution

Another idea was brought up by @k15r and quickly checked by @mfaizanse :

  • We could pause and unpause the NATS subscription in the case of an invalid sink
  • Following the idea from @mfaizanse, we can delete the subscription bound to the JetStream consumer. This pauses dispatching, but the consumer still exists and keeps its interest (meaning no event loss).
  • @k15r would like to have this functionality anyway, so I will create a follow-up ticket for it.
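The pause/unpause idea can be illustrated with a small in-memory model. This is not the nats.go API: `Consumer`, `Pause`, `Unpause`, and `Publish` are hypothetical and only mimic a durable consumer that keeps events while no subscription is bound to it.

```go
package main

// Consumer is a hypothetical in-memory model of a durable JetStream
// consumer: it retains pending events whether or not a subscription
// is currently bound to it.
type Consumer struct {
	pending []string
	bound   bool
	deliver func(event string)
}

// Publish stores the event and delivers it right away if a subscription is bound.
func (c *Consumer) Publish(event string) {
	c.pending = append(c.pending, event)
	c.flush()
}

// Pause removes the bound subscription; the consumer keeps collecting events.
func (c *Consumer) Pause() {
	c.bound = false
}

// Unpause binds a subscription again and delivers everything that piled up.
func (c *Consumer) Unpause(deliver func(event string)) {
	c.deliver = deliver
	c.bound = true
	c.flush()
}

// flush delivers all pending events if a subscription is bound.
func (c *Consumer) flush() {
	if !c.bound || c.deliver == nil {
		return
	}
	for _, e := range c.pending {
		c.deliver(e)
	}
	c.pending = nil
}
```

In this model, an event published while the consumer is paused is held back and delivered after `Unpause`, so an invalid sink would delay events instead of burning through the retry limit.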

Next steps on this issue: I will leave the PR on hold and try to get the testing changes merged into main. As soon as the follow-up ticket is solved, the PR can be merged :)

nachtmaar Aug 11 '22 14:08

Blocked until https://github.com/kyma-project/eventing-manager/issues/518 is implemented

nachtmaar Aug 12 '22 08:08

This issue has been automatically marked as stale due to the lack of recent activity. It will soon be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] Oct 12 '22 03:10

This issue or PR has been automatically marked as stale due to the lack of recent activity. Thank you for your contributions.

This bot triages issues and PRs according to the following rules:

  • After 60d of inactivity, lifecycle/stale is applied
  • After 7d of inactivity since lifecycle/stale was applied, the issue is closed

You can:

  • Mark this issue or PR as fresh with /remove-lifecycle stale
  • Close this issue or PR with /close

If you think that I work incorrectly, kindly raise an issue with the problem.

/lifecycle stale

kyma-bot Dec 12 '22 08:12

This issue or PR has been automatically closed due to the lack of activity. Thank you for your contributions.

This bot triages issues and PRs according to the following rules:

  • After 60d of inactivity, lifecycle/stale is applied
  • After 7d of inactivity since lifecycle/stale was applied, the issue is closed

You can:

  • Reopen this issue or PR with /reopen
  • Mark this issue or PR as fresh with /remove-lifecycle stale

If you think that I work incorrectly, kindly raise an issue with the problem.

/close

kyma-bot Dec 19 '22 08:12

@kyma-bot: Closing this issue.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

kyma-bot Dec 19 '22 08:12

This issue or PR has been automatically marked as stale due to the lack of recent activity. Thank you for your contributions.

/lifecycle stale

kyma-bot Feb 21 '23 13:02

This issue or PR has been automatically closed due to the lack of activity. Thank you for your contributions.

/close

kyma-bot Feb 28 '23 13:02

@kyma-bot: Closing this issue.

kyma-bot Feb 28 '23 13:02