
Ordering between partitions

Open jdesfossez opened this issue 2 years ago • 0 comments

Hi,

I am new to Faust and I am trying to understand how to make sure I have processed the oldest event for each partition.

I am using the group_by syntax to shard the data on a field. My issue is that I don't know how to access the partition when iterating. With the partition ID, I think I could maintain a local mapping from partition ID to the oldest timestamp seen on that partition. This would let me know when I have received the oldest message possible.

This is my code:

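from datetime import datetime

import faust

# App id and broker URL are placeholders, not from the original post.
app = faust.App('metrics-app', broker='kafka://localhost:9092')

class Metric(faust.Record, serializer='json', isodates=True):
    hostname: str
    name: str
    timestamp: datetime
    value: float

topic = app.topic('metrics', value_type=Metric)

@app.agent(topic)
async def process_metrics(messages):
    async for message in messages.group_by(Metric.hostname):
        # message is of type Metric, so I don't see how to access the partition ID here
        ...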

Is there a way to do what I am looking for? Or is there a better way to ensure I won't receive an event from the past for a given hostname? Thanks!
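
For illustration, the per-partition mapping described above could look like the following sketch. It is plain Python and does not touch Faust itself; the class name `PartitionWatermark` and its methods are hypothetical, and it assumes the partition ID and timestamp of each message are already available to the caller.

```python
from datetime import datetime
from typing import Dict, Optional


class PartitionWatermark:
    """Tracks the newest timestamp seen on each partition (hypothetical helper)."""

    def __init__(self) -> None:
        # partition ID -> newest timestamp observed on that partition
        self.latest: Dict[int, datetime] = {}

    def observe(self, partition: int, timestamp: datetime) -> bool:
        """Record a message; return False if it is older than one
        already seen on the same partition (an event "from the past")."""
        previous = self.latest.get(partition)
        if previous is not None and timestamp < previous:
            return False
        self.latest[partition] = timestamp
        return True

    def low_watermark(self) -> Optional[datetime]:
        """Oldest of the per-partition newest timestamps, or None if
        no message has been observed yet."""
        return min(self.latest.values(), default=None)
```

Inside an agent, the partition ID would have to come from the underlying Kafka message rather than from the `Metric` value. Faust streams can be iterated with `stream.events()`, where each event carries the raw message (for example `event.message.partition`); whether that combines with `group_by` in the way needed here is an assumption that should be checked against the Faust documentation. Note also that the low watermark only covers partitions that have delivered at least one message.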

jdesfossez, Jul 19 '22 02:07