[Bug] python function does not respect `--retain-ordering` flag
Search before asking
- [X] I searched in the issues and found nothing similar.
Version
Pulsar version: 2.10.1
Minimal reproduce step
Follow Pulsar functions quickstart. I'm using Docker to run Pulsar standalone:
1. Run pulsar standalone
First, create a new Python file:
$ touch fn.py
In that file, add the following:
def process(input: bytes):
if "B" in str(input):
raise Exception("Fail on 'B'")
return input
This function will raise an exception when the message context text contains the character "B"
2. Start the function with --retain-ordering flag
$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-admin functions localrun \
--py '/pulsar/fn.py' \
--classname fn \
--inputs persistent://public/default/input \
--output persistent://public/default/output \
--tenant public \
--namespace default \
--subs-name 'test' \
--retain-ordering \
--name fn"
3. Consume output topic
$ docker exec -it pulsar bash -c "bin/pulsar-client consume \
persistent://public/default/output \
--subscription-name test \
--num-messages 0"
3. Produce messages to the source topic containing "B"
$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-client produce \
persistent://public/default/input \
--messages 'A,B,C,D,E,F'"
What did you expect to see?
The consumer in step (3) should output messages in the order that they were provided (so A and nothing more)
What did you see instead?
It continues processing messages after B (C, D ..) and thus does not retain the order anymore:
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQACAA, __pfn_input_topic__=persistent://public/default/input], content:A
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQAiAA, __pfn_input_topic__=persistent://public/default/input], content:C
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQAyAA, __pfn_input_topic__=persistent://public/default/input], content:D
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQBCAA, __pfn_input_topic__=persistent://public/default/input], content:E
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQBSAA, __pfn_input_topic__=persistent://public/default/input], content:F
Anything else?
Perhaps I do not understand the flag's description from the functions-cli docs:
retain-ordering: Function consumes and processes messages in order.`
So I would expect that unhandled exceptions are taken into account to ensure that the messages are processed in the order they are sequenced in the source topic.
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!