Why doesn't Event.forward default to the timestamp of the event?

Open cowboygneox opened this issue 2 years ago • 1 comment

Event.forward is defined here: https://github.com/faust-streaming/faust/blob/master/faust/events.py#L167

It is used by Stream.group_by: https://github.com/faust-streaming/faust/blob/master/faust/streams.py#L890

When using group_by, the original timestamp of the event is lost. For the windowing I'm experimenting with, losing this timestamp defeats the purpose of group_by: I'm forced either to build the topic with the correct partitioning key (which I may not always control) or to reimplement what group_by is supposed to do.
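To illustrate why the lost timestamp matters for windowing, here is a toy sketch that does not use Faust at all. The `window_start` helper, the 60-second window size, and the 95-second delay are all made up for illustration; it only assumes that the forwarded copy ends up with a later timestamp than the original event.

```python
def window_start(timestamp: float, size: float) -> float:
    """Return the start of the tumbling window that contains `timestamp`."""
    return timestamp - (timestamp % size)


# Hypothetical values: the original event time, and the later time that the
# repartitioned copy would carry if the original timestamp is not passed on.
original_ts = 1_672_344_000.0
forwarded_ts = original_ts + 95.0

size = 60.0  # assumed 60-second tumbling window

original_bucket = window_start(original_ts, size)
forwarded_bucket = window_start(forwarded_ts, size)

# The same logical event lands in a different window after repartitioning.
print(original_bucket, forwarded_bucket)
```

With these made-up numbers the event moves to a later window purely because of when it was forwarded, not because of when it happened.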

Is this intended behavior? Can I make a PR that will change Event.forward to default to Event.message.timestamp if timestamp is not provided?
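Roughly what I have in mind, as a standalone sketch. `FakeMessage`, `FakeChannel`, and `FakeEvent` are stand-ins I made up so the snippet runs without Faust; they are not the real classes, and the real Event.forward takes more parameters than shown here. The only point is the fallback when timestamp is not provided.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class FakeMessage:
    """Stand-in for the underlying message; only the timestamp matters here."""
    timestamp: float


class FakeChannel:
    """Stand-in channel that records what would have been sent."""

    def __init__(self) -> None:
        self.sent: List[Tuple[Any, Any, Optional[float]]] = []

    async def send(self, *, key: Any, value: Any,
                   timestamp: Optional[float] = None) -> None:
        self.sent.append((key, value, timestamp))


@dataclass
class FakeEvent:
    """Stand-in event holding a key, a value, and its source message."""
    key: Any
    value: Any
    message: FakeMessage

    async def forward(self, channel: FakeChannel, key: Any = None,
                      timestamp: Optional[float] = None) -> None:
        # Proposed default: reuse the source message's timestamp
        # when the caller does not pass one explicitly.
        if timestamp is None:
            timestamp = self.message.timestamp
        new_key = self.key if key is None else key
        await channel.send(key=new_key, value=self.value, timestamp=timestamp)


async def demo() -> List[Tuple[Any, Any, Optional[float]]]:
    channel = FakeChannel()
    event = FakeEvent(key="old", value={"n": 1},
                      message=FakeMessage(timestamp=1_672_344_000.0))
    await event.forward(channel, key="new")                 # falls back
    await event.forward(channel, key="new", timestamp=5.0)  # explicit wins
    return channel.sent


sent = asyncio.run(demo())
print(sent)
```

An explicitly passed timestamp still takes precedence, so callers that already set one would see no change in this sketch.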

Thanks!

cowboygneox, Dec 29 '22 20:12

Alternatively, we could explicitly pass event.message.timestamp in the repartition function inside Stream.group_by. That would fix the immediate problem with a smaller blast radius.

So I'm suggesting something like:

        async def repartition(value: T) -> T:
            event = self.current_event
            if event is None:
                raise RuntimeError("Cannot repartition stream with non-topic channel")
            new_key = await format_key(key, value)
            await event.forward(channel, key=new_key, timestamp=event.message.timestamp)
            return value

cowboygneox, Dec 29 '22 20:12