aiokafka
Faust workers crash due to KeyError from aiokafka
(I'm not sure whether this should be raised on the faust repo instead. Since the issue, and possibly the fix, is in aiokafka, I'm raising it here. Let me know if I should move it to the faust repo.)
Steps to reproduce
When the number of Faust workers is changed from around 5 to between 6 and 10 while data is being streamed to the topics that the agents subscribe to, the workers crash with a KeyError.
The following script is used as app.py:
from faust import App

app = App(
    'app_main',
    broker='kafka://kafka:9094',
    store='rocksdb://',
)

PARTITITONS = 10
event_topic = []
event_table = []

# Declare 20 internal topics and 20 tables, each with 10 partitions.
for i in range(20):
    event_topic.append(app.topic(
        f'event_topic_write{i}',
        internal=True,
        partitions=PARTITITONS,
    ))
    event_table.append(app.Table(
        f'event_table{i}',
        partitions=PARTITITONS,
    ))


# Only the first topic has an agent; it copies each event into the first table.
@app.agent(event_topic[0])
async def event_topic_write(streams):
    async for payload in streams.events():
        print(f'Got data: {payload}')
        event_table[0][payload.key] = payload.value


if __name__ == '__main__':
    app.main()
The following script was used to stream events to the topics:
import json
import random
import string

from kafka import KafkaProducer

producer_instance = KafkaProducer(
    bootstrap_servers=['kafka:9094'],
)

# The same 20 topic names that app.py declares.
event_topic = []
for i in range(20):
    event_topic.append(f'event_topic_write{i}')


def randomString(stringLength=10):
    """Generate a random string of fixed length """
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(stringLength))


# Send a random key with a constant value to a randomly chosen topic, forever.
while True:
    key_bytes = bytes(json.dumps(randomString()), encoding='utf-8')
    value_bytes = bytes(json.dumps('test'), encoding='utf-8')
    topic_name = event_topic[random.randint(0, len(event_topic)-1)]
    producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
    producer_instance.flush()
    print(key_bytes)
Expected behavior
The rebalance should finish successfully.
Actual behavior
Some workers crash with the KeyError shown in the traceback below.
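The traceback below ends in a plain indexing expression, `res_or_error = self._records[tp]`, so the crash looks like a lookup for a partition that has no entry in the fetcher's buffer. The following is a toy sketch of that failure mode only, not aiokafka's implementation: the `TopicPartition` stand-in, the `buffer` dict, and both helper functions are hypothetical names introduced for illustration. The partition numbers come from the log: the last FetchRequest covered partitions 6, 0, 4, 5 and 2 of `app_main-event_table0-changelog`, while the KeyError names partition 9.

```python
from collections import namedtuple

# Stand-in for the TopicPartition type seen in the logs.
# Hypothetical: defined here, not imported from aiokafka.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def fetched_strict(buffer, partitions):
    """Index the buffer directly; raises KeyError for a partition with no entry."""
    return {tp: buffer[tp] for tp in partitions}


def fetched_tolerant(buffer, partitions):
    """Skip partitions that have no buffer entry instead of raising."""
    return {tp: buffer[tp] for tp in partitions if tp in buffer}


topic = "app_main-event_table0-changelog"

# The buffer only knows the partitions that were in the last fetch request.
buffer = {TopicPartition(topic, p): [] for p in (6, 0, 4, 5, 2)}

# After the rebalance the caller also asks for partition 9.
requested = list(buffer) + [TopicPartition(topic, 9)]

try:
    fetched_strict(buffer, requested)
    strict_error = None
except KeyError as exc:
    strict_error = exc.args[0]

tolerant = fetched_tolerant(buffer, requested)
print("strict lookup failed on:", strict_error)
print("tolerant lookup returned", len(tolerant), "partitions")
```

Whether skipping the missing partition is the right behavior inside aiokafka is an open question; the sketch only shows why a direct index fails when the requested set and the buffered set drift apart during a rebalance.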
Full traceback
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table1-changelog', partition=0) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=9) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table4-changelog', partition=2) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table3-changelog', partition=6) at offset 0
[2020-02-21 11:04:39,177] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table13-changelog', partition=6) at offset 0
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table6-changelog', partition=4) at offset 0
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table12-changelog', partition=4) at offset 0
[2020-02-21 11:04:39,178] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=9) at offset 0
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table11-changelog', partition=1) at offset 0
[2020-02-21 11:04:39,179] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table5-changelog', partition=3) at offset 0
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table9-changelog', partition=1) at offset 0
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table2-changelog', partition=4) at offset 0
[2020-02-21 11:04:39,181] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table14-changelog', partition=9) at offset 0
[2020-02-21 11:04:39,182] [9] [DEBUG] Adding fetch request for partition TopicPartition(topic='app_main-event_table19-changelog', partition=8) at offset 0
[2020-02-21 11:04:39,186] [9] [DEBUG] <AIOKafkaConnection host=kafka port=9094> Request 357: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='app_main-event_table18-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576)]), (topic='app_main-event_table6-changelog', partitions=[(partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table0-changelog', partitions=[(partition=6, offset=133, max_bytes=1048576), (partition=0, offset=33, max_bytes=1048576), (partition=4, offset=20, max_bytes=1048576), (partition=5, offset=26, max_bytes=1048576), (partition=2, offset=35, max_bytes=1048576)]), (topic='app_main-event_table2-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table11-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table5-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table7-changelog', partitions=[(partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), (topic='app_main-event_table19-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table15-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table9-changelog', partitions=[(partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576)]), (topic='app_main-event_table1-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576)]), (topic='app_main-event_table10-changelog', partitions=[(partition=3, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table8-changelog', partitions=[(partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table13-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table17-changelog', partitions=[(partition=1, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576)]), (topic='app_main-event_table4-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576)]), 
(topic='app_main-event_table3-changelog', partitions=[(partition=5, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576)]), (topic='app_main-event_table16-changelog', partitions=[(partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=1048576)]), (topic='app_main-event_table12-changelog', partitions=[(partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), (partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)]), (topic='app_main-event_table14-changelog', partitions=[(partition=8, offset=0, max_bytes=1048576), (partition=7, offset=0, max_bytes=1048576), (partition=0, offset=0, max_bytes=1048576), (partition=6, offset=0, max_bytes=1048576), (partition=2, offset=0, max_bytes=1048576), (partition=1, offset=0, max_bytes=1048576), (partition=3, offset=0, max_bytes=1048576), (partition=9, offset=0, max_bytes=1048576), 
(partition=4, offset=0, max_bytes=1048576), (partition=5, offset=0, max_bytes=1048576)])])
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=8)
[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=7)
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=9)
[2020-02-21 11:04:39,192] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=9)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=3)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=3)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=3)
[2020-02-21 11:04:39,193] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=9)
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=7)
[2020-02-21 11:04:39,194] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=8)
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=3)
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=1)
[2020-02-21 11:04:39,196] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=7)
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=1)
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=7)
[2020-02-21 11:04:39,197] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=7)
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=8)
[2020-02-21 11:04:39,198] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=8)
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=9)
[2020-02-21 11:04:39,199] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=8)
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=7)
[2020-02-21 11:04:39,202] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=8)
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=3)
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=1)
[2020-02-21 11:04:39,203] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=7)
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=7)
[2020-02-21 11:04:39,204] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=9)
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=3)
[2020-02-21 11:04:39,205] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=1)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=3)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=9)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=8)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=8)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=3)
[2020-02-21 11:04:39,206] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=8)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=7)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=7)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=9)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=1)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=1)
[2020-02-21 11:04:39,207] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=3)
[2020-02-21 11:04:39,208] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=1)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=9)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=1)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=1)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=3)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=8)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=1)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=3)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=1)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=9)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=3)
[2020-02-21 11:04:39,213] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=7)
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=1)
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=8)
[2020-02-21 11:04:39,214] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=1)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=3)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=7)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=8)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=8)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table1-changelog', partition=1)
[2020-02-21 11:04:39,215] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=3)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=8)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=7)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=8)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=9)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=7)
[2020-02-21 11:04:39,216] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=8)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=9)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table10-changelog', partition=7)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=8)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=7)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table18-changelog', partition=7)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=8)
[2020-02-21 11:04:39,217] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=8)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table4-changelog', partition=9)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=3)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=1)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=7)
[2020-02-21 11:04:39,218] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=9)
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table15-changelog', partition=7)
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table6-changelog', partition=1)
[2020-02-21 11:04:39,219] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table13-changelog', partition=9)
[2020-02-21 11:04:39,220] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=3)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=7)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=3)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table7-changelog', partition=9)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table16-changelog', partition=1)
[2020-02-21 11:04:39,221] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table8-changelog', partition=3)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table17-changelog', partition=3)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table0-changelog', partition=1)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table3-changelog', partition=9)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table12-changelog', partition=9)
[2020-02-21 11:04:39,222] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table2-changelog', partition=9)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table5-changelog', partition=3)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table9-changelog', partition=1)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table11-changelog', partition=1)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table14-changelog', partition=9)
[2020-02-21 11:04:39,223] [9] [DEBUG] Seeking to offset 0 for partition TP(topic='app_main-event_table19-changelog', partition=8)
[2020-02-21 11:04:39,229] [9] [ERROR] [^--Consumer]: Drain messages raised: KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),)
Traceback (most recent call last):
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
[2020-02-21 11:04:39,263] [9] [DEBUG] Timer Recovery.stats woke up - iteration=70 time_spent_sleeping=5.114945699984673 drift=-0.11494569998467341 new_interval=4.9 since_epoch=355.7561254000175
[2020-02-21 11:04:39,263] [9] [ERROR] [^---Fetcher]: Crashed reason=KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/application/faust_mod/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/application/faust_mod/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/application/faust_mod/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/application/faust_mod/transport/consumer.py", line 678, in _wait_next_records
    timeout=timeout,
  File "/application/faust_mod/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 810, in getmany
    max_records=_consumer._max_poll_records,
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/usr/local/lib/python3.6/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/usr/local/lib/python3.6/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/application/faust_mod/transport/drivers/aiokafka.py", line 827, in _fetch_records
    max_records=max_records,
  File "/usr/local/lib/python3.6/site-packages/aiokafka/consumer/fetcher.py", line 1082, in fetched_records
    res_or_error = self._records[tp]
KeyError: TopicPartition(topic='app_main-event_table0-changelog', partition=9)
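In the log above, the partition named in the KeyError (`app_main-event_table0-changelog`, partition 9) was sought to offset 0 shortly before the error was raised. For longer worker logs, a small filter along the following lines may help confirm that pattern. It is only a sketch: the helper name `partitions_in` and the regular expression are assumptions written against the log format shown in this report, and the three sample lines are copied from it.

```python
import re

# Matches "topic='<name>', partition=<n>" as it appears in the TP(...) and
# TopicPartition(...) reprs in this report's log lines.
TP_RE = re.compile(r"topic='([^']+)', partition=(\d+)")


def partitions_in(lines, marker):
    """Collect (topic, partition) pairs from every line that contains marker."""
    found = set()
    for line in lines:
        if marker in line:
            found.update((t, int(p)) for t, p in TP_RE.findall(line))
    return found


sample = [
    "[2020-02-21 11:04:39,190] [9] [DEBUG] Seeking to offset 0 for partition "
    "TP(topic='app_main-event_table1-changelog', partition=7)",
    "[2020-02-21 11:04:39,212] [9] [DEBUG] Seeking to offset 0 for partition "
    "TP(topic='app_main-event_table0-changelog', partition=9)",
    "[2020-02-21 11:04:39,229] [9] [ERROR] [^--Consumer]: Drain messages raised: "
    "KeyError(TopicPartition(topic='app_main-event_table0-changelog', partition=9),)",
]

sought = partitions_in(sample, "Seeking to offset")
failed = partitions_in(sample, "KeyError(")

# Partitions that were sought and later showed up in a KeyError.
sought_then_failed = sought & failed
print(sorted(sought_then_failed))
```

To run it on a real log, replace `sample` with the lines of the worker's log file.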
Versions
- Python version - 3.6
- Faust version - 1.10.1 to 1.10.3 and master, each with the corresponding version of robinhood-aiokafka
- Operating system
- Kafka version
- RocksDB version (if applicable)