faust
faust copied to clipboard
InvalidReplicationFactorError is raised if reply_create_topic is set
Checklist
- [x] I have included information about relevant versions
- [x] I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
Create faust app and set reply_create_topic=True flag.
app = faust.App(
'hello-world',
broker='kafka://kafka:9092',
reply_create_topic=True,
)
greetings_topic = app.topic('greetings', value_type=str)
@app.agent(greetings_topic)
async def print_greetings(greetings):
async for greeting in greetings:
print(greeting)
yield 'resp ' + greeting
@app.timer(5)
async def produce():
for i in range(100):
resp = await print_greetings.ask(value=f'hello {i}')
print(resp)
if __name__== '__main__':
app.main()
Expected behavior
Reply topic is created.
Actual behavior
InvalidReplicationFactorError is raised.
Full traceback
hello-word_1 | [2021-01-19 18:25:19,690] [8] [INFO] [^--Producer]: Creating topic 'f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9'
hello-word_1 | [2021-01-19 18:25:19,726] [8] [ERROR] [^-App]: Crashed reason=InvalidReplicationFactorError('Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.')
hello-word_1 | Traceback (most recent call last):
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
hello-word_1 | await task
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 966, in _wrapped
hello-word_1 | return await task()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 1019, in around_timer
hello-word_1 | await fun(*args)
hello-word_1 | File "/main.py", line 31, in produce
hello-word_1 | resp = await print_greetings.ask(value=f'hello {i}')
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 797, in ask
hello-word_1 | await app._reply_consumer.add(p.correlation_id, p)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 167, in add
hello-word_1 | await self._start_fetcher(reply_topic)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 176, in _start_fetcher
hello-word_1 | await topic.maybe_declare()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1 | result = await self.fun(*self.args, **self.kwargs)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 476, in maybe_declare
hello-word_1 | await self.declare()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 491, in declare
hello-word_1 | await producer.create_topic(
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1072, in create_topic
hello-word_1 | await cast(Transport, self.transport)._create_topic(
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1284, in _create_topic
hello-word_1 | await wrap()
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1 | result = await self.fun(*self.args, **self.kwargs)
hello-word_1 | File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1371, in _really_create_topic
hello-word_1 | raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}")
hello-word_1 | kafka.errors.InvalidReplicationFactorError: [Error 38] InvalidReplicationFactorError: Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.
Workaround
class MyTopic(faust.Topic):
def __init__(self, *args, **kwargs):
if kwargs.get('replicas') == 0:
kwargs['replicas'] = 1
super().__init__(*args, **kwargs)
app = faust.App(
'hello-world',
broker='kafka://kafka:9092',
reply_create_topic=True,
Topic=MyTopic
)
Proposed solution
I think changing this line https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190 to replicas=1 or replicas=None will solve the problem
Versions
- Python version 3.8
- Faust version 0.4.1
- Operating system tested on python:3.8 docker container
root@0d7a8b87eab1:/# cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
- Kafka version bitnami/kafka:2.7.0
I have no name!@9e91917f6f27:/$ kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
- RocksDB version (if applicable) None