faust icon indicating copy to clipboard operation
faust copied to clipboard

Storing commits on a database

Open jamesellis1999 opened this issue 2 years ago • 1 comments

Hi,

This is more of a general question than an issue to reproduce, so please pardon me not following the template.

Is it possible to store the offset of a consumer in an external database? Ideally, I would like to commit my offsets with the other data I have in a single database transaction. How would I go about doing this in Faust?

Thanks!

jamesellis1999 avatar Mar 06 '23 16:03 jamesellis1999

Not sure if this helps, but you can get the offset as each event comes into your stream processor, then use that while doing something with the other data too:

@app.agent(some_topic)
async def process_event_with_offset(stream):
  async for event in stream.events():
    # use event.value for your data
    # event.message.offset for the offset of the event
    ....

fonty422 avatar Apr 04 '23 05:04 fonty422