SetTable crashes when using RocksDB
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
If you use "rocksdb" as storage you got this error, it doesn't happen if you use memory.
Basically, I have a list of tasks that I load from the database and store locally (no kafka here), then I have some external events that are triggering execution of these tasks, the only thing I want is to not send a task to a kafka if it's already was sent and currently processing. I use a SetTable to track the progress of the task execution. All topics and this table have the same number of partitions and also use the same key, so they should be co-partitioned.
import random
import asyncio
import faust
import logging
app = faust.App(
    id="rocksdb_test",
    broker='localhost:9092',
    store='rocksdb://'
)
event_topic = app.topic('test_events', key_type=str, value_type=str, partitions=8)
task_queue_topic = app.topic('test_task_queue_topic', key_type=str, value_serializer='json', partitions=8)
task_completion_topic = app.topic('test_task_completion_topic', key_type=str, value_serializer='json', partitions=8)
tasks_progress = app.SetTable('test_tasks_progress', partitions=8, value_type=int)
# hardcoded tasks
TASK_DEFINITIONS = {
    "foo": [
        {"id": 1},
        {"id": 2},
        {"id": 3}
    ],
    "bar": [
        {"id": 4},
        {"id": 5},
        {"id": 6},
    ],
    "hello": [{"id": 7}],
    "world": [{"id": 8}],
}
# =================================== SCHEDULER ================================
async def task_queue_sink(job):
    logging.info(f"Sending task: task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_queue_topic.send(key=job['key'], value=job)
    tasks_progress[job['key']].add(job['task_id'])
@app.agent(event_topic, sink=[task_queue_sink])
async def events_stream(stream: faust.Stream):
    async for key in stream:
        for task in TASK_DEFINITIONS[key]:
            if task['id'] not in tasks_progress[key]:
                new_job = {
                    'uuid': faust.uuid(),
                    'task_id': task['id'],
                    'key': key
                }
                yield new_job
@app.agent(task_completion_topic)
async def task_completions(stream: faust.Stream):
    async for job in stream:
        logging.info(f"Received Task Completion: id = {job['task_id']} / uuid = {job['uuid']}")
        tasks_progress[job['key']].discard(job['task_id'])
# ==================================================================================
# ================================== WORKER ========================================
async def task_completed_sink(job):
    logging.info(f"Sending task completion! task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_completion_topic.send(key=job["key"], value=job)
@app.agent(task_queue_topic, sink=[task_completed_sink])
async def worker(stream: faust.Stream):
    async for job in stream:
        logging.info(f"Task Received!: task_id = {job['task_id']} / uuid = {job['uuid']}")
        await asyncio.sleep(1)  # DO SOME WORK HERE
        logging.info(f"Task Processed!: task_id = {job['task_id']} / uuid = {job['uuid']}")
        yield job
# ==================================================================================
# ================================= 3rd party events ===============================
@app.timer(interval=1.0)
async def push_events_every_second():
    key = random.choice(list(TASK_DEFINITIONS.keys()))
    logging.info(f"Pushing key = {key}")
    await event_topic.send(key=key, value=key)
# ====================================================================================
if __name__ == '__main__':
    app.main()
Expected behavior
I should be able to store data in the SetTable using RocksDB without any errors.
Actual behavior
AssertionError: assert event is not None
../python3.9/site-packages/faust/stores/rocksdb.py#274
Full traceback
[2022-01-19 14:26:10,389] [33567] [ERROR] [^----ChangeloggedSetManager: None]: Crashed reason=AssertionError()
Traceback (most recent call last):
File "******/projects/scheduler/.venv/lib/python3.9/site-packages/mode/services.py", line 802, in _execute_task
await task
File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/tables/objects.py", line 146, in _periodic_flush
self.flush_to_storage()
File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/tables/objects.py", line 139, in flush_to_storage
self.storage[key] = self.data[key].as_stored_value()
File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/stores/base.py", line 208, in __setitem__
return self._set(self._encode_key(key), self._encode_value(value))
File "******/projects/scheduler/.venv/lib/python3.9/site-packages/faust/stores/rocksdb.py", line 274, in _set
assert event is not None
AssertionError
[2022-01-19 14:26:10,398] [33567] [INFO] [^Worker]: Stopping...
[2022-01-19 14:26:10,399] [33567] [INFO] [^-App]: Stopping...
Versions
- Python version = 3.9.5
- Faust version = 0.8.0
- Operating system = macOS Monterey, M1 (Apple Silicon)
- Kafka version = 3.0.0
- RocksDB version (if applicable) = must be latest (6.27.3)
This can also be reproduced with the example examples/tableofset.py just by adding store='rocksdb://'.
The code that produces the assert is similar to all the other table operations, except that elsewhere there is this code instead:
if event is None:
    raise TypeError("Cannot modify table key from outside of stream iteration")
After examining the code, this is exactly that case: the table is written outside of stream iteration. There is a separate periodic task that takes the in-memory representation and tries to flush it out to the RocksDB table. However, this cannot work. I don't see how it could ever work; it is broken by design.
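To make the failure mode concrete: the _set in stores/rocksdb.py (see the traceback) looks up the event currently being processed, and faust only has such an event while an agent is iterating over its stream. The following is a minimal sketch, using the public faust.current_event() helper, of where that event is and is not available; the app and topic names here are made up for illustration.
import faust

app = faust.App('current-event-demo', broker='kafka://localhost:9092')
topic = app.topic('demo', value_type=str)

@app.agent(topic)
async def consumer(stream):
    async for value in stream:
        # While an agent iterates its stream, faust knows which Kafka message
        # is being processed, so current_event() is set and a store can use it.
        assert faust.current_event() is not None

@app.timer(interval=5.0)
async def background_task():
    # Timers and other background tasks (such as ChangeloggedSetManager's
    # periodic flush) run with no message in flight, so current_event()
    # returns None -- the condition that the `assert event is not None`
    # in stores/rocksdb.py trips over.
    assert faust.current_event() is None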
@j123b567 I highly recommend using Java Spring + Kafka Streams directly if that is possible for you. After spending many days on Faust I decided to switch, and after a couple of days of learning I had what I wanted. I think that fixing the issues you hit with Faust would require you to contribute to this repo A LOT. Faust is a great tool and it is really sad that nobody maintains it; on the other hand, only people who know Kafka Streams well can do that.
@elja my app is now functional and uses faust-streaming :D but this proposal will probably be my next step.
I took a look at your example and this part caught my attention:
async def task_queue_sink(job):
    logging.info(f"Sending task: task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_queue_topic.send(key=job['key'], value=job)
    tasks_progress[job['key']].add(job['task_id'])
Using this as an agent sink to modify a table is entering murky waters. Taken from https://faust.readthedocs.io/en/latest/userguide/agents.html#concurrency:
Warning:
Concurrent instances of an agent will process the stream out-of-order, so you cannot mutate tables from within the agent function:
An agent having concurrency > 1, can only read from a table, never write.
I haven't tried writing to a Table using your methodology, so I'm afraid I can't be too helpful.
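That said, if the table write has to happen somewhere, one pattern that stays within the documented constraint quoted above is to mutate the table inside the agent's stream iteration (where a current event exists) and keep the sink purely as a producer. This is a rough, untested sketch reusing the names from the reproduction above; note that the periodic flush discussed earlier still runs outside stream iteration, so this alone may not avoid the RocksDB assertion.
# Untested sketch: the sink only produces; the table mutation moves into the
# agent body, where stream iteration provides a current event.
async def task_queue_sink(job):
    logging.info(f"Sending task: task_id = {job['task_id']} / uuid = {job['uuid']}")
    await task_queue_topic.send(key=job['key'], value=job)

@app.agent(event_topic, sink=[task_queue_sink])
async def events_stream(stream: faust.Stream):
    async for key in stream:
        for task in TASK_DEFINITIONS[key]:
            if task['id'] not in tasks_progress[key]:
                # Table write now happens inside stream iteration.
                tasks_progress[key].add(task['id'])
                yield {'uuid': faust.uuid(), 'task_id': task['id'], 'key': key}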
The memory storage works because it's a base.Store and is relatively straightforward. The rocksdb driver is derived from the base.SerializedStore class, which is a separate beast altogether. Satisfying your use case with the rocksdb driver will require changes to faust itself.
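For what it's worth, since the report above only reproduces with store='rocksdb://', one possible stopgap is to keep RocksDB as the app-wide default but back this particular SetTable with the memory store. Whether SetTable forwards a per-table store= override to the underlying table is an assumption on my part, and restarts would then rely on changelog replay rather than local RocksDB state.
# Untested sketch: app-wide rocksdb, but an in-memory store (which the report
# above shows does not crash) for the SetTable only.  Assumes SetTable passes
# a `store=` override down to the underlying table.
app = faust.App(
    id='rocksdb_test',
    broker='localhost:9092',
    store='rocksdb://',
)

tasks_progress = app.SetTable(
    'test_tasks_progress',
    partitions=8,
    value_type=int,
    store='memory://',  # per-table override (assumption)
)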