Support subscribe with no ack_wait

Open elbaro opened this issue 6 years ago • 7 comments

Do you support subscriptions with no ack timeout? RabbitMQ has no timeout for acks; messages are considered failed when the consumer disconnects.

elbaro, Jun 24 '18 16:06

As a workaround, you could ack the message as soon as you receive it. That way no further redeliveries happen, which effectively disables the timeout:

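    async def cb(msg):
        try:
            print("Received a message on sub 'foo' (seq: {}): {}".format(msg.sequence, msg.data))
            # Ack right away so the server stops the ack_wait redelivery timer.
            await sc.ack(msg)
        except Exception as e:
            print("ERROR: ", e)

        # Rest of processing without timeout...

    # With manual_acks=True the client does not auto-ack; sc.ack() sends the ack.
    await sc.subscribe("foo", start_at='first', cb=cb, manual_acks=True)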

wallyqs, Apr 03 '19 18:04

If you ack the message and the process crashes during the "# Rest of processing..." step, that message is lost and never redelivered. One way to guarantee reliable processing is to hold the ack until a long-running task has finished.
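To make the trade-off concrete, here is a toy in-memory model (not stan.py; `ToyBroker`, `handle` and `run` are hypothetical names) of a server that keeps unacked messages around for redelivery. A simulated crash during processing loses the message when it was acked first, but leaves it pending when the ack is held until the work is done:

```python
class ToyBroker:
    """Hypothetical stand-in for a server that redelivers unacked messages."""

    def __init__(self) -> None:
        self.unacked: dict[int, bytes] = {}  # sequence -> payload

    def deliver(self, seq: int, data: bytes) -> None:
        self.unacked[seq] = data  # pending until acked

    def ack(self, seq: int) -> None:
        self.unacked.pop(seq, None)


class WorkerCrash(Exception):
    pass


def process(data: bytes, crash: bool) -> None:
    if crash:
        raise WorkerCrash("process died mid-task")


def handle(broker: ToyBroker, seq: int, data: bytes, *, ack_first: bool, crash: bool) -> None:
    if ack_first:
        broker.ack(seq)          # the ack-immediately workaround
        process(data, crash)
    else:
        process(data, crash)
        broker.ack(seq)          # hold the ack until the work succeeds


def run(ack_first: bool) -> dict[int, bytes]:
    broker = ToyBroker()
    broker.deliver(1, b"job-1")
    try:
        handle(broker, 1, b"job-1", ack_first=ack_first, crash=True)
    except WorkerCrash:
        pass
    return broker.unacked        # what the server could still redeliver


print(run(ack_first=True))   # {} : nothing left to redeliver, the job is lost
print(run(ack_first=False))  # {1: b'job-1'} : still pending, will be redelivered
```

Holding the ack is what keeps the message recoverable, which is exactly why it collides with a fixed ack_wait for long tasks.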

Does nats-io provide other ways to redeliver a "failed" message? nats-io is not a job queue, so I guess it has no concept of failure.

elbaro, Apr 04 '19 03:04

We're facing the same issue in a current project at my company: some processes take longer than the default ack_wait to complete, so the message gets redelivered to the same subscription.

Our workaround was to set the ack_wait of that subscription to a higher value (estimated processing time × 2). The process works like this: the subscription receives a batch of IDs in the message, calls a 3rd-party API for each one of them, and then publishes the results to another topic. We have a retry policy on those API calls, so if the API starts to fail, processing takes longer because of the retries. Even with the higher ack_wait, the risk of receiving the same message twice on the same subscription while it's still being processed remains.
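A quick back-of-the-envelope check shows why "estimate × 2" can still be exceeded. This sketch assumes hypothetical numbers (20 IDs per message, 0.5 s per healthy API call, up to 2 retries with 1 s and 2 s backoff) and a simplified redelivery model where the server redelivers at ack_wait, 2 × ack_wait, ... until the ack arrives:

```python
import math


def worst_case_seconds(n_ids: int, call_seconds: float, backoffs: list[float]) -> float:
    """Every ID exhausts its retries: one initial attempt plus len(backoffs) retries."""
    attempts = len(backoffs) + 1
    per_id = attempts * call_seconds + sum(backoffs)
    return n_ids * per_id


def redeliveries_before_ack(processing_seconds: float, ack_wait_seconds: float) -> int:
    """Simplified model: a redelivery fires at every multiple of ack_wait before the ack."""
    if processing_seconds <= 0:
        return 0
    return math.ceil(processing_seconds / ack_wait_seconds) - 1


# Hypothetical numbers, not measurements.
estimate = 20 * 0.5          # 10.0 s on the happy path
ack_wait = 2 * estimate      # the "estimate x 2" rule -> 20.0 s
worst = worst_case_seconds(20, 0.5, backoffs=[1.0, 2.0])  # 3 attempts per ID -> 90.0 s

print(redeliveries_before_ack(estimate, ack_wait))  # 0
print(redeliveries_before_ack(worst, ack_wait))     # 4
```

With these numbers a degraded API stretches one message to 4.5 × ack_wait, so a fixed multiplier alone cannot rule out duplicates.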

I was thinking of the following workaround: keep track of the topic and sequence of each message a given subscription is currently processing. If the same message arrives again while it's still being processed, discard it. When processing of a message finishes (whether it failed or succeeded), remove its topic + sequence pair from memory, and if it succeeded, also ack the message. We can implement this workaround in our own code, but we would love a way to avoid this ack_wait problem within the NATS Streaming Python client itself. What do you think, @wallyqs?
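A minimal sketch of that in-flight tracking, under stated assumptions: `Msg` and `DedupProcessor` are hypothetical names, and the `subject` field is a stand-in (stan.py messages expose `sequence` and `data`, but how you obtain the subject depends on your subscription code). In real use `on_message` would be the `cb=` of a `manual_acks=True` subscription and `ack` would wrap `sc.ack(msg)`. The long work runs as a background task so the callback returns quickly; if the callback awaited the work inline, a redelivered copy would typically just queue behind it instead of being discarded.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass(frozen=True)
class Msg:
    """Hypothetical stand-in for a received message."""
    subject: str
    sequence: int
    data: bytes


class DedupProcessor:
    """Discards redeliveries whose (subject, sequence) is still being processed."""

    def __init__(self, process: Callable[[Msg], Awaitable[None]],
                 ack: Callable[[Msg], Awaitable[None]]) -> None:
        self._process = process
        self._ack = ack
        self.in_flight: set[tuple[str, int]] = set()
        self._tasks: set[asyncio.Task] = set()

    async def on_message(self, msg: Msg) -> bool:
        """Subscription callback; returns False when a duplicate is discarded."""
        key = (msg.subject, msg.sequence)
        if key in self.in_flight:
            return False  # redelivered while still in progress: drop it
        self.in_flight.add(key)
        task = asyncio.create_task(self._run(msg, key))
        self._tasks.add(task)  # keep a reference until the task finishes
        task.add_done_callback(self._tasks.discard)
        return True

    async def _run(self, msg: Msg, key: tuple[str, int]) -> None:
        try:
            await self._process(msg)
            await self._ack(msg)  # ack only after successful processing
        except Exception as exc:
            # No ack, so the server redelivers once ack_wait expires.
            print(f"processing {key} failed: {exc!r}")
        finally:
            self.in_flight.discard(key)  # finished either way: forget the key

    async def drain(self) -> None:
        """Wait for all outstanding background tasks."""
        if self._tasks:
            await asyncio.gather(*self._tasks)


async def demo():
    acked = []

    async def slow_process(msg: Msg) -> None:
        await asyncio.sleep(0.05)  # stands in for the 3rd-party API calls

    async def ack(msg: Msg) -> None:
        acked.append(msg.sequence)

    proc = DedupProcessor(slow_process, ack)
    msg = Msg("foo", 1, b"ids")
    first = await proc.on_message(msg)
    duplicate = await proc.on_message(msg)  # simulated ack_wait redelivery
    await proc.drain()
    return first, duplicate, acked, proc.in_flight


print(asyncio.run(demo()))  # (True, False, [1], set())
```

Because the key is removed on failure too, a later redelivery after a failed attempt is accepted and retried.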

Thanks in advance for reading and for any insight on this question. Also, great job with the library!

sam-ji, Jul 09 '19 18:07

@sam-ji that sounds interesting; it would be great to add it as an example here and then look at whether this functionality could become part of the client so that it helps with this scenario. Something we should also consider is queue subscriptions: if one member of the group crashes and its message gets redelivered to another member that is already processing messages with higher sequence numbers, the redelivered message with the lower sequence number may get ignored, whereas ideally another member should handle it rather than ignore it.
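One way a lower-sequence redelivery can get dropped is if deduplication is done with a "highest sequence seen" watermark. The sketch below is hypothetical (neither filter is part of stan.py) and models a single subject: member B of a queue group sees sequences 5 and 6, then receives 3 after member A crashed. A watermark filter silently drops 3, while a filter that only skips sequences this member currently has in flight still handles it.

```python
def watermark_filter(deliveries: list[int]) -> list[int]:
    """Hypothetical dedup: skip any sequence <= the highest seen so far."""
    highest = 0
    accepted = []
    for seq in deliveries:
        if seq <= highest:
            continue
        highest = seq
        accepted.append(seq)
    return accepted


def in_flight_filter(deliveries: list[int], in_flight: set[int]) -> list[int]:
    """Exact-key dedup: skip only sequences this member is still processing."""
    accepted = []
    for seq in deliveries:
        if seq in in_flight:
            continue
        in_flight.add(seq)
        accepted.append(seq)
    return accepted


deliveries = [5, 6, 3]  # 3 is redelivered from the crashed member
print(watermark_filter(deliveries))         # [5, 6]    : 3 is silently dropped
print(in_flight_filter(deliveries, set()))  # [5, 6, 3] : 3 gets handled
```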

wallyqs, Jul 09 '19 20:07

@wallyqs I totally agree regarding durable queue subscriptions. For that matter, maybe NATS itself could be used to store the sequence + topic being processed, similar to what's done in Kafka: using meta-topics to store the state of certain parts of the system.
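As a sketch of the meta-topic idea, assume a hypothetical event format where each member publishes `("start", subject, sequence)` when it begins a message and `("done", subject, sequence)` when it finishes. Any member could then rebuild the shared in-flight set by replaying that log from the beginning. Here the log is just a Python list; publishing and replaying it over NATS is left out.

```python
def replay(events: list[tuple[str, str, int]]) -> set[tuple[str, int]]:
    """Fold (kind, subject, sequence) events into the set of in-flight messages."""
    in_flight: set[tuple[str, int]] = set()
    for kind, subject, seq in events:
        key = (subject, seq)
        if kind == "start":
            in_flight.add(key)
        elif kind == "done":
            in_flight.discard(key)
        else:
            raise ValueError(f"unknown event kind: {kind!r}")
    return in_flight


log = [
    ("start", "foo", 1),
    ("start", "foo", 2),
    ("done", "foo", 1),
    ("start", "bar", 7),
]
print(sorted(replay(log)))  # [('bar', 7), ('foo', 2)]
```

The state is only as fresh as the last event each member has consumed, so this narrows the duplicate window rather than closing it.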

Do you want me to add the example to the folder you linked, or do you want to do it?

Thanks again!

sam-ji, Jul 11 '19 22:07

@sam-ji using meta-topics sounds very interesting; feel free to send the example if you want to iterate on the idea. I could also take a look at it next week.

wallyqs, Jul 11 '19 23:07

@wallyqs Hello! Sorry for the late response. I've spent the past few weeks thinking about the issue and how to tackle it. The thing is, even when using meta-topics to avoid reprocessing a message that is still being processed, you still generate network overhead, because the message is discarded at the client level rather than at the server, so I don't think it's an elegant solution.

Also, I think the root of the redelivery problem in our project may be that we wrongly chose NATS Streaming instead of NATS. We needed an event bus for our microservices to communicate with each other, so that we could move internal system communication from HTTP to NATS.

Since you know more about the matter than I do, do you think NATS Streaming was overkill and that plain NATS would do the trick as a centralized event bus, like RabbitMQ?

Thanks in advance for all your help and sorry for the late response again!

sam-ji, Sep 09 '19 23:09