spring-kafka
Consumer-group-specific offset seeking for AbstractConsumerSeekAware
Expected Behavior
We want to be able to seek offsets for a specific consumer group by using AbstractConsumerSeekAware.
Current Behavior
Based on the implementation below, it is clear that we can seek offsets for all assigned partitions in a topic, regardless of the consumer group IDs involved.
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    super.onPartitionsAssigned(assignments, callback);
}

@Override
public void seekToTimestamp(long time) {
    getSeekCallbacks().forEach((tp, callback) -> {
        callback.seekToTimestamp(tp.topic(), tp.partition(), time);
    });
}
Context
In our use case, more than one consumer group may be assigned the same partition of a topic. The example below describes our case:
- We have one topic, product.feed.fullexport, with 12 partitions.
- 10 different microservices with different group IDs listen to the same topic, each with 12 concurrent consumers.
- When we seek offsets for one of the microservices using the ConsumerSeekCallback implementation above, the seek affects all assigned partitions and listening consumer instances, regardless of the intended group ID.
Is there any way to seek an offset in a partition for a specific group ID only?
I don't see the behavior you are describing:
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class KafkaGh2302Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaGh2302Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

    @Component
    public static class Listener extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener received: " + payload);
        }

        // Seek every partition this bean holds a callback for back to the beginning.
        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }

    @Component
    public static class Listener2 extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample2", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener2 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }
}
As you can see, I have two classes with @KafkaListener methods with different ids, which essentially point to different consumer groups (by default, a @KafkaListener id is also used as its group.id).
In the unit test I do:
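import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class KafkaGh2302ApplicationTests {

    @Autowired
    KafkaGh2302Application.Listener listener;

    @Autowired
    KafkaTemplate<String, String> template;

    @Test
    void contextLoads() throws InterruptedException {
        // Spread 10 records across the 3 partitions.
        for (int i = 0; i < 10; i++) {
            this.template.send("seekExample", i % 3, "some_key", "test#" + i);
        }
        Thread.sleep(1000);
        // Seek only the "seekExample" listener back to the beginning.
        this.listener.seekToStart();
        Thread.sleep(10000);
    }
}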
So, after sending some data, I just call seekToStart() on one of the listener services.
The output looks like this:
Listener2 received: test#2
Listener received: test#1
Listener2 received: test#0
Listener2 received: test#5
Listener received: test#2
Listener received: test#4
Listener received: test#5
Listener received: test#7
Listener2 received: test#1
Listener2 received: test#8
Listener received: test#0
Listener2 received: test#3
Listener2 received: test#4
Listener received: test#8
Listener2 received: test#6
Listener received: test#3
Listener2 received: test#9
Listener2 received: test#7
Listener received: test#6
Listener received: test#9
Listener received: test#0
Listener received: test#3
Listener received: test#6
Listener received: test#9
Listener received: test#2
Listener received: test#5
Listener received: test#1
Listener received: test#4
Listener received: test#7
Listener received: test#8
This confirms that seeking really happens only in one consumer group and does not affect other groups on the same topic.
Correct; seeks only affect the current group.
@artembilan @garyrussell Hello, I've found that the current behavior can occur when using multiple listeners with different group IDs in the same class.
Current Behavior (described above):
> it is clear that we can seek offset for all assigned partitions in a topic regardless of different consumer group ids.
Test
I have modified the code provided above slightly.
- KafkaGh2302Application
...
// Existing code
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(String payload) {
    ...
}

// Added code in the same class Listener
@KafkaListener(id = "seekExample3", topics = "seekExample", concurrency = "3")
public void listen3(String payload) {
    System.out.println("Listener3 received: " + payload);
}
...
- KafkaGh2302ApplicationTests
...
// Existing code
for (int i = 0; i < 30; i++) { // Changed: 10 -> 30 so both listeners (seekExample and seekExample3) have offsets to seek
    this.template.send("seekExample", i % 3, "some_key", "test#" + i);
}
...
Result
- Since callbacks are registered per thread in a single listener class that extends AbstractConsumerSeekAware, offsets are sought regardless of the consumer group ID. (In this case, Listener (id: seekExample) and Listener3 (id: seekExample3).)
...
Listener received: test#0
Listener3 received: test#3
Listener2 received: test#16
Listener2 received: test#19
Listener2 received: test#22
Listener received: test#29
Listener2 received: test#25
Listener2 received: test#28
Listener received: test#28
Listener received: test#21
Listener received: test#24
Listener received: test#27
Listener2 received: test#23
Listener2 received: test#26
Listener3 received: test#16
========
Listener received: test#0
Listener received: test#3
Listener received: test#24
Listener received: test#27
Listener received: test#2
Listener3 received: test#1
Listener received: test#5
Listener3 received: test#4
Listener received: test#8
Listener3 received: test#7
Listener received: test#11
Listener3 received: test#10
...
IMHO, if we want to seek offsets for a specific consumer group only, we can use one of the following methods:
- Use separate classes for consumers with different group IDs.
- Add filtering conditions based on the consumer group ID included in the thread name, so that callbacks are registered or executed selectively (in registerSeekCallback() or onPartitionsAssigned()).
  - However, I don't want to recommend this, as thread names can be changed arbitrarily and may not always include the consumer group ID.
If you have any other solutions for seeking offsets based on a specific consumer group ID, please let me know. I would appreciate hearing them. Thank you!
I am no longer involved with the project, but what you are suggesting is incorrect.
Each listener method is invoked by a different listener container and, therefore, on different threads.
So, if there is a problem, it is not related to any thread-based state.
@bky373 Thanks for reporting. As @garyrussell pointed out, this does not look related to thread state, but it does look like a bug (thanks, Gary, for chiming in!! :) ). We will look at this today. Do you have a sample application we can use to reproduce it? (That would be easier.) Otherwise, we can create one from the snippets you provided.
@garyrussell
> I am no longer involved with the project
Oh, I didn't know that! Thanks for your comment!! 🙇
> Each listener method is invoked by a different listener container and, therefore, on different threads.
You are totally right. The threads are different. I didn't mean to say that it's an issue with thread state; if my suggestion read that way, I phrased it poorly.
I just wanted to report that in a class that has listeners with different consumer group IDs and extends AbstractConsumerSeekAware, it's difficult to seek offsets by specifying the consumer group ID. ~(This might not be the problem).~
@sobychacko
Sure! The code is simple, so I'll just leave it in a comment here. Thank you for your time and reply!!
@SpringBootApplication
public class KafkaGh2302Application {

    public static void main(String[] args) {
        SpringApplication.run(KafkaGh2302Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

    @Component
    public static class Listener extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener received: " + payload);
        }

        @KafkaListener(id = "seekExample3", topics = "seekExample", concurrency = "3")
        public void listen3(String payload) {
            System.out.println("Listener3 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }

    @Component
    public static class Listener2 extends AbstractConsumerSeekAware {

        @KafkaListener(id = "seekExample2", topics = "seekExample", concurrency = "3")
        public void listen(String payload) {
            System.out.println("Listener2 received: " + payload);
        }

        public void seekToStart() {
            getSeekCallbacks().forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }
    }
}

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class KafkaGh2302ApplicationTest {

    @Autowired
    KafkaGh2302Application.Listener listener;

    @Autowired
    KafkaTemplate<String, String> template;

    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < 50; i++) {
            this.template.send("seekExample", i % 3, "some_key", "test#" + i);
        }
        Thread.sleep(1000);
        System.out.println("====================================");
        this.listener.seekToStart();
        Thread.sleep(10000);
    }
}
I think the best course of action is to have a single consumer (KafkaListener) per class that extends AbstractConsumerSeekAware.
If there are multiple listeners, the callbacks are applied against all the listeners in that particular class. We will see if we can come up with a solution, such as an API-level change to better accommodate group IDs.
@sobychacko FYI, the thread's associated group is available in KafkaUtils, if that helps.
https://github.com/spring-projects/spring-kafka/blob/4a5a8495fcb4169071822fa6699cc15aaf477fcd/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java#L110-L117
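Assuming the group ID can be read on the consumer thread (KafkaUtils.getConsumerGroupId() is the real accessor behind the link above), a user-level workaround is to remember the group alongside each callback and seek only the callbacks of the requested group. The sketch below models this without Spring: GroupContext is a hypothetical ThreadLocal standing in for KafkaUtils, and SeekCallback, GroupAwareSeekRegistry and GroupAwareDemo are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a container's seek callback (not the Spring Kafka interface).
interface SeekCallback {
    void seekToBeginning(String topic, int partition);
}

// Hypothetical stand-in for the thread-bound group id exposed by KafkaUtils.
final class GroupContext {
    private static final ThreadLocal<String> GROUP = new ThreadLocal<>();

    static void set(String groupId) { GROUP.set(groupId); }

    static String get() { return GROUP.get(); }
}

class GroupAwareSeekRegistry {
    private final String topic;
    // groupId -> (partition -> callback): each group keeps its own slots.
    private final Map<String, Map<Integer, SeekCallback>> byGroup = new ConcurrentHashMap<>();

    GroupAwareSeekRegistry(String topic) { this.topic = topic; }

    // Runs on the consumer thread, so the thread-bound group id identifies the owning container.
    void onPartitionsAssigned(List<Integer> partitions, SeekCallback callback) {
        String group = GroupContext.get();
        if (group == null) {
            throw new IllegalStateException("No group id bound to thread " + Thread.currentThread().getName());
        }
        Map<Integer, SeekCallback> slots = byGroup.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
        partitions.forEach(p -> slots.put(p, callback));
    }

    // Seeks only the partitions that were registered by the given group.
    void seekToStart(String groupId) {
        byGroup.getOrDefault(groupId, Map.of()).forEach((p, cb) -> cb.seekToBeginning(topic, p));
    }
}

class GroupAwareDemo {
    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        GroupAwareSeekRegistry registry = new GroupAwareSeekRegistry("seekExample");
        // Simulates two containers; in a real app each would register from its own consumer thread.
        GroupContext.set("seekExample");
        registry.onPartitionsAssigned(List.of(0, 1, 2), (t, p) -> log.add("seekExample:" + t + "-" + p));
        GroupContext.set("seekExample3");
        registry.onPartitionsAssigned(List.of(0, 1, 2), (t, p) -> log.add("seekExample3:" + t + "-" + p));
        registry.seekToStart("seekExample3");
        System.out.println(log); // only seekExample3 entries
    }
}
```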
@sobychacko @garyrussell Thanks for your comments!
As you mentioned, the listener methods within the class apply the callback identically regardless of consumer group ID. So it seems necessary to execute callbacks differently for each consumer group, since the intended behavior may vary between groups. (Of course, we can work around this for now by keeping our classes separate.)
I'll also keep looking for ways to do it.
I'm so grateful for your help!
We will try to make some changes to accommodate this before the GA.
@bky373 After looking at this further, we realized this is a bit more involved from the framework perspective, since we need to introduce some breaking changes at the API level. Therefore, we recommend your workaround in this and prior versions of Spring Kafka (since we are so close to the 3.2.0 GA release), i.e., stick with a single class per listener for this use case. We will table the proper fix for this issue for now and consider it for the next version of the framework, 3.3.0.
@sobychacko
Thank you for taking the time to research and respond!
I'm curious to know what you think of the solution.
- Are you considering adding a new parameter (e.g., consumerGroupId) to the existing methods of ConsumerSeekAware?
- Or are you planning to keep ConsumerSeekAware and add another interface that extends it (such as ConsumerGroupSeekAware)?
- Or is there another way you have in mind?
Either way, I'm hesitant to say much, since it would be a big change; I'd like to hear your thoughts and see if there's anything I can contribute.
@bky373 We had an internal discussion on this with @artembilan yesterday. We need to make some changes along the lines of your thinking. Some API methods in ConsumerSeekAware need to be modified to take some new information about the consumer group. We believe the group ID information can be obtained through the ConsumerSeekCallback, but we need to look further. We can make these changes when we switch the main branch to 3.3.0-SNAPSHOT after the GA release. I have marked this issue for the 3.3.0-M1 milestone. If you want to work on a PR for this, you are certainly welcome to do so.
@sobychacko
Yes, thank you!
Personally, I'd like to take this on and work on it a bit more. However, before I start working on the code, it would be great to have a discussion and get some feedback on the direction of the work. If that's okay with you, I'll do a little more research and get back to you after I've organized things!
Off the top of my head, as you said, if we can properly get the consumerGroupId for the ConsumerSeekCallback using KafkaUtils.getConsumerGroupId(), we can define the behavior per groupId when creating the callback. But that needs to be tested.
@bky373 Feel free to work on it. Before you start coding, if you want us to confirm the design, please continue discussing it here, and we can review it. Thanks!
@sobychacko
Hi, I apologize for reaching out after such a long time. I've revisited the issue and thought about potential solutions.
Before diving into the details, let me briefly summarize the problem since it has been a while.
- Problem: When using AbstractConsumerSeekAware, different listeners within the same class use the same callback, so when a seek is performed in one listener, it is executed in all listeners, even when that is not desired.
Here are the approaches I've considered:
1. Passing consumerGroupId as a parameter from the outside.
   1-1. Adding groupId as a parameter
   - Current: ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback)
   - Proposed: ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback, String groupId)
   - Method description: Register the callback only if the passed groupId matches the consumer groupId of the current thread.
   - Issues:
     - Passing the groupId in from outside is complex and would require changes in many places that use the registerSeekCallback() method.
     - Currently, it's just groupId, but there might be future requests to register callbacks based on other types of values.
     - The current ConsumerSeekAware API is structurally sound and already provides sufficient functionality, so modifying it might be unnecessary.
   1-2. Changing the parameter
   - Current: ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback)
   - Proposed: ConsumerSeekAware.registerSeekCallback(Map<String, ConsumerSeekCallback> callbackForGroupId)
   - Method description: Allow different callbacks to be registered for different groupIds.
   - Issues: This has the same problems as 1-1.
2. Setting a seek-allowed flag per @KafkaListener (e.g., Boolean consumerSeekAllowed)
   - Current: With ConsumerSeekAware or AbstractConsumerSeekAware, listeners implement the seek functions and can use them immediately.
   - Proposed: With ConsumerSeekAware or AbstractConsumerSeekAware, listeners implement the seek functions, but whether they are used is determined by the consumerSeekAllowed value.
   - Explanation: ConsumerSeekAware or AbstractConsumerSeekAware is applied at the class level, and the listeners belong to that class, so it seems natural for the listeners to use the same callback. (If a different callback is needed, the class should be split.) However, nothing prevents creating different kinds of listeners within the same class, and for various reasons users might not want to split the class. To address this simply, we can add a consumerSeekAllowed attribute to @KafkaListener. The default value is true, but if seeking is not desired, it can be set to false. (A null value, which behaves like true, could be used to log a warning.)
   - Issues:
     - Using different callbacks per listener still requires splitting the class. However, this might be the preferred approach.
     - Users need to be aware of the new consumerSeekAllowed attribute in addition to AbstractConsumerSeekAware.
   - Advantages:
     - You can control seeking while keeping multiple listeners in the same class.
     - Minimal code changes, with no changes at the ConsumerSeekAware API level.
     - Maintains backward compatibility and allows seeking to be disabled based on criteria other than the consumer group ID.
   - Explanation:
     - To demonstrate this approach, I've quickly implemented some code. All existing tests pass, and new tests will be added as needed.
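To make approach 2's semantics concrete, here is a framework-free sketch of how a per-listener Boolean consumerSeekAllowed might gate seeks, including the proposed "null behaves like true but logs a warning" rule. consumerSeekAllowed is only a proposal in this thread, not an existing @KafkaListener attribute; ListenerSpec, FlagGatedSeeks, FlagGatedDemo and the listener id seekExample4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-listener settings; consumerSeekAllowed mirrors the proposed (not existing) attribute.
record ListenerSpec(String id, Boolean consumerSeekAllowed) {}

// Models approach 2: seek callbacks still exist, but each listener's flag decides whether they run.
class FlagGatedSeeks {
    private final List<String> warnings = new ArrayList<>();

    // Returns the ids of the listeners whose seeks would actually be performed.
    List<String> seekTargets(List<ListenerSpec> listeners) {
        List<String> targets = new ArrayList<>();
        for (ListenerSpec spec : listeners) {
            Boolean allowed = spec.consumerSeekAllowed();
            if (allowed == null) {
                // Unset behaves like true, but leaves a warning, as proposed.
                warnings.add("consumerSeekAllowed not set on listener " + spec.id());
                targets.add(spec.id());
            } else if (allowed) {
                targets.add(spec.id());
            }
        }
        return targets;
    }

    List<String> warnings() {
        return warnings;
    }
}

class FlagGatedDemo {
    public static void main(String[] args) {
        FlagGatedSeeks gate = new FlagGatedSeeks();
        List<String> targets = gate.seekTargets(List.of(
                new ListenerSpec("seekExample", true),
                new ListenerSpec("seekExample3", false),
                new ListenerSpec("seekExample4", null)));
        System.out.println(targets + " " + gate.warnings());
    }
}
```

The model also shows the trade-off raised later in the thread: the bean remains seek-aware, and the flag only switches individual listeners off.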
Thank you for reading through this long message. Feel free to share any thoughts or feedback. Thanks!
@sobychacko Hi, if you have some time, could I get your feedback on the above? I would like to know what you have in mind. If a different approach is needed, I would like to consider that as well. Thanks!
Sure, @bky373. Sorry for the delay. We will get back to you soon on this.
@bky373 I like your second approach, as it is a minimally invasive set of changes and doesn't require any API changes in ConsumerSeekAware or related classes. On the other hand, with this approach, users need to remember to disable seeks on the listeners in classes that extend AbstractConsumerSeekAware and contain multiple listeners.
@artembilan do you have any thoughts on changing KafkaListener like this vs. making breaking API-level changes in ConsumerSeekAware?
Also, I wonder whether there is a valid use case, one that might benefit others, where offset seeking needs to be driven by the group ID?
Well, this new consumerSeekAllowed feels more like a workaround for what we cannot do right now.
And it is a bit awkward: we implement ConsumerSeekAware, but then set consumerSeekAllowed = false 🤷
At the same time, we already have a more reasonable workaround via splitting.
I'd say this is a more robust and logical workaround: in the end, we develop microservices, so the simplest possible logic is the best approach.
Without any paradox of choice.
> Currently, it's just groupId, but there might be future requests to register callbacks based on other types of values.
So, just don't mix many listeners in a single class. Why make your life so complicated, if we can simply split and reuse logic via delegation to some common service (if any)?
Sorry for some rude language, but if we go this way, I'd prefer groupId propagation.
Or we can choose to fail fast if the same ConsumerSeekAware is used for many listeners.
Each approach has pros and cons. While adding a new flag is the easier solution, adding a top-level property like this to KafkaListener might not bode well, given that there is a workaround. I guess we have two options if we exclude the KafkaListener flag:
- API changes: add a new method that specifically takes a group ID, so that only the listener with that ID is involved in the seek operation.
- Fail fast when there are multiple listeners in a class that implements ConsumerSeekAware. The exception thrown can suggest that users split the listeners into multiple classes.
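A minimal sketch of the fail-fast option, assuming the seek-aware bean can learn the registering container's group ID (for example via KafkaUtils.getConsumerGroupId() on the consumer thread). SingleGroupGuard and SingleGroupGuardDemo are hypothetical; several consumer threads of the same group are still accepted, since concurrency > 1 is normal.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical fail-fast guard; nothing like this exists in Spring Kafka today.
class SingleGroupGuard {
    private final String beanName;
    private final Set<String> groups = ConcurrentHashMap.newKeySet();

    SingleGroupGuard(String beanName) {
        this.beanName = beanName;
    }

    // Would be invoked from registerSeekCallback() with the registering container's group id.
    void register(String groupId) {
        groups.add(groupId);
        // Several consumer threads of one group are fine; a second distinct group is not.
        if (groups.size() > 1) {
            throw new IllegalStateException("'" + beanName + "' is shared by consumer groups " + groups
                    + "; move each @KafkaListener into its own ConsumerSeekAware class");
        }
    }
}

class SingleGroupGuardDemo {
    public static void main(String[] args) {
        SingleGroupGuard guard = new SingleGroupGuard("listener");
        guard.register("seekExample");
        guard.register("seekExample"); // second consumer thread of the same group
        try {
            guard.register("seekExample3");
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```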
There's another option: add a new (default) method to ConsumerSeekAware:
default boolean seekByGroupId() {
    return false;
}
Then, use KafkaUtils.getConsumerGroupId() when seeking and when building the callback tables.
https://github.com/spring-projects/spring-kafka/issues/2302#issuecomment-2096572637
No breaking API changes, no @KafkaListener changes.
But it would only work when seeking on the listener thread.
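The suggestion can be modeled without Spring as follows. SeekAwareModel, Slot, CallbackTable and SeekByGroupDemo are hypothetical names; the default method mirrors the proposed seekByGroupId(), so existing implementations keep today's one-slot-per-partition behavior, and the currentGroup argument stands in for KafkaUtils.getConsumerGroupId(), which is only meaningful on the listener thread.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical mirror of the proposed opt-in; not the real ConsumerSeekAware.
interface SeekAwareModel {
    default boolean seekByGroupId() {
        return false; // backward-compatible default: one callback slot per partition
    }
}

// Table key: groupId is null unless the bean opted in to seeking by group.
record Slot(String groupId, int partition) {}

class CallbackTable {
    private final boolean byGroup;
    private final Map<Slot, String> containerForSlot = new ConcurrentHashMap<>();

    CallbackTable(SeekAwareModel bean) {
        this.byGroup = bean.seekByGroupId();
    }

    // currentGroup stands in for KafkaUtils.getConsumerGroupId() on the consumer thread.
    void register(String currentGroup, int partition, String containerId) {
        containerForSlot.put(new Slot(byGroup ? currentGroup : null, partition), containerId);
    }

    // Containers that a seek issued on a thread of currentGroup would reach.
    Set<String> seekTargets(String currentGroup) {
        Set<String> targets = new TreeSet<>();
        containerForSlot.forEach((slot, container) -> {
            if (!byGroup || currentGroup.equals(slot.groupId())) {
                targets.add(container);
            }
        });
        return targets;
    }
}

class SeekByGroupDemo {
    public static void main(String[] args) {
        SeekAwareModel optedIn = new SeekAwareModel() {
            @Override
            public boolean seekByGroupId() {
                return true;
            }
        };
        for (SeekAwareModel bean : new SeekAwareModel[] {new SeekAwareModel() {}, optedIn}) {
            CallbackTable table = new CallbackTable(bean);
            table.register("seekExample", 0, "seekExample-0");
            table.register("seekExample3", 0, "seekExample3-0");
            System.out.println(bean.seekByGroupId() + " -> " + table.seekTargets("seekExample"));
        }
    }
}
```

This prints "false -> [seekExample3-0]" and then "true -> [seekExample-0]": without the opt-in, a seek from a seekExample thread reaches the other group's container; with it, only its own.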
Thanks, @garyrussell, for that great insight. @bky373 We should look into the idea Gary suggested as the solution for this issue.
Thanks, Gary!
I see the logic in ListenerConsumer.initialize():
setupSeeks();
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
These two calls probably have to be swapped to make that groupId available for registration.
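Why the order matters can be shown with a plain ThreadLocal, assuming (as the comment above implies) that setupSeeks() is where the seek callback gets registered and that the group ID is bound to the consumer thread. StartupOrder and its methods are hypothetical; they model only these two calls, not ListenerConsumer itself.

```java
// Hypothetical model of the two start-up calls on a consumer thread; not ListenerConsumer.
final class StartupOrder {
    // Stands in for the thread-bound group id behind KafkaUtils.
    private static final ThreadLocal<String> GROUP = new ThreadLocal<>();

    // Stands in for registerSeekCallback() inside setupSeeks(): returns the group it can see.
    static String registerSeekCallback() {
        return GROUP.get();
    }

    // Returns the group id visible at registration time for the given call order.
    static String initialize(String groupId, boolean bindGroupFirst) {
        try {
            if (bindGroupFirst) {
                GROUP.set(groupId);            // KafkaUtils.setConsumerGroupId(...) moved first
                return registerSeekCallback(); // setupSeeks()
            }
            String seen = registerSeekCallback(); // setupSeeks() runs first...
            GROUP.set(groupId);                   // ...so the group id is bound too late
            return seen;
        } finally {
            GROUP.remove(); // reset so each call starts clean
        }
    }

    public static void main(String[] args) {
        System.out.println("current order: " + initialize("seekExample", false));
        System.out.println("swapped order: " + initialize("seekExample", true));
    }
}
```

This prints "current order: null" followed by "swapped order: seekExample".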
@sobychacko Thanks for your reply!
> Also, I wonder if there is a valid use case that might benefit others where they need to drive seeking offsets based on the group-id?
I think this might be such a case:
@Component
public class DeliveryListener extends AbstractConsumerSeekAware {
    ...
    @KafkaListener(groupId = "delivery-status-group", topics = "delivery-topic")
    void listenForStatusUpdates(String message) {
        // Update delivery status in DB
        updateService.update(message);
    }

    @KafkaListener(groupId = "delivery-notification-group", topics = "delivery-topic")
    void listenForNotifications(String message) {
        // Notify the customer
        notificationService.notify(message);
    }

    @KafkaListener(groupId = "delivery-analytics-group", topics = "delivery-topic")
    void listenForAnalytics(String message) {
        // Process the delivery message for analytics
        analyticsService.analyze(message);
    }
}
If an error occurs only in the delivery-status-group listener and we want to adjust that listener's offset, we should make sure that we adjust only the delivery-status-group.
Seeking by groupId might be a good idea in this case (currently, all listeners are affected).
In fact, in cases like the above, it's probably best to separate the classes, as the internal logic would become rather complicated (which is what I want most!). But one thing to remember is that such code can be written anyway, and users may use it without realizing the side effects.
> Well, this new consumerSeekAllowed feels more like a workaround for what we cannot do right now. And it is a bit awkward: we do ConsumerSeekAware, but then consumerSeekAllowed = false 🤷 But at the same time we already have a more reasonable workaround via splitting. I'd say this is more robust and logical workaround: in the end we develop microservices, so as simple logic as possible is the best approach. Without any paradox of choice.
@artembilan Yes, I completely agree with your comment. I hope there's no misunderstanding. My point is only that, since this pattern can be written in code, if users keep using it unaware of the unintended behavior (or of its side effects), we should prevent it with new guidance or a new implementation.
@garyrussell Thanks again for the good suggestion! I'll try to find a solution using your suggestions.
@bky373 I think it is better to go with Gary's suggestion on this. Can you think about a design along the lines of what he suggested?
@sobychacko Yes, I'll look into this and get back to you.