changelog_topic argument in Table declaration seems to be ignored
Checklist
- [ ] I have included information about relevant versions
- [x] I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
Our Kafka configuration (with ACLs) denies dynamic topic creation, so I manually created a topic to serve as the changelog topic for a Faust Table. I pass this topic to the Table definition through the changelog_topic argument, but it seems to be ignored:
app = faust.App('window-agg', broker=KAFKA, topic_allow_declare=False, topic_disable_leader=True)
source = app.topic('raw-event', value_type=RawModel)
sink = app.topic('agg-event', value_type=AggModel)
chlog_topic = app.topic('table_changelog', internal=False)
tumbling_table = app.Table(
    'tumbling_table',
    default=list,
    changelog_topic=chlog_topic,
    on_window_close=window_processor,
).tumbling(10)
Expected behavior
Faust should use the provided topic as the Table changelog topic.
Actual behavior
Instead, Faust tries to create a new topic for the Table changelog: window-agg-tumbling_table-changelog
[2022-04-23 00:04:04,878] [31014] [INFO] Updating subscribed topics to:
┌Requested Subscription───────────────┐
│ topic name │
├─────────────────────────────────────┤
│ raw-event │
│ window-agg-tumbling_table-changelog │
└─────────────────────────────────────┘
[2022-04-23 00:04:04,878] [31014] [INFO] Subscribed to topic(s):
┌Final Subscription───────────────────┐
│ topic name │
├─────────────────────────────────────┤
│ raw-event │
│ window-agg-tumbling_table-changelog │
└─────────────────────────────────────┘
[2022-04-23 00:04:04,883] [31014] [ERROR] Topic window-agg-tumbling_table-changelog not found in cluster metadata
[2022-04-23 00:04:04,885] [31014] [INFO] Discovered coordinator 0 for group window-agg
[2022-04-23 00:04:04,885] [31014] [INFO] Revoking previously assigned partitions set() for group window-agg
[2022-04-23 00:04:04,886] [31014] [INFO] (Re-)joining group window-agg
[2022-04-23 00:04:04,890] [31014] [INFO] Joined group 'window-agg' (generation 1) with member_id faust-0.8.4-13090e05-ac7e-45a2-b3c3-4c1c8e37958f
[2022-04-23 00:04:04,890] [31014] [INFO] Elected group leader -- performing partition assignments using faust
[2022-04-23 00:04:04,891] [31014] [WARNING] Ignoring missing topic: 'window-agg-tumbling_table-changelog'
[2022-04-23 00:04:04,894] [31014] [INFO] Successfully synced group window-agg with generation 1
[2022-04-23 00:04:04,895] [31014] [INFO] Setting newly assigned partitions
┌Topic Partition Set─────┐
│ topic │ partitions │
├───────────┼────────────┤
│ raw-event │ {0} │
└───────────┴────────────┘ for group window-agg
[2022-04-23 00:04:04,896] [31014] [INFO] Executing _on_partitions_assigned
[2022-04-23 00:04:04,898] [31014] [INFO] generation id 1 app consumers id 1
[2022-04-23 00:04:04,901] [31014] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─┬───────────┐
│ topic │ partition │ highwater │
└───────┴───────────┴───────────┘
[2022-04-23 00:04:04,903] [31014] [INFO] [^---Recovery]: Resuming flow...
[2022-04-23 00:04:04,904] [31014] [INFO] [^---Recovery]: Recovery complete
[2022-04-23 00:04:04,904] [31014] [INFO] [^---Recovery]: Restore complete!
[2022-04-23 00:04:04,904] [31014] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2022-04-23 00:04:04,906] [31014] [INFO] [^---Fetcher]: Starting...
[2022-04-23 00:04:04,906] [31014] [INFO] [^---Recovery]: Worker ready
[2022-04-23 00:04:04,907] [31014] [INFO] [^Worker]: Ready
[2022-04-23 00:04:04,910] [31014] [ERROR] [^----Agent*: agg_exam[.]print_windowed_events]: Crashed reason=PartitionsMismatch("The source topic 'raw-event' for table 'tumbling_table'\nhas 1 partitions, but the changelog\ntopic 'window-agg-tumbling_table-changelog' has 0 partitions.\n\nPlease make sure the topics have the same number of partitions\nby configuring Kafka correctly.\n")
Full traceback
Paste the full traceback (if there is any)
Versions
- Python version : Python 3.8.10
- Faust version : faust, version Faust 0.8.4
- Operating system : Ubuntu 20.04
- Kafka version : 2.8.0
- RocksDB version (if applicable)
I can confirm this behavior. I tried passing the changelog topic both as a string and as a topic object, but without success.
Looks like the changelog topic name is hardcoded in https://github.com/faust-streaming/faust/blob/master/faust/tables/base.py#L419, and outside of base.py its only invocation is at https://github.com/faust-streaming/faust/blob/master/faust/assignor/partition_assignor.py#L321. I assume turning the changelog topic name into a property with a getter and a setter would fix the issue.
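To make the suggestion concrete, here is a minimal sketch of the property idea. It is not Faust's actual code: `TableSketch`, `app_id`, and `changelog_topic_name` are hypothetical names chosen for illustration, and the default name only mirrors the `<app id>-<table name>-changelog` pattern visible in the log above.

```python
from typing import Optional


class TableSketch:
    """Hypothetical stand-in for a table; NOT the real Faust class."""

    def __init__(self, app_id: str, name: str,
                 changelog_topic_name: Optional[str] = None) -> None:
        self.app_id = app_id
        self.name = name
        # None means "no override": fall back to the derived default name.
        self._changelog_topic_name = changelog_topic_name

    @property
    def changelog_topic_name(self) -> str:
        # Prefer an explicitly supplied name over the derived one.
        if self._changelog_topic_name is not None:
            return self._changelog_topic_name
        # Assumed default pattern, taken from the log output in this issue.
        return f"{self.app_id}-{self.name}-changelog"

    @changelog_topic_name.setter
    def changelog_topic_name(self, value: str) -> None:
        self._changelog_topic_name = value


default_table = TableSketch("window-agg", "tumbling_table")
custom_table = TableSketch("window-agg", "tumbling_table", "table_changelog")

print(default_table.changelog_topic_name)
print(custom_table.changelog_topic_name)
```

With something along these lines, any caller that needs the changelog topic name (such as the partition assignor) would read the property and pick up a user-supplied topic name instead of always recomputing the default.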