Faust workers crash due to KeyError from aiokafka

Open DhruvaPatil98 opened this issue 4 years ago • 0 comments

(I am not sure whether this should be raised on the Faust repo. Since both the issue and the fix may lie in aiokafka, I am raising it here. Let me know if I should move it to the Faust repo.)

Steps to reproduce

When the number of Faust workers is changed from around 5 to 6-10 while data is being streamed to the topics that the agents subscribe to, the workers crash with a KeyError.

The following script is used for app.py:

from faust import App

app = App(
    'app_main',
    broker='kafka://kafka:9094',
    store='rocksdb://',
)

PARTITIONS = 10

event_topic = []
event_table = []
for i in range(20):
    event_topic.append(app.topic(
        f'event_topic_write{i}',
        internal=True,
        partitions=PARTITIONS,
    ))

    event_table.append(app.Table(
        f'event_table{i}',
        partitions=PARTITIONS,
    ))

@app.agent(event_topic[0])
async def event_topic_write(streams):
    async for payload in streams.events():
        print(f'Got data: {payload}')
        event_table[0][payload.key] = payload.value


if __name__ == '__main__':
    app.main()
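
For reference, the change in worker count described in the steps above could be driven by something like the sketch below. This is not the setup used for the report. It assumes the workers are started on one host with `python app.py worker`, each on its own `--web-port`; the helper names `worker_commands` and `scale_to` are hypothetical. Workers sharing a host would likely also need separate data directories for the RocksDB store, which is left out here.

```python
import subprocess
import sys
from typing import List


def worker_commands(count: int, base_port: int = 6066) -> List[List[str]]:
    """Build one command line per worker, each with a distinct web port."""
    return [
        [sys.executable, "app.py", "worker", "-l", "debug",
         "--web-port", str(base_port + i)]
        for i in range(count)
    ]


def scale_to(running: List[subprocess.Popen], target: int) -> None:
    """Start extra workers until `target` are running (assumed local setup)."""
    for cmd in worker_commands(target)[len(running):]:
        running.append(subprocess.Popen(cmd))
```

Calling `scale_to(workers, 5)` and, once the producer is streaming, `scale_to(workers, 8)` would trigger the kind of rebalance during which the crash was observed.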

The following script was used to stream events to the topics:

import json
import random
import string
from kafka import KafkaProducer

producer_instance = KafkaProducer(
    bootstrap_servers=['kafka:9094'],
)

event_topic = []
for i in range(20):
    event_topic.append(f'event_topic_write{i}')


def randomString(stringLength=10):
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))


while True:
    key_bytes = bytes(json.dumps(randomString()), encoding='utf-8')
    value_bytes = bytes(json.dumps('test'), encoding='utf-8')

    topic_name = event_topic[random.randint(0, len(event_topic)-1)]

    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()
    print(key_bytes)

Expected behavior

The rebalance should finish successfully, with no worker crashing.

Actual behavior

Some workers crash with a KeyError raised from aiokafka's fetcher (see the traceback below).

Full traceback

[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table1-changelog', partition=0) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table4-changelog', partition=2) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table3-changelog', partition=6) at offset 0 
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table13-changelog', partition=6) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table6-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table11-changelog', partition=1) at offset 0 
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table5-changelog', partition=3) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table9-changelog', partition=1) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=4) at offset 0 
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table14-changelog', partition=9) at offset 0 
[2020-02-21 11:04:39,182] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table19-changelog', partition=8) at offset 0 
[2020-02-21 11:04:39,186] [9] [DEBUG] <AIOKafkaConnection host=kafka port=9094> Request 357: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='app_main-event_table18-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576)]), (topic='app_main-event_table6-changelog', partitions=[(partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table0-changelog', partitions=[(partition=6, offset=133, max_bytes=1048576), (partition=0, offset=33, max_bytes=1048576), (partition=4, offset=20, max_bytes=1048576), (partition=5, offset=26, max_bytes=1048576), (partition=2, offset=35, max_bytes=1048576)]), (topic='app_main-event_table2-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table11-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table5-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table7-changelog', partitions=[(partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), (topic='app_main-event_table19-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table15-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table9-changelog', partitions=[(partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table1-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576)]), (topic='app_main-event_table10-changelog', partitions=[(partition=3, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table8-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table13-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table17-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576)]), (topic='app_main-event_table4-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table3-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table16-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table12-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table14-changelog', partitions=[(partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), 
(partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)])]) 
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=8) 
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=7) 
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=9) 
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=9) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=3) 
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=9) 
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=7) 
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=8) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=3) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=1) 
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=7) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=1) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=7) 
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=7) 
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=8) 
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=8) 
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=9) 
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=8) 
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=7) 
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=8) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=3) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=1) 
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=7) 
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=7) 
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=9) 
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=3) 
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=1) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=3) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=9) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=8) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=8) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=3) 
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=8) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=7) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=7) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=9) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=1) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=1) 
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=3) 
[2020-02-21 11:04:39,208] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=1) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=9) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=1) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=9) 
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=8) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=1) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=9) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=3) 
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=7) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=1) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=8) 
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=1) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=3) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=7) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=8) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=8) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=1) 
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=3) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=8) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=7) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=8) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=9) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=7) 
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=9) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=7) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=8) 
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=8) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=9) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=3) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=1) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=7) 
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=9) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=7) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=1) 
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=9) 
[2020-02-21 11:04:39,220] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=3) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=7) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=3) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=9) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=1) 
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=3) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=3) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=1) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=9) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=9) 
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=9) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=3) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=1) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=1) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=9) 
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=8) 
[2020-02-21 11:04:39,229] [9] [ERROR] [^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),) 
Traceback (most recent call last):
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,263] [9] [DEBUG] Timer Recovery.stats woke up - iteration=70 time_spent_sleeping=5.114945699984673 drift=-0.11494569998467341 new_interval=4.9 since_epoch=355.7561254000175 
[2020-02-21 11:04:39,263] [9] [ERROR] [^---Fetcher]: Crashed reason=KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),) 
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/application/faust_mod/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
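
The failing line in both tracebacks is `res_or_error = self._records[tp]`: a plain dictionary lookup for a partition that has no entry in the fetcher's buffer. The sketch below is not aiokafka's code. It only reproduces that pattern with a plain dict and contrasts it with a tolerant lookup; the function names are hypothetical, and whether skipping a missing partition is the right fix is a question for the maintainers.

```python
from typing import Dict, Hashable, Iterable, List


def drain_strict(records: Dict[Hashable, List[str]],
                 partitions: Iterable[Hashable]) -> List[str]:
    """Index the buffer for every requested partition (the failing pattern)."""
    out: List[str] = []
    for tp in partitions:
        out.extend(records[tp])  # KeyError if tp has no buffer entry
    return out


def drain_tolerant(records: Dict[Hashable, List[str]],
                   partitions: Iterable[Hashable]) -> List[str]:
    """Skip partitions that have no buffer entry instead of raising."""
    out: List[str] = []
    for tp in partitions:
        batch = records.get(tp)
        if batch is None:
            continue  # assumed case: nothing buffered for this partition
        out.extend(batch)
    return out


# Buffer holds data for partition 0 only, but partition 9 is also requested.
buffer = {("app_main-event_table0-changelog", 0): ["msg"]}
requested = [("app_main-event_table0-changelog", 0),
             ("app_main-event_table0-changelog", 9)]
```

With these inputs, `drain_strict(buffer, requested)` raises a KeyError for partition 9, while `drain_tolerant(buffer, requested)` returns the records for partition 0 only.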

Versions

  • Python version - 3.6
  • Faust version - 1.10.1 to 1.10.3 and master, each with the corresponding version of robinhood aiokafka
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

DhruvaPatil98, Feb 25 '20 05:02