
changelog_topic argument in Table declaration seems to be ignored

Open cezary69 opened this issue 3 years ago • 2 comments

Checklist

  • [ ] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Our Kafka configuration (with ACLs) denies dynamic topic creation, so I manually created a topic dedicated to a Faust Table changelog. I try to pass this topic to the Table definition via the changelog_topic argument, but it seems to be ignored:

app = faust.App('window-agg', broker=KAFKA, topic_allow_declare=False, topic_disable_leader=True)

source = app.topic('raw-event', value_type=RawModel)
sink = app.topic('agg-event', value_type=AggModel)
chlog_topic = app.topic('table_changelog', internal=False)

tumbling_table = app.Table(
    'tumbling_table',
    default=list,
    changelog_topic=chlog_topic,
    on_window_close=window_processor,
).tumbling(10)

Expected behavior

Faust should use the provided topic as the Table changelog topic.

Actual behavior

Instead, Faust ignores the provided topic and tries to use a new topic with the default changelog name, window-agg-tumbling_table-changelog, which does not exist in the cluster:

[2022-04-23 00:04:04,878] [31014] [INFO] Updating subscribed topics to: 
┌Requested Subscription───────────────┐
│ topic name                          │
├─────────────────────────────────────┤
│ raw-event                           │
│ window-agg-tumbling_table-changelog │
└─────────────────────────────────────┘ 
[2022-04-23 00:04:04,878] [31014] [INFO] Subscribed to topic(s): 
┌Final Subscription───────────────────┐
│ topic name                          │
├─────────────────────────────────────┤
│ raw-event                           │
│ window-agg-tumbling_table-changelog │
└─────────────────────────────────────┘ 
[2022-04-23 00:04:04,883] [31014] [ERROR] Topic window-agg-tumbling_table-changelog not found in cluster metadata 
[2022-04-23 00:04:04,885] [31014] [INFO] Discovered coordinator 0 for group window-agg 
[2022-04-23 00:04:04,885] [31014] [INFO] Revoking previously assigned partitions set() for group window-agg 
[2022-04-23 00:04:04,886] [31014] [INFO] (Re-)joining group window-agg 
[2022-04-23 00:04:04,890] [31014] [INFO] Joined group 'window-agg' (generation 1) with member_id faust-0.8.4-13090e05-ac7e-45a2-b3c3-4c1c8e37958f 
[2022-04-23 00:04:04,890] [31014] [INFO] Elected group leader -- performing partition assignments using faust 
[2022-04-23 00:04:04,891] [31014] [WARNING] Ignoring missing topic: 'window-agg-tumbling_table-changelog' 
[2022-04-23 00:04:04,894] [31014] [INFO] Successfully synced group window-agg with generation 1 
[2022-04-23 00:04:04,895] [31014] [INFO] Setting newly assigned partitions 
┌Topic Partition Set─────┐
│ topic     │ partitions │
├───────────┼────────────┤
│ raw-event │ {0}        │
└───────────┴────────────┘ for group window-agg 
[2022-04-23 00:04:04,896] [31014] [INFO] Executing _on_partitions_assigned 
[2022-04-23 00:04:04,898] [31014] [INFO] generation id 1 app consumers id 1 
[2022-04-23 00:04:04,901] [31014] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active─┬───────────┐
│ topic │ partition │ highwater │
└───────┴───────────┴───────────┘ 
[2022-04-23 00:04:04,903] [31014] [INFO] [^---Recovery]: Resuming flow... 
[2022-04-23 00:04:04,904] [31014] [INFO] [^---Recovery]: Recovery complete 
[2022-04-23 00:04:04,904] [31014] [INFO] [^---Recovery]: Restore complete! 
[2022-04-23 00:04:04,904] [31014] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2022-04-23 00:04:04,906] [31014] [INFO] [^---Fetcher]: Starting... 
[2022-04-23 00:04:04,906] [31014] [INFO] [^---Recovery]: Worker ready 
[2022-04-23 00:04:04,907] [31014] [INFO] [^Worker]: Ready 
[2022-04-23 00:04:04,910] [31014] [ERROR] [^----Agent*: agg_exam[.]print_windowed_events]: Crashed reason=PartitionsMismatch("The source topic 'raw-event' for table 'tumbling_table'\nhas 1 partitions, but the changelog\ntopic 'window-agg-tumbling_table-changelog' has 0 partitions.\n\nPlease make sure the topics have the same number of partitions\nby configuring Kafka correctly.\n") 

Full traceback

Paste the full traceback (if there is any)

Versions

  • Python version : Python 3.8.10
  • Faust version : faust, version Faust 0.8.4
  • Operating system : Ubuntu 20.04
  • Kafka version : 2.8.0
  • RocksDB version (if applicable)

cezary69, Apr 22 '22 22:04

I can confirm this behavior. I tried passing the topic both as a string and as a topic object, but without success.

thomas-chauvet, Oct 10 '22 07:10

It looks like the changelog topic name is hardcoded in https://github.com/faust-streaming/faust/blob/master/faust/tables/base.py#L419, and outside of base.py its only invocation is at https://github.com/faust-streaming/faust/blob/master/faust/assignor/partition_assignor.py#L321. I assume turning the changelog topic name into a property with a getter and setter should fix the issue.
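To illustrate the idea, here is a minimal sketch of a name property with a getter and setter. This is not Faust's actual code: TableSketch and its attributes are hypothetical names made up for this example, and the default name format is only inferred from the topic name shown in the log above.

```python
from typing import Optional


class TableSketch:
    """Hypothetical stand-in for a table; NOT Faust's actual class."""

    def __init__(self, app_id: str, name: str,
                 changelog_topic_name: Optional[str] = None) -> None:
        self.app_id = app_id
        self.name = name
        # None means "no explicit name given; derive the default lazily".
        self._changelog_topic_name = changelog_topic_name

    @property
    def changelog_topic_name(self) -> str:
        # Prefer the user-supplied name; otherwise fall back to the
        # derived default (format assumed from the log in this issue).
        if self._changelog_topic_name is not None:
            return self._changelog_topic_name
        return f"{self.app_id}-{self.name}-changelog"

    @changelog_topic_name.setter
    def changelog_topic_name(self, value: str) -> None:
        self._changelog_topic_name = value


default_table = TableSketch("window-agg", "tumbling_table")
custom_table = TableSketch("window-agg", "tumbling_table",
                           changelog_topic_name="table_changelog")

print(default_table.changelog_topic_name)
print(custom_table.changelog_topic_name)
```

With something along these lines, any caller that reads the name (such as the partition assignor) would see the user-supplied topic rather than the derived default, provided the table populates the name from the changelog_topic argument.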

wbarnha, Oct 10 '22 17:10