Writing into Table with Stream.take() not allowed
Steps to reproduce
When using the Stream.take() function I cannot write into a Table; instead I get the error "Cannot modify table key from outside of stream iteration".
Is writing to tables simply not possible when using take()?
Example:
from faust import App

app = App(id="take-error-example")
topic = app.topic('orders')
table = app.Table('orders')

@app.agent(topic)
async def consumer(stream):
    async for values in stream.take(1, within=1):  # take() also requires `within`
        for value in values:
            table["key"] = "some value"
            yield value
Expected behavior
The key-value pair should be written into the Table.
Actual behavior
File ".../python3.10/site-packages/mode/utils/collections.py", line 652, in __setitem__
self.on_key_set(key, value)
File ".../python3.10/site-packages/faust/tables/table.py", line 77, in on_key_set
fut = self.send_changelog(self.partition_for_key(key), key, value)
File ".../python3.10/site-packages/faust/tables/base.py", line 314, in partition_for_key
raise TypeError(
TypeError: Cannot modify table key from outside of stream iteration
Versions
- Python version: 3.10.6
- Faust version: 0.10.11
- Operating system: macOS 13
Our workaround for now is to use a sink for this, or to read from another topic.
Can you explain the workaround? Modified pseudocode from the example perhaps?
I'm afraid of hitting the same issue and I need .take(N) to work for large N as we benefit from batch processing.
@mcskatkat Sorry, after checking the source code more thoroughly, a sink approach is possibly not suitable: the current event is checked against the processed event, and these must be the same event (which they aren't, since the current event is the last event of the take batch, hence the error). You might need to add a new channel (or a topic, if it needs to be backed by Kafka) and publish all the data consumed via take to it, then consume that channel without take to modify the table. It might not work in all situations, but it worked for us...
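For illustration, the relay approach could look roughly like this. This is only a sketch: the relay topic name, batch size, and timeout are invented, and payloads are assumed to be plain values.

import faust

app = faust.App(id="relay-workaround-example")
orders_topic = app.topic('orders')
relay_topic = app.topic('orders-relay')  # hypothetical relay topic
table = app.Table('orders-table')

@app.agent(orders_topic)
async def batch_consumer(stream):
    # Batch processing happens here, but the table is never touched.
    async for values in stream.take(100, within=10):
        for value in values:
            await relay_topic.send(value=value)

@app.agent(relay_topic)
async def table_writer(stream):
    # Plain iteration: current_event is set per message, so table
    # writes are allowed here.
    async for value in stream:
        table["key"] = value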
Thanks @dabdada .
Actually in my case the table update uses associative reductions on events yielded from .take(), so I'm totally ok making just one table update with the "current event" being the last in the batch. Much better than accessing the table for every event.
However, from your example I gather that this is impossible, as Faust will raise an exception because I didn't iterate the stream itself.
It depends. In our experience, when you use stream.current_event inside take you get either the latest message of the batch or None (which is probably bad, but might be addressed in the PR by @richardhundt). As far as I know, stream.current_event is the same thing that the table write checks, so it might work out OK.
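If you want to verify this in your own setup, a small probe like the following (a sketch; the agent name and batch parameters are invented) shows what current_event holds inside a take loop:

@app.agent(topic)
async def probe(stream):
    async for values in stream.take(10, within=5):
        # In our experience this is either the last event of the
        # batch, or None
        print("current_event:", stream.current_event)
        yield values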
Thanks. I suppose it's worth giving it a try then.
My PR doesn't fix this. The issue is that current_event is used to parameterize the RocksDB write operation via a global. This needs cooperation from the underlying consumer, which has to set that variable before yielding a time-slice to your agent. ~~This is simply not being done in cloned streams such as with take. You'd need another PR where you go through all those methods and set that global.~~ This is being done in Stream, but for some reason the ContextVar storing the current_event isn't visible from the rocksdb Store's _set method.
Relying on the previously set event is kinda dangerous when you have any sort of meaningful concurrency (M partitions, N agents). I suspect that you'll start seeing weird behaviour when your writes and reads aren't hitting the same underlying rocksdb assigned to a given partition.
EDIT: another way to fix this whole thing is to drop the idea of having a separate rocksdb for each partition. It makes sense as long as you have separate processes or system-level threads on each partition, but if you don't have real parallelism, as with single-threaded multitasking, then you don't gain anything. From what I can tell, rocksdb access is single-threaded in faust, so there's no contention on the underlying store.
@richardhundt Thanks for the explanation.
I'm afraid I'm a bit out of my depth here with Faust internals, for the time being at least, as I'm just starting out with it.
One thing that might be going for me is that I don't expect to have too much concurrency over the same data, i.e. probably going to have a single-partition single-agent per topic, with parallelization over multiple topics only. If I understood your explanation correctly, this carries less risk.
@mcskatkat I've got to correct my understanding as well. It turns out that setting the current_event is done by the Stream, and this is actually happening correctly; however, it's a ContextVar and the rocksdb task doesn't inherit that context, so it doesn't see it :(
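For anyone unfamiliar with the mechanism: asyncio.create_task snapshots the current context at creation time, so a ContextVar value set afterwards from another task is invisible to it. A minimal standalone illustration (names are made up, this is not Faust code):

import asyncio
from contextvars import ContextVar

current_event: ContextVar = ContextVar("current_event", default=None)

async def store_task():
    # Stands in for the RocksDB task: its context was snapshotted at
    # creation, before the agent set the variable.
    await asyncio.sleep(0.1)
    print("store sees:", current_event.get())   # -> None

async def agent_task():
    current_event.set("event-42")
    print("agent sees:", current_event.get())   # -> event-42

async def main():
    writer = asyncio.create_task(store_task())  # context copied here
    await agent_task()                          # runs in main's context
    await writer

asyncio.run(main())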
@mcskatkat okay, here's what will work:
import faust
from faust import App

app = App(id="take-error-example")
topic = app.topic('orders-1')
table = app.Table('orders-1')

@app.agent(topic)
async def consumer(stream: faust.StreamT):
    async for values in stream.take_events(1, 3):  # up to 1 event, within 3 seconds
        async for value in app.stream(values):
            table["key"] = "some value"
            yield value
That ensures that you 1) have an iterable of Event objects in values and 2) that you're wrapping this in a Stream so that current_event is set correctly. That second part is actually important because otherwise the dispatcher really doesn't know which event you're currently iterating over if you just have a list of values.
Great, I think this directly addresses the OP issue.
What if I want to make just one table update (rather than one per event) after, say, summing over values above?
Do I still need to construct a stream?
BTW, is it significant to use .take_events() rather than .take()? or .take_with_timestamp()? I wonder why those three don't delegate to one another... Each has its own inline implementation.
EDIT: after reading the last part of your message again, I figure that creating a Stream is necessary. I guess I can still suppress writing to the table except on the last event taken.
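Something like this is what I have in mind (a sketch building on the example above; it assumes numeric payloads, invented batch parameters, and that take_events yields a sequence the way take does):

@app.agent(topic)
async def summing_consumer(stream: faust.StreamT):
    async for events in stream.take_events(100, within=10):
        total = 0
        seen = 0
        async for value in app.stream(events):
            total += value  # assumes numeric payloads
            seen += 1
            if seen == len(events):
                # last event of the batch: current_event points at it,
                # so a single table write per batch should be allowed
                table["sum"] = table.get("sum", 0) + total
            yield value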
Yeah, you do seem to need take_events, as it blows up with TypeError('Cannot modify table key from outside of stream iteration') otherwise. That's interesting in itself, though.
I take that to mean that constructing a Stream from a list of non-event values is allowed, but the internals detect that these are not of type Event and therefore (correctly) don't set current_event.
Of course, this makes sense given that the Event object has information about the partition, but the wrapped value doesn't. I still think a SimpleRocksDB version which didn't try to create a sharded storage layer and allowed writes from anywhere would be a useful addition. Or at least GlobalTable shouldn't depend on knowing the partition.
I have the same question. How should I update the table once per multiple events? Updating it per event causes the producer buffer to fill up.
@mcskatkat I have tested the following and I think it is working:
@app.agent(topic2)
async def agent_b(stream):
    async for values in stream:
        # update the table here: plain iteration sets current_event,
        # so the write is allowed, and it runs once per batch
        table["key"] = values

@app.agent(topic1, sink=[topic2])
async def agent_a(stream):
    async for values in stream.take(n, within=t):
        # yield the whole batch; the sink forwards it to topic2
        yield values
Thanks!