
Writing into Table with Stream.take() not allowed

Open dada-engineer opened this issue 2 years ago • 15 comments

Steps to reproduce

When using the Stream.take() function I cannot write to a Table; instead I get the error: Cannot modify table key from outside of stream iteration.

Can I just not write to tables when using take?

Example:

from faust import App

app = App(id="take-error-example")

topic = app.topic('orders')
table = app.Table('orders')

@app.agent(topic)
async def consumer(stream):
    # take() needs both a batch size and a time window
    async for values in stream.take(1, within=5):
        for value in values:
            table["key"] = "some value"
            yield value

Expected behavior

The key-value pair should be written into the Table.

Actual behavior

File ".../python3.10/site-packages/mode/utils/collections.py", line 652, in __setitem__
    self.on_key_set(key, value)
  File ".../python3.10/site-packages/faust/tables/table.py", line 77, in on_key_set
    fut = self.send_changelog(self.partition_for_key(key), key, value)
  File ".../python3.10/site-packages/faust/tables/base.py", line 314, in partition_for_key
    raise TypeError(
TypeError: Cannot modify table key from outside of stream iteration

Versions

  • Python version: 3.10.6
  • Faust version: 0.10.11
  • Operating system: MacOS 13

dada-engineer avatar Apr 20 '23 13:04 dada-engineer

Our workaround for now is to use a sink for this, or to read from another topic.

dada-engineer avatar May 03 '23 20:05 dada-engineer

Our workaround for now is to use a sink for this, or to read from another topic.

Can you explain the workaround? Modified pseudocode from the example perhaps? I'm afraid of hitting the same issue and I need .take(N) to work for large N as we benefit from batch processing.

mcskatkat avatar Jun 21 '23 11:06 mcskatkat

@mcskatkat Sorry, after checking the source code more thoroughly, a sink approach is probably not suitable: the table write compares the current event against the event being processed, and they must be the same event (which they aren't, because with take the current event is the last event of the batch, hence the error). You might need to add a new channel (or a topic, if it should be backed by Kafka), publish all the data consumed via take to that channel, and then consume the channel without take to modify the table. It might not work in all situations, but it worked for us...
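A minimal sketch of that two-topic approach; the topic names, batch size, window, and table key below are illustrative assumptions, not our actual code:

import faust

app = faust.App(id="take-workaround-sketch")

orders_topic = app.topic('orders')
batched_topic = app.topic('orders-batched')   # hypothetical intermediate topic
table = app.Table('orders-table', default=str)

@app.agent(orders_topic)
async def batcher(stream):
    # consume in batches, then republish each value to the intermediate topic
    async for values in stream.take(100, within=5):
        for value in values:
            await batched_topic.send(value=value)

@app.agent(batched_topic)
async def writer(stream):
    # plain stream iteration, so writing to the table is allowed here
    async for value in stream:
        table["key"] = value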

dada-engineer avatar Jun 21 '23 12:06 dada-engineer

Thanks @dabdada. Actually, in my case the table update uses associative reductions over the events yielded from .take(), so I'm totally OK with making just one table update, with the "current event" being the last in the batch. Much better than accessing the table for every event.

However, from your example I gather that this is impossible, as Faust will raise an exception because I didn't iterate over the stream itself.

mcskatkat avatar Jun 21 '23 12:06 mcskatkat

It depends. In our experience, when you use stream.current_event inside take() you get either the latest message of the batch or None (which is probably bad, but might be addressed in the PR by @richardhundt). As far as I know, stream.current_event is the same thing the table write checks against, so it might work out OK.
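For illustration, a tiny sketch of inspecting it inside a take() loop (reusing the topic from the example at the top; the batch size and window are placeholders):

@app.agent(topic)
async def consumer(stream):
    async for values in stream.take(10, within=5):
        event = stream.current_event   # last event of the batch, or None
        if event is not None:
            print("batch ends at offset", event.message.offset)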

dada-engineer avatar Jun 21 '23 12:06 dada-engineer

Thanks. I suppose it's worth giving it a try then.

mcskatkat avatar Jun 21 '23 12:06 mcskatkat

My PR doesn't fix this. The issue is that current_event is used to parameterize the rocksdb write operation via a global. This needs cooperation from the underlying consumer, which has to set that variable before yielding a time-slice to your agent. ~~This is simply not being done in cloned streams such as with take. You'd need another PR where you go through all those methods and set that global~~. This is being done in Stream, but for some reason the ContextVar storing the current_event isn't visible from the rocksdb.Store's _set method.

Relying on the previously set event is kinda dangerous when you have any sort of meaningful concurrency (M partitions, N agents). I suspect that you'll start seeing weird behaviour when your writes and reads aren't hitting the same underlying rocksdb assigned to a given partition.

EDIT: another way to fix this whole thing is to drop the idea of having a separate rocksdb for each partition. It makes sense as long as you have separate processes or system-level threads per partition, but if you don't have real parallelism, as with single-threaded multitasking, then you don't gain anything. From what I can tell, rocksdb access is single-threaded in Faust, so there's no contention on the underlying store.

richardhundt avatar Jun 21 '23 14:06 richardhundt

@richardhundt Thanks for the explanation.

I'm afraid I'm a bit out of my depth here with Faust internals, for the time being at least, as I'm just starting out with it.

One thing that might be going for me is that I don't expect much concurrency over the same data: I'll probably have a single partition and a single agent per topic, with parallelization over multiple topics only. If I understood your explanation correctly, this carries less risk.

mcskatkat avatar Jun 22 '23 05:06 mcskatkat

@mcskatkat I've got to correct my understanding as well. It turns out that setting the current_event is done by the Stream, and this is actually happening correctly. However, it's a ContextVar and the rocksdb task doesn't inherit that context, so it doesn't see it :(
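Not Faust code, but a minimal stdlib illustration of that ContextVar behaviour: a task only sees the snapshot of the context taken when it was created, so a value set afterwards is invisible to it.

import asyncio
import contextvars

current_event = contextvars.ContextVar("current_event", default=None)

async def store_write():
    # runs with the context copied when the task was created
    print("store sees:", current_event.get())

async def main():
    task = asyncio.create_task(store_write())  # context snapshot taken here
    current_event.set("event-42")              # set after the snapshot
    await task                                 # prints: store sees: None

asyncio.run(main())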

richardhundt avatar Jun 22 '23 08:06 richardhundt

@mcskatkat okay, here's what will work:

import faust

app = faust.App(id="take-error-example")

topic = app.topic('orders-1')
table = app.Table('orders-1')

@app.agent(topic)
async def consumer(stream: faust.StreamT):
    async for values in stream.take_events(1, 3):  # up to 1 event per batch, within 3 seconds
        async for value in app.stream(values):
            table["key"] = "some value"
            yield value

That ensures that 1) you have an iterable of Event objects in values and 2) you're wrapping it in a Stream, so that current_event is set correctly. The second part is actually important because otherwise the dispatcher really doesn't know which event you're currently iterating over if you just have a list of values.

richardhundt avatar Jun 22 '23 09:06 richardhundt

@mcskatkat okay, here's what will work:

Great, I think this directly addresses the OP issue. What if I want to make just one table update (rather than one per event) after, say, summing over values above? Do I still need to construct a stream?

BTW, is it significant to use .take_events() rather than .take()? or .take_with_timestamp()? I wonder why those three don't delegate to one another... Each has its own inline implementation.

EDIT: after reading the last part of your message again, I figure that creating a Stream is necessary. I guess I can still suppress writing to the table except for the last event taken.
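A rough sketch of that, building on the take_events/app.stream pattern above; the batch size, window, reduction, and table key are placeholders:

@app.agent(topic)
async def consumer(stream: faust.StreamT):
    async for events in stream.take_events(100, within=5):
        events = list(events)
        total = 0
        seen = 0
        async for value in app.stream(events):
            total += value          # placeholder associative reduction
            seen += 1
            if seen == len(events):
                # only the last event of the batch writes to the table,
                # while current_event points at that event
                table["total"] = total
            yield value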

mcskatkat avatar Jun 22 '23 09:06 mcskatkat

Yeah, you do seem to need take_events, as it blows up with TypeError('Cannot modify table key from outside of stream iteration') otherwise. That's interesting in itself, though.

I take that to mean that constructing a Stream from a list of non-event values is allowed, but the internals detect that these are not of type Event and therefore (correctly) don't set current_event.

Of course, this makes sense given that the Event object has information about the partition, but the wrapped value doesn't. I still think a SimpleRocksDB version which didn't try to create a sharded storage layer and allowed writes from anywhere would be a useful addition. Or at least GlobalTable shouldn't depend on knowing the partition.

richardhundt avatar Jun 22 '23 13:06 richardhundt

I have the same question. How should I update the table once per batch of events? Updating it per event causes the producer buffer to fill up.

aliosia avatar Jul 01 '23 13:07 aliosia

@mcskatkat I have tested the following and I think it is working:

@app.agent(topic2)
async def agent_b(stream):
    async for value in stream:
        # update the table here, inside plain stream iteration
        table["key"] = value

@app.agent(topic1, sink=[topic2])
async def agent_a(stream):
    async for values in stream.take(n, within=t):
        # forward each batch to topic2 via the sink
        yield values

aliosia avatar Jul 02 '23 06:07 aliosia

@mcskatkat I have tested the following and I think it is working:

Thanks!

mcskatkat avatar Jul 02 '23 18:07 mcskatkat