
[Bug] python function does not respect `--retain-ordering` flag

Open dionjansen opened this issue 3 years ago • 0 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Pulsar version: 2.10.1

Minimal reproduce step

Follow Pulsar functions quickstart. I'm using Docker to run Pulsar standalone:

1. Run pulsar standalone

First, create a new Python file:

$ touch fn.py

In that file, add the following:

def process(input: bytes):
    if "B" in str(input):
        raise Exception("Fail on 'B'")
    return input

This function raises an exception whenever the message content contains the character "B".
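As a quick sanity check outside Pulsar, the function can be called directly on the six payloads used later in this report. This is only a local sketch: it assumes the payloads arrive as `bytes` (as the type hint suggests), and the `results` dictionary and the loop are illustrative additions that are not part of `fn.py`.

```python
def process(input: bytes):
    if "B" in str(input):
        raise Exception("Fail on 'B'")
    return input


# Illustrative local check (not part of fn.py): call the function on each
# payload and record either the returned value or the raised error.
results = {}
for payload in [b"A", b"B", b"C", b"D", b"E", b"F"]:
    try:
        results[payload] = process(payload)
    except Exception as exc:
        results[payload] = f"raised: {exc}"

for payload, outcome in results.items():
    print(payload, "->", outcome)
```

Only the `B` payload should raise; every other payload is returned unchanged.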

2. Start the function with the --retain-ordering flag

$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-admin functions localrun \
  --py '/pulsar/fn.py' \
  --classname fn \
  --inputs persistent://public/default/input \
  --output persistent://public/default/output \
  --tenant public \
  --namespace default \
  --subs-name 'test' \
  --retain-ordering \
  --name fn"

3. Consume output topic

$ docker exec -it pulsar bash -c "bin/pulsar-client consume \
  persistent://public/default/output \
  --subscription-name test \
  --num-messages 0"

4. Produce messages to the source topic containing "B"

$ docker exec -it pulsar bash -c "/pulsar/bin/pulsar-client produce \
  persistent://public/default/input \
  --messages 'A,B,C,D,E,F'"

What did you expect to see?

The consumer in step (3) should output messages in the order in which they were produced. Since processing fails on B, I expected to see A and nothing more.

What did you see instead?

The function continues processing the messages after B (C, D, ...), so B is skipped and the order is no longer retained:

----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQACAA, __pfn_input_topic__=persistent://public/default/input], content:A
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQAiAA, __pfn_input_topic__=persistent://public/default/input], content:C
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQAyAA, __pfn_input_topic__=persistent://public/default/input], content:D
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQBCAA, __pfn_input_topic__=persistent://public/default/input], content:E
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAoQBSAA, __pfn_input_topic__=persistent://public/default/input], content:F

Anything else?

Perhaps I do not understand the flag's description from the functions-cli docs:

retain-ordering: Function consumes and processes messages in order.

So I would expect that unhandled exceptions are taken into account to ensure that the messages are processed in the order they are sequenced in the source topic.
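To make the two interpretations concrete, here is a small pure-Python simulation. It is not Pulsar code and says nothing about how the runtime is actually implemented; the helper names `run_skip_on_failure` and `run_halt_on_failure` are made up for illustration. The first mimics the output I observed, the second the behaviour I expected from the flag.

```python
from typing import Callable, Iterable, List


def process(input: bytes):
    if "B" in str(input):
        raise Exception("Fail on 'B'")
    return input


def run_skip_on_failure(messages: Iterable[bytes], fn: Callable) -> List[bytes]:
    """Hypothetical model of the observed output: a failed message is
    dropped and processing moves on to the next one."""
    out = []
    for msg in messages:
        try:
            out.append(fn(msg))
        except Exception:
            continue  # skip the failed message
    return out


def run_halt_on_failure(messages: Iterable[bytes], fn: Callable) -> List[bytes]:
    """Hypothetical model of the expected output: nothing after a failed
    message is emitted, so the output never gets ahead of the input."""
    out = []
    for msg in messages:
        try:
            out.append(fn(msg))
        except Exception:
            break  # stop at the first failure
    return out


messages = [b"A", b"B", b"C", b"D", b"E", b"F"]
observed = run_skip_on_failure(messages, process)
expected = run_halt_on_failure(messages, process)

print(observed)  # [b'A', b'C', b'D', b'E', b'F']
print(expected)  # [b'A']
```

The `observed` list matches the consumer output shown above, while `expected` matches what I described under "What did you expect to see?".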

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

dionjansen, Sep 21 '22 13:09