[Tracking] Consumer Stream Resume from last used offset
When creating a consumer stream, one should be able to start the stream from an offset such as:

```rust
let stream_id = String::from("my-stream-id");
let offset = Offset::from_last_read(stream_id);
```
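For illustration, here is a rough end-to-end sketch of how this could look on the consumer side. It assumes the existing partition-consumer API plus the proposed `Offset::from_last_read` constructor, which does not exist yet; names and signatures are assumptions, not the final design.

```rust
// Hypothetical usage sketch; `Offset::from_last_read` is the proposed API
// from this issue and is not implemented yet.
use fluvio::{Fluvio, Offset};
use futures_util::StreamExt;

async fn consume_from_last_read() -> anyhow::Result<()> {
    let fluvio = Fluvio::connect().await?;
    let consumer = fluvio.partition_consumer("my-topic", 0).await?;

    // Resume wherever this named consumer last left off.
    let offset = Offset::from_last_read(String::from("my-stream-id"));

    let mut stream = consumer.stream(offset).await?;
    while let Some(Ok(record)) = stream.next().await {
        println!("{}", String::from_utf8_lossy(record.value()));
    }
    Ok(())
}
```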
To do this we need:
- [x] https://github.com/infinyon/fluvio/issues/2367
- [x] https://github.com/infinyon/fluvio/issues/2368
- [x] https://github.com/infinyon/fluvio/issues/2369
IIUC, we will introduce a new `consumer_id` identifier (not `stream_id`, right?) which can be user-defined or random. This id will be added to 3 requests:

1. `FetchOffsetsRequest` - to read `last_read_offset` per `consumer_id`.
2. `StreamFetchRequest` - to validate and forbid more than one `stream_id` per `consumer_id` and `partition`.
3. `UpdateOffsetsRequest` - to update `last_read_offset` per `consumer_id`.

Did I understand it correctly?
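To make that proposal concrete, here is a purely hypothetical sketch of how `consumer_id` could be threaded through those three requests. The field and struct layouts are illustrative only and are not the actual definitions in fluvio's protocol crates.

```rust
// Illustrative shapes only; the real request structs differ in detail.
struct FetchOffsetsRequest {
    /// Proposed: identifies which consumer's last_read_offset to return.
    consumer_id: String,
    // ...existing fields (topic, partitions, ...)
}

struct StreamFetchRequest {
    /// Proposed: the SPU would reject a second active stream for the same
    /// (consumer_id, partition) pair.
    consumer_id: Option<String>,
    // ...existing fields
}

struct UpdateOffsetsRequest {
    /// Proposed: persist this offset as the consumer's last_read_offset.
    consumer_id: String,
    last_read_offset: i64,
    // ...existing fields
}
```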
Should be `consumer_id` rather than `customer_id`. Should add to `ConsumerConfig`.
> Should be `consumer_id` rather than `customer_id`. Should add to `ConsumerConfig`.

Right, thanks! I edited my comment.
I have a few questions:

- Can a stream have multiple concurrent `consumer_id`s?
- Are `consumer_id`s managed?
- Do I need to update the offset after every read, or does it happen automatically?
- Can I delete them, or do they age out after a certain time?
- Can `consumer_id`s be reset or moved to a specific offset?
- What happens if the offset is out of range?
- Can we add a spec that explains how this whole thing works? This spec would also drive the documentation on fluvio.io once it's done.
Thoughts:

- A consumer can be associated with a single `consumer_id`.
- The `consumer_id` is managed by the consumer. If there is no existing offset, then it uses the offset parameter as the default starting offset.
- There is no delete, since offsets are stored in the internal [TBD] topic. Eventually, they will be purged based on the retention policy.
- If the offset is out of range (for example, records were purged based on retention), then it will resume from the first offset.
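A minimal sketch of the resolution described in the second bullet, assuming the consumer prefers a stored offset when one exists and otherwise falls back to the offset parameter. The helper name and the "resume after the last read record" semantics are assumptions, not fluvio internals.

```rust
// Hypothetical starting-offset resolution; not fluvio internals.
fn starting_offset(stored_last_read: Option<i64>, default_offset: i64) -> i64 {
    match stored_last_read {
        // Assumption: resume with the record *after* the last one read.
        Some(last_read) => last_read + 1,
        // No history for this consumer_id yet: use the offset parameter.
        None => default_offset,
    }
}
```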
> Should be `consumer_id` rather than `customer_id`. Should add to `ConsumerConfig`.

I'm on the fence about putting it in `ConsumerConfig`. I think we could have it as part of the enum in `Offset`; that way you'd do something like `Offset::from_last_offset(consumer_id)`. This feels more ergonomic to me.
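As a sketch of the ergonomics being suggested, the consumer id could live inside an `Offset` variant. The variant and constructor names below are hypothetical, and fluvio's actual `Offset` type is structured differently; this only illustrates the proposed call shape.

```rust
// Hypothetical Offset shape illustrating the "id inside the enum" idea.
enum Offset {
    Absolute(i64),
    FromBeginning(u32),
    FromEnd(u32),
    // Proposed: resume from the last offset recorded for this consumer.
    FromLastOffset { consumer_id: String },
}

impl Offset {
    fn from_last_offset(consumer_id: impl Into<String>) -> Self {
        Offset::FromLastOffset { consumer_id: consumer_id.into() }
    }
}

// Usage, matching the suggestion above:
// let offset = Offset::from_last_offset("my-consumer");
```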
> Can a stream have multiple concurrent consumer_ids?

Yes, this is a requirement. You can have a `consumer_id` of `foo` and another of `bar`, and both last-offsets will be updated.
> What happens if the offset is out of range?

We'll do `min(max(...))` to ensure it's not over the `stable_offset`.
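A tiny sketch of that clamp, with hypothetical names: the resumed offset is bounded below by the earliest retained offset and above by the stable offset.

```rust
// Hypothetical clamping helper for the min(max(...)) logic above.
fn clamp_offset(requested: i64, first_offset: i64, stable_offset: i64) -> i64 {
    requested.max(first_offset).min(stable_offset)
}
```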
> Can I delete them, or do they age out after a certain time?

We should just add delete so that we can test more easily, but that probably won't be the usual flow. It should just be deleted after a retention policy denotes it's "expired" (whatever this means).
> IIUC, we will introduce a new `consumer_id` identifier (not `stream_id`, right?) which can be user-defined or random. This id will be added to 3 requests:
>
> 1. `FetchOffsetsRequest` - to read `last_read_offset` per `consumer_id`.
> 2. `StreamFetchRequest` - to validate and forbid more than one `stream_id` per `consumer_id` and `partition`.
> 3. `UpdateOffsetsRequest` - to update `last_read_offset` per `consumer_id`.
>
> Did I understand it correctly?

`FetchOffsetsRequest` already exists, but we need the `last_read_offset`. Numbers 2 and 3 are correct.

I'm open to using `stream_id` as well. I only have `consumer_id` because it's a consumer stream.
What happened to this? Is there a solution to this issue in 2024?
@jhass, we should have addressed this earlier. We just defined an RFC we'd like to solicit your feedback on. It is targeted for release this quarter. We'll post a link to the RFC shortly.
@jhass this is ready for review - https://github.com/infinyon/fluvio/blob/master/rfc/offset-management.md
Hey @ajhunyady, thanks for the very fast response, I honestly didn't expect this :)
Thanks for writing up the RFC. I think overall it can cover the most important use cases, and I would encourage you to implement it, at least as a first step toward a perhaps more advanced solution.

One point that isn't quite clear to me is what happens if multiple consumers using the same `consumer_id` and topic are started. I see that while they would be listening to different partitions of the same topic there's probably no issue, and your CLI examples hint at that, though I would recommend extending the RFC with clear wording about this.

But what would happen with multiple consumers using the same topic, partition, and `consumer_id` running at the same time? I imagine the easiest solution to get an initial implementation going will be to just throw an error for all but the first consumer starting to listen.

To an extent this can scale by creating partitions within a topic, not for the sake of scaling the data storage, but for scaling the number of consumers you can run. However, I'd be concerned one would quickly run into trouble for workloads where producing is pretty cheap but consuming is expensive. Since this setup would still limit you to a single consumer per partition, I could easily imagine a very fast producer outrunning the consumers.

I might be missing something here, my practical experience with Fluvio is still quite limited I have to admit 😇 Should my concern be valid, I would still recommend implementing this first version of the feature, making sure any potential undefined behavior errors out for now, and then tackling this point with the very next iteration of this feature.
The key idea is that there is a single "primary" reader for each named partition consumer, similar to the single-writer concept. If the primary fails, then secondary consumers would take over. This is the general concept behind consumer groups. For the MVP, we won't allow secondaries; the next step would be adding them.

There is no issue with a consumer falling behind, because Fluvio is an event log stream, not a message queue. As long as the topic retention is big enough, the consumer can catch up. If the consumer can't catch up, then proper partitioning is needed, since this is really a matter of balancing producers and consumers.
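As a rough illustration of the single-primary rule for the MVP (only one active stream per `consumer_id` and partition, with later attempts rejected until consumer groups land), here is a hypothetical bookkeeping sketch; nothing below reflects fluvio's actual implementation.

```rust
use std::collections::HashSet;

/// Hypothetical registry enforcing one active stream per (consumer_id, partition).
#[derive(Default)]
struct ActiveConsumers {
    active: HashSet<(String, u32)>,
}

impl ActiveConsumers {
    fn try_register(&mut self, consumer_id: &str, partition: u32) -> Result<(), String> {
        let key = (consumer_id.to_string(), partition);
        if !self.active.insert(key) {
            // A primary reader already holds this (consumer_id, partition).
            return Err(format!(
                "consumer_id '{consumer_id}' already has an active stream on partition {partition}"
            ));
        }
        Ok(())
    }

    fn release(&mut self, consumer_id: &str, partition: u32) {
        self.active.remove(&(consumer_id.to_string(), partition));
    }
}
```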
@jhass, good observation. The first implementation will have the drawback you mentioned, as it is meant for one consumer per consumer-id. We'll need to build consumer groups to cover the case more comprehensively. However, we want to get something up and running and build things incrementally from there.

On your second point, if the consumer falls behind due to traffic bursts (as hinted in @sehz's reply), the consumer should gracefully recover. If this is a pervasive problem, the workaround would be to divide the traffic into partitions.
We all agree that this issue will be solved by consumer groups.