
[Tracking] Consumer Stream Resume from last used offset

Open simlay opened this issue 2 years ago • 14 comments

When creating a consumer stream one should be able to start a stream using an offset such as:

let stream_id = String::from("my-stream-id");
let offset = Offset::from_last_read(stream_id);

To do this we need:

  • [x] https://github.com/infinyon/fluvio/issues/2367
  • [x] https://github.com/infinyon/fluvio/issues/2368
  • [x] https://github.com/infinyon/fluvio/issues/2369
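The requested behavior can be sketched with a toy in-memory model (names like `OffsetStore` and `commit` are hypothetical illustrations, not the actual Fluvio API): the cluster remembers the last read offset per consumer id, and a new stream resumes from there.

```rust
use std::collections::HashMap;

// Toy in-memory model of the requested resume semantics.
struct OffsetStore {
    last_read: HashMap<String, i64>,
}

impl OffsetStore {
    fn new() -> Self {
        Self { last_read: HashMap::new() }
    }

    // Called by the consumer after reading a record.
    fn commit(&mut self, consumer_id: &str, offset: i64) {
        self.last_read.insert(consumer_id.to_string(), offset);
    }

    // Where a stream created with `Offset::from_last_read(id)` would start:
    // one past the last committed offset, or the beginning if none exists.
    fn resume_from(&self, consumer_id: &str) -> i64 {
        self.last_read.get(consumer_id).map(|o| o + 1).unwrap_or(0)
    }
}

fn main() {
    let mut store = OffsetStore::new();
    assert_eq!(store.resume_from("my-stream-id"), 0); // fresh consumer starts at the beginning
    store.commit("my-stream-id", 41);
    assert_eq!(store.resume_from("my-stream-id"), 42); // resumes after restart
}
```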

simlay avatar May 12 '22 17:05 simlay

IIUC, we will introduce a new consumer_id identifier (not stream_id, right?) which can be user-defined or random. This id will be added to 3 requests:

  1. FetchOffsetsRequest - to read last_read_offset per consumer_id.
  2. StreamFetchRequest - to validate and forbid more than one stream_id per consumer_id and partition.
  3. UpdateOffsetsRequest - to update last_read_offset per consumer_id.

Did I understand it correctly?

galibey avatar May 13 '22 12:05 galibey

should be consumer_id rather than customer_id. Should add to ConsumerConfig

sehz avatar May 13 '22 13:05 sehz

should be consumer_id rather than customer_id. Should add to ConsumerConfig

Right, thanks! I edited my comment.

galibey avatar May 13 '22 14:05 galibey

I have a few questions:

  • Can a stream have multiple concurrent consumer_ids?
  • Are consumer_ids managed?
    • Do I need to update the offset after every read, or does it happen automatically?
    • Can I delete them, or do they age out after a certain time?
  • Can consumer_ids be reset or moved to a specific offset?
  • What happens if the offset is out of range?
  • Can we add a spec that explains how this whole thing works? This spec would also drive the documentation on fluvio.io once it's done.

ajhunyady avatar May 13 '22 14:05 ajhunyady

Thoughts:

  • A consumer can be associated with a single consumer_id.
  • The consumer_id is managed by the consumer. If there is no existing offset, it uses the offset parameter as the default starting offset.
  • There is no delete since offsets are stored in an internal [TBD] topic. Eventually, they will be purged based on the retention policy.
  • If the offset is out of range (for example, records were purged based on retention), the consumer will resume from the first available offset.
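The fallback rules in these bullets can be written down concretely. Below is a sketch with hypothetical names (`resolve_start`, `default_start`); the real resolution would live server-side, not in user code:

```rust
// Sketch of the starting-offset resolution described above (hypothetical
// names, not Fluvio internals). `stored` is the offset saved for this
// consumer_id, `default_start` is the offset parameter from the consumer
// config, and `first`/`last` are the current bounds of the partition log.
fn resolve_start(stored: Option<i64>, default_start: i64, first: i64, last: i64) -> i64 {
    match stored {
        // No existing offset: fall back to the configured default.
        None => default_start.clamp(first, last),
        // A stored offset below `first` means those records were purged
        // by retention: resume from the first available offset.
        Some(o) if o < first => first,
        Some(o) => o.min(last),
    }
}

fn main() {
    assert_eq!(resolve_start(None, 0, 100, 500), 100);      // default, clamped into range
    assert_eq!(resolve_start(Some(50), 0, 100, 500), 100);  // purged -> first offset
    assert_eq!(resolve_start(Some(250), 0, 100, 500), 250); // in range, unchanged
}
```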

sehz avatar May 13 '22 14:05 sehz

should be consumer_id rather than customer_id. Should add to ConsumerConfig

I'm on the fence about putting it in ConsumerConfig. I think we could have it as part of the Offset enum, so that you'd do something like Offset::from_last_offset(consumer_id). This feels more ergonomic to me.
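A sketch of what that enum variant might look like (hypothetical shape and variant names; the actual Offset type in the fluvio crate differs):

```rust
// Hypothetical shape of an Offset enum carrying the consumer id, as
// proposed above; not the actual fluvio crate definition.
#[derive(Debug, Clone, PartialEq)]
enum Offset {
    Beginning,
    End,
    Absolute(i64),
    // New variant: resume from the last offset recorded for this consumer.
    FromLastOffset(String),
}

impl Offset {
    fn from_last_offset(consumer_id: impl Into<String>) -> Self {
        Offset::FromLastOffset(consumer_id.into())
    }
}

fn main() {
    let offset = Offset::from_last_offset("my-stream-id");
    assert_eq!(offset, Offset::FromLastOffset("my-stream-id".to_string()));
}
```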

Can a stream have multiple concurrent consumer_ids?

Yes, this is a requirement. You can have consumer_id of foo and bar and those two last-offsets will be updated.

What happens if the offset is out of range?

We'll do min(max(...)) to ensure it's not over the stable_offset.
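That min(max(...)) check might look like this (a sketch; the function name and offset values are illustrative):

```rust
// Clamp a requested offset into the valid range of the partition:
// never below the first available offset, never above stable_offset.
fn clamp_offset(requested: i64, first_offset: i64, stable_offset: i64) -> i64 {
    requested.max(first_offset).min(stable_offset)
}

fn main() {
    assert_eq!(clamp_offset(900, 100, 500), 500); // over stable -> stable_offset
    assert_eq!(clamp_offset(10, 100, 500), 100);  // under first -> first_offset
    assert_eq!(clamp_offset(250, 100, 500), 250); // in range, unchanged
}
```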

Can I delete them, or they age after a certain time?

We should just add delete so that we can test more easily, but that probably won't be the usual flow. It should just be deleted after a retention policy denotes it "expired" (whatever that means).

simlay avatar May 13 '22 17:05 simlay

IIUC, we will introduce a new consumer_id identifier (not stream_id, right?) which can be user-defined or random. This id will be added to 3 requests:

  1. FetchOffsetsRequest - to read last_read_offset per consumer_id.
  2. StreamFetchRequest - to validate and forbid more than one stream_id per consumer_id and partition.
  3. UpdateOffsetsRequest - to update last_read_offset per consumer_id.

Did I understand it correctly?

FetchOffsetsRequest already exists, but we need to add the last_read_offset.

Numbers 2 and 3 are correct.

I'm open to using stream_id as well. I only have consumer_id because it's a consumer stream.

simlay avatar May 13 '22 17:05 simlay

Stale issue message

github-actions[bot] avatar Jul 13 '22 11:07 github-actions[bot]

What happened to this? Is there a solution to this issue in 2024?

jhass avatar Feb 07 '24 18:02 jhass

@jhass, we should have addressed this earlier. We just defined an RFC that we'd like to solicit your feedback on. It is targeted for release this quarter. We'll post a link to the RFC shortly.

ajhunyady avatar Feb 07 '24 18:02 ajhunyady

@jhass this is ready for review - https://github.com/infinyon/fluvio/blob/master/rfc/offset-management.md

ajhunyady avatar Feb 07 '24 21:02 ajhunyady

Hey @ajhunyady, thanks for the very fast response, I honestly didn't expect this :)

Thanks for writing up the RFC, I think overall it can cover the most important use cases, and I would encourage you to implement it, at least as a first step toward a perhaps more advanced solution.

One point that isn't quite clear to me is what happens if multiple consumers are started using the same consumer_id and topic.

I see that as long as they listen to different partitions of the same topic there's probably no issue (your CLI examples hint at that), though I would recommend extending the RFC with clear wording about this.

But what would happen with multiple consumers using the same topic, partition and consumer_id running at the same time?

I imagine the easiest solution to get an initial implementation going will be to just throw an error for all but the first consumer starting to listen.
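The error-out-on-duplicate behavior suggested here could be sketched as follows (a toy model with hypothetical names; the real check would presumably live in the SPU handling StreamFetchRequest):

```rust
use std::collections::HashSet;

// Toy registry enforcing a single active stream per
// (topic, partition, consumer_id), as suggested above.
struct ConsumerRegistry {
    active: HashSet<(String, u32, String)>,
}

impl ConsumerRegistry {
    fn new() -> Self {
        Self { active: HashSet::new() }
    }

    // Returns Err for all but the first consumer on the same key.
    fn register(&mut self, topic: &str, partition: u32, consumer_id: &str) -> Result<(), String> {
        let key = (topic.to_string(), partition, consumer_id.to_string());
        if self.active.insert(key) {
            Ok(())
        } else {
            Err(format!("consumer '{consumer_id}' already active on {topic}/{partition}"))
        }
    }
}

fn main() {
    let mut reg = ConsumerRegistry::new();
    assert!(reg.register("events", 0, "foo").is_ok());
    assert!(reg.register("events", 1, "foo").is_ok()); // other partition: fine
    assert!(reg.register("events", 0, "bar").is_ok()); // other consumer_id: fine
    assert!(reg.register("events", 0, "foo").is_err()); // duplicate: rejected
}
```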

To an extent this can scale by creating partitions within a topic, not to scale data storage but to scale the number of consumers you can run. However, I'd be concerned one would quickly run into trouble for workloads where producing is cheap but consuming is expensive. Since this setup would still limit you to a single consumer per partition, I could easily imagine a very fast producer outrunning the consumers.

I might be missing something here; my practical experience with Fluvio is still quite limited, I have to admit 😇 Should my concern be valid, I would still recommend implementing this first version of the feature, making sure any potential undefined behavior errors out for now, and then tackling this point in the very next iteration of the feature.

jhass avatar Feb 08 '24 08:02 jhass

The key idea is that there is a single "primary" reader for each named partition consumer, similar to the single-writer concept.

If the primary fails, then secondary consumers would take over. This is the general concept behind consumer groups. For the MVP, we won't allow secondaries. The next step would be adding them.

There is no issue with a consumer falling behind, because Fluvio is an event log stream, not a message queue. As long as the topic retention is big enough, the consumer can catch up. If the consumer can't catch up, then proper partitioning is needed, since this is more of a producer/consumer balancing problem.

sehz avatar Feb 08 '24 16:02 sehz

Hey @ajhunyady, thanks for the very fast response, I honestly didn't expect this :)

Thanks for writing up the RFC, I think overall it can cover the most important use cases, and I would encourage you to implement it, at least as a first step toward a perhaps more advanced solution.

One point that isn't quite clear to me is what happens if multiple consumers are started using the same consumer_id and topic.

I see that as long as they listen to different partitions of the same topic there's probably no issue (your CLI examples hint at that), though I would recommend extending the RFC with clear wording about this.

But what would happen with multiple consumers using the same topic, partition and consumer_id running at the same time?

I imagine the easiest solution to get an initial implementation going will be to just throw an error for all but the first consumer starting to listen.

To an extent this can scale by creating partitions within a topic, not to scale data storage but to scale the number of consumers you can run. However, I'd be concerned one would quickly run into trouble for workloads where producing is cheap but consuming is expensive. Since this setup would still limit you to a single consumer per partition, I could easily imagine a very fast producer outrunning the consumers.

I might be missing something here; my practical experience with Fluvio is still quite limited, I have to admit 😇 Should my concern be valid, I would still recommend implementing this first version of the feature, making sure any potential undefined behavior errors out for now, and then tackling this point in the very next iteration of the feature.

@jhass, good observation. The first implementation will have the drawback you mentioned, as it is meant for one consumer per consumer_id. We'll need to build consumer groups to cover the case more comprehensively. However, we want to get something up and running and build things incrementally from there.

On your second point, if the consumer falls behind due to traffic bursts (as hinted in @sehz's reply), the consumer should gracefully recover. If this is a pervasive problem, the workaround would be to divide the traffic across partitions.

We all agree that this issue will be solved by consumer groups.

ajhunyady avatar Feb 08 '24 17:02 ajhunyady