
Ability to "ResetOffset" for Kafka pub/sub

Open pattisapu01 opened this issue 3 years ago • 8 comments

In what area(s)?

/area components (kafka pub/sub)

Describe the feature

The current Kafka pub/sub component targets just the minimum interfaces to publish and subscribe. We need an enhancement to the Kafka component to support "ResetOffsets": the ability to invoke an API call on the Dapr sidecar to reset the current offset to a previous value so that messages can be replayed. As a side note, when Dapr calls our services to deliver messages, it should include the offset number so that clients can keep track of the Message-Id-to-offset mapping.

UPDATE

The solution would involve handling metadata from the pub/sub subscriber's response and letting each component implement custom handling of that metadata. The subscriber could then respond with custom metadata to enable these scenarios.
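For illustration, a subscriber response carrying such metadata might look like this (the `resetOffset` field is hypothetical — it is not part of any existing Dapr API, just a sketch of the proposal):

```json
{
  "status": "RETRY",
  "metadata": {
    "resetOffset": "12345"
  }
}
```

Each pub/sub component would be free to interpret (or ignore) the metadata keys it understands.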

pattisapu01 avatar Apr 21 '21 13:04 pattisapu01

@pattisapu01 Dapr provides a common interface across components by design. Adding component-specific features would pollute Dapr's interface. Is there some other way to achieve this? Could metadata be added to the subscriber response to trigger an offset reset?

I am curious to understand how the offset reset happens in your scenario. Is this something that happens automatically from the app or does the operator need to trigger it?

artursouza avatar May 04 '21 06:05 artursouza

Targeting a "least common denominator" may make the Dapr Kafka client "least commonly used". I am not saying we need to incorporate every interface available in the Kafka API. Our need is to re-process messages from a specific offset (driven by business demands and the need for flexibility in message processing). We have the option of using the Confluent C# client library or the Dapr Kafka bindings. Since we are already injecting the Dapr sidecar, we thought of using Dapr for pub/sub. If this is not feasible, we will shift over to the client library.

pattisapu01 avatar May 05 '21 20:05 pattisapu01

When apps use specific features of a component, they naturally lose the portability advantages of using Dapr - which may be a necessary trade-off in some cases.

The way to perform dynamic changes to the subscriber (resetting the offset could be one of them) would be via the HTTP response that the subscriber app returns to Dapr. That response would include metadata that each pub/sub component can interpret. There are similar discussions about enriching the pub/sub subscriber response to expose component-specific features (delayed redelivery, for example). Today, the response carries only a status code: SUCCESS, DROP, or RETRY.

Long story short, this is not something we support today. But it seems to be something we can add in the future.

/cc @yaron2 @pkedy thoughts?

artursouza avatar May 06 '21 16:05 artursouza

We handle Kafka errors by storing the offsets in a database (rather than using some of the more complex strategies). At startup, services containing consumers read the offsets from the database, and using the Confluent client library we resume processing from those points. So we would need the ability to send this metadata to the Dapr Kafka component somehow in order to continue with this error-handling strategy.
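A minimal sketch of the offset-resolution part of this strategy in Go (the language of Dapr's Kafka component). The map of stored offsets stands in for a database lookup, which is assumed here; in a Sarama consumer group, the resulting value would typically be applied in the handler's `Setup` hook via `ConsumerGroupSession.ResetOffset`:

```go
package main

import "fmt"

// resumeOffset returns the offset a consumer should resume from for a
// given partition: one past the last offset recorded in the database,
// or the fallback (e.g. sarama.OffsetNewest) when nothing was stored.
func resumeOffset(stored map[int32]int64, partition int32, fallback int64) int64 {
	if off, ok := stored[partition]; ok {
		return off + 1 // last processed offset -> next offset to consume
	}
	return fallback
}

func main() {
	// Offsets previously persisted per partition (loaded from a DB in practice).
	stored := map[int32]int64{0: 41, 2: 99}
	fmt.Println(resumeOffset(stored, 0, -1)) // 42
	fmt.Println(resumeOffset(stored, 1, -1)) // -1 (nothing stored: fall back)
}
```

With the Confluent C# client the equivalent step is assigning partitions with explicit `Offset` values; the point is the same either way: the resume position comes from the application's own store, which is why a pass-through metadata channel to the Dapr component would be needed.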

mickdelaney avatar Jan 08 '22 09:01 mickdelaney

Any idea when this could be discussed again? I'm looking at a new greenfield project and this has come up again in relation to Kafka.

mickdelaney avatar Mar 31 '22 05:03 mickdelaney

> Any idea when this could be discussed again? I'm looking at a new greenfield project and this has come up again in relation to Kafka.

We currently only support an oldest/newest offset setting with the initialOffset metadata param: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/#spec-metadata-fields.

It should be fairly simple to add a custom offset value. I'm moving this issue to the contrib repository.
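For reference, the existing setting appears in a component spec like this (broker, group, and component names are illustrative):

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "localhost:9092"
  - name: consumerGroup
    value: "my-group"
  - name: initialOffset
    value: "oldest"   # only "oldest" or "newest" are supported today
```

A custom offset value would presumably extend this field (or add a new one) to accept a numeric offset.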

yaron2 avatar Mar 31 '22 06:03 yaron2

Since Confluent was mentioned here -- we do not (and cannot) use the Confluent client SDKs for Go due to their dependency on CGO.

It is unclear whether this change will work with Confluent if added to our Kafka implementation based on the Sarama library.

For greater success, a pluggable component for Confluent Kafka should be created (probably not until 1.11 at the earliest).

berndverst avatar Feb 01 '23 22:02 berndverst

Removing from a milestone since there is no plan to work on this yet.

artursouza avatar Jun 30 '23 17:06 artursouza