
InvalidReplicationFactorError is raised if reply_create_topic is set

Open ihor-rud opened this issue 4 years ago • 0 comments

Checklist

  • [x] I have included information about relevant versions
  • [x] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Create a Faust app with the reply_create_topic=True flag set, then call ask() on an agent:

import faust

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
)

greetings_topic = app.topic('greetings', value_type=str)


@app.agent(greetings_topic)
async def print_greetings(greetings):
    async for greeting in greetings:
        print(greeting)
        yield 'resp ' + greeting


@app.timer(5)
async def produce():
    for i in range(100):
        resp = await print_greetings.ask(value=f'hello {i}')
        print(resp)

if __name__ == '__main__':
    app.main()

Expected behavior

Reply topic is created.

Actual behavior

InvalidReplicationFactorError is raised while the reply topic is being created, and the app crashes.

Full traceback

hello-word_1  | [2021-01-19 18:25:19,690] [8] [INFO] [^--Producer]: Creating topic 'f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9'
hello-word_1  | [2021-01-19 18:25:19,726] [8] [ERROR] [^-App]: Crashed reason=InvalidReplicationFactorError('Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.')
hello-word_1  | Traceback (most recent call last):
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
hello-word_1  |     await task
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 966, in _wrapped
hello-word_1  |     return await task()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/app/base.py", line 1019, in around_timer
hello-word_1  |     await fun(*args)
hello-word_1  |   File "/main.py", line 31, in produce
hello-word_1  |     resp = await print_greetings.ask(value=f'hello {i}')
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/agent.py", line 797, in ask
hello-word_1  |     await app._reply_consumer.add(p.correlation_id, p)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 167, in add
hello-word_1  |     await self._start_fetcher(reply_topic)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/agents/replies.py", line 176, in _start_fetcher
hello-word_1  |     await topic.maybe_declare()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1  |     result = await self.fun(*self.args, **self.kwargs)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 476, in maybe_declare
hello-word_1  |     await self.declare()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/topics.py", line 491, in declare
hello-word_1  |     await producer.create_topic(
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1072, in create_topic
hello-word_1  |     await cast(Transport, self.transport)._create_topic(
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1284, in _create_topic
hello-word_1  |     await wrap()
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/mode/utils/futures.py", line 55, in __call__
hello-word_1  |     result = await self.fun(*self.args, **self.kwargs)
hello-word_1  |   File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 1371, in _really_create_topic
hello-word_1  |     raise for_code(code)(f"Cannot create topic: {topic} ({code}): {reason}")
hello-word_1  | kafka.errors.InvalidReplicationFactorError: [Error 38] InvalidReplicationFactorError: Cannot create topic: f-reply-2a37b8f6-246e-4141-a5d5-f96a6ca9e7c9 (38): Replication factor must be larger than 0.

Workaround

class MyTopic(faust.Topic):

    def __init__(self, *args, **kwargs):
        if kwargs.get('replicas') == 0:
            kwargs['replicas'] = 1
        super().__init__(*args, **kwargs)

app = faust.App(
    'hello-world',
    broker='kafka://kafka:9092',
    reply_create_topic=True,
    Topic=MyTopic
)
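The check inside the workaround can be tried without a broker. This is only a sketch of the same logic as a plain function; fix_replicas and its default argument are hypothetical names used for illustration and are not part of Faust:

```python
from typing import Any, Dict


def fix_replicas(kwargs: Dict[str, Any], default: int = 1) -> Dict[str, Any]:
    """Return a copy of kwargs with replicas=0 replaced by `default`.

    Hypothetical helper: mirrors the check in MyTopic.__init__ above.
    """
    fixed = dict(kwargs)  # copy so the caller's dict is not mutated
    if fixed.get('replicas') == 0:
        fixed['replicas'] = default
    return fixed


if __name__ == '__main__':
    print(fix_replicas({'replicas': 0}))
    print(fix_replicas({'replicas': 3}))
    print(fix_replicas({}))
```

Only an explicit replicas=0 is rewritten; a missing key, None, or any other value is passed through unchanged, so topics that set a replication factor themselves are not affected.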

Proposed solution

I think the reply topic is declared with replicas=0 at https://github.com/faust-streaming/faust/blob/master/faust/agents/replies.py#L190, which the broker rejects. Changing that line to replicas=1 or replicas=None should solve the problem.

Versions

  • Python version 3.8
  • Faust version 0.4.1
  • Operating system tested on python:3.8 docker container
root@0d7a8b87eab1:/# cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
  • Kafka version bitnami/kafka:2.7.0
I have no name!@9e91917f6f27:/$ kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
  • RocksDB version (if applicable) None

ihor-rud, Jan 19 '21 18:01