
AsyncRealtimeChannel.subscribe returns without waiting for an answer

Open o-santi opened this issue 6 months ago • 3 comments

Bug report

  • [X] I confirm this is a bug with Supabase, not with my own application.
  • [X] I confirm I have searched the Docs, GitHub Discussions, and Discord.

Describe the bug

When subscribe is called, the channel object sends the payload through the websocket connection and returns immediately, without waiting for an answer on whether the subscription succeeded. This leads to unexpected behavior: callback is not guaranteed to have been called by the time await channel.subscribe(callback) returns, which creates non-trivial race conditions.

Because of this, even if the callback crashes the program when subscribing fails, API calls made after subscribe returns are not guaranteed to run in a valid subscribed state, since the callback may not have been called yet.

To Reproduce

Start a local instance with supabase start, then run the example given in the README.md:

# test.py
import asyncio
from typing import Optional

from realtime import AsyncRealtimeClient, RealtimeSubscribeStates

async def main():
    REALTIME_URL = "ws://localhost:54321/realtime/v1"
    API_KEY = "{API_KEY}" # api key here

    socket = AsyncRealtimeClient(REALTIME_URL, API_KEY)
    channel = socket.channel("test-channel")
    
    def _on_subscribe(status: RealtimeSubscribeStates, err: Optional[Exception]):
        raise SystemExit(100)

    await channel.subscribe(_on_subscribe)

if __name__ == '__main__':
    asyncio.run(main())
    print('`main()` ran successfully')

Which results in:

$ python test.py
`main()` ran successfully

Even though the callback should've crashed the program instantly.
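The effect can be reproduced without a server using a toy model. This is only a sketch: ToyChannel is a hypothetical stand-in, and the assumption that the real channel reports its status from a later event-loop iteration is mine, not something confirmed against the library source.

```python
import asyncio

calls = []  # records every status the callback receives


class ToyChannel:
    """Hypothetical stand-in, not the real AsyncRealtimeChannel."""

    async def subscribe(self, callback):
        # Assumption: the server's answer arrives later, so the callback
        # is only scheduled here and subscribe() returns right away.
        loop = asyncio.get_running_loop()
        loop.call_later(0.5, callback, "SUBSCRIBED", None)


async def main():
    await ToyChannel().subscribe(lambda state, err: calls.append(state))
    # main() ends here, before the scheduled callback is due.


asyncio.run(main())
print(calls)
```

In this model asyncio.run closes the loop as soon as main() finishes, so the pending callback is dropped and calls stays empty.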

Expected behavior

After channel.subscribe(callback) is awaited, callback should be guaranteed to have been called, which means subscribe has to wait for the answer inside the async function. Currently, the only way to ensure this is to block manually after the await point with asyncio.sleep calls, which is not only ugly but also forces you to emulate timeouts externally, even though subscribe seems to have an internal timeout mechanism.

System information

  • OS: NixOS
  • Version of supabase-py: 2.5.2

o-santi avatar Jun 26 '25 18:06 o-santi

I believe one possible solution is to wait for the answer inside the function and return the payload directly from the subscribe call, which would remove the need to pass in a callback. It would also let the user write direct control flow by matching on the payload type, instead of the current behavior, which forces the user to handle control through non-local asyncio.Event objects.

For instance, instead of using this style of control flow (present in almost all tests):

subscribe_event = asyncio.Event()
await channel.on_broadcast("test-event", broadcast_callback).subscribe(
    lambda state, _: (
        subscribe_event.set()
        if state == RealtimeSubscribeStates.SUBSCRIBED
        else None
    )
)

await asyncio.wait_for(subscribe_event.wait(), 5)

One should be able to write the much simpler:

channel.on_broadcast("test-event", broadcast_callback)
state = await channel.subscribe()
if state != RealtimeSubscribeStates.SUBSCRIBED:
    return None

which makes control flow easier, and does not duplicate the timeout logic in asyncio.wait_for.

It is important to note that this would be a breaking change, as it would change the return type of subscribe. One alternative would be to introduce a new function that covers this use case and to deprecate subscribe later on because of this behavior.
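A rough sketch of that non-breaking alternative: a helper that wraps the existing callback-based subscribe and returns the first reported status. Everything here is hypothetical: subscribe_and_wait, FakeChannel and SubscribeState are illustration names, and FakeChannel only imitates the "return first, report later" behavior described in this issue.

```python
import asyncio
from enum import Enum
from typing import Callable, Optional


class SubscribeState(Enum):
    # Hypothetical stand-in for realtime's RealtimeSubscribeStates.
    SUBSCRIBED = "SUBSCRIBED"
    TIMED_OUT = "TIMED_OUT"


Callback = Callable[[SubscribeState, Optional[Exception]], None]


class FakeChannel:
    """Stand-in channel: subscribe() returns at once and reports later."""

    async def subscribe(self, callback: Callback) -> "FakeChannel":
        loop = asyncio.get_running_loop()
        loop.call_later(0.01, callback, SubscribeState.SUBSCRIBED, None)
        return self


async def subscribe_and_wait(channel, timeout: float = 5.0) -> SubscribeState:
    """Subscribe, then wait for the first status passed to the callback."""
    loop = asyncio.get_running_loop()
    first_status: asyncio.Future = loop.create_future()

    def on_status(state: SubscribeState, err: Optional[Exception]) -> None:
        # Only the first report resolves the future; later reports
        # (for example after a background rejoin) are ignored here.
        if first_status.done():
            return
        if err is not None:
            first_status.set_exception(err)
        else:
            first_status.set_result(state)

    await channel.subscribe(on_status)
    # Raises asyncio.TimeoutError if no status arrives in time.
    return await asyncio.wait_for(first_status, timeout)


async def demo() -> SubscribeState:
    return await subscribe_and_wait(FakeChannel())


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the helper only consumes the first status, it leaves room for the existing callback to keep receiving later state changes, at the cost of not surfacing them to the caller of the helper.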

o-santi avatar Jun 26 '25 19:06 o-santi

The downside of your suggested approach is that when we call subscribe(), the rejoin logic runs in the background. So, anytime a socket sends an error or closes, it will automatically try to rejoin the channel.

How would we handle that in your suggestion?

But I agree we should rethink how we handle that; we have wanted to change the realtime API for quite a while.

grdsdev avatar Jun 26 '25 20:06 grdsdev

I believe the auto rejoin and the subscribe callback should be two separate concerns. The current callback-based design doesn't give the user enough knowledge of, or control over, what's going on under the hood: websocket errors cannot be thrown just anywhere, only at the places that send information to the server. Callbacks are a good fit for reacting to server messages, but not for learning the result of a message you sent yourself.

For instance, websocket errors may only happen on send_broadcast, track, subscribe and unsubscribe. These should directly return or raise the result of sending the message instead of relying on the callbacks registered through the subscribe method. Currently there is no way to know for certain whether a message was sent correctly, because the AsyncPush class suppresses all of these errors into the logger:

class AsyncPush:
    ...
    async def send(self):
        # boilerplate
        try:
            await self.channel.socket.send(
                {
                    "topic": self.channel.topic,
                    "event": self.event,
                    "payload": self.payload,
                    "ref": self.ref,
                    "join_ref": self.channel.join_push.ref,
                }
            )
        except Exception as e:
            logger.error(f"send push failed: {e}")
            # no re-raising of the exception

This means that a simple snippet like:

await channel.send_broadcast('messages', my_first_message)
await channel.send_broadcast('messages', my_second_message_that_only_makes_sense_if_the_first_was_correctly_received)

isn't guaranteed to be correct, since the first message may fail with a network error while the second send still goes through.
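The difference can be shown with a small self-contained sketch. FlakySocket, send_suppressing, send_propagating and run are hypothetical names used only for illustration; they are not part of the realtime library, and send_suppressing merely imitates the try/except pattern quoted above.

```python
import asyncio


class FlakySocket:
    """Hypothetical socket whose first send fails with a network error."""

    def __init__(self):
        self.sent = []
        self.fail_next = True

    async def send(self, message: dict) -> None:
        if self.fail_next:
            self.fail_next = False
            raise ConnectionError("connection lost")
        self.sent.append(message)


async def send_suppressing(socket, message: dict) -> None:
    # Imitates the quoted pattern: the error is swallowed
    # (the original logs it) and the caller never finds out.
    try:
        await socket.send(message)
    except Exception:
        pass


async def send_propagating(socket, message: dict) -> None:
    # No try/except: a failed send surfaces at the caller's await.
    await socket.send(message)


async def run(send) -> list:
    socket = FlakySocket()
    try:
        await send(socket, {"event": "first"})
        await send(socket, {"event": "second"})
    except ConnectionError:
        pass  # the caller can now decide to retry or abort
    return [m["event"] for m in socket.sent]


if __name__ == "__main__":
    print(asyncio.run(run(send_suppressing)))
    print(asyncio.run(run(send_propagating)))
```

With suppression, the second message is delivered even though the first one was lost; with propagation, the failure stops the sequence before the dependent message is sent.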

Besides that, I don't know if automatic rejoining in the background is a practice we should maintain. At the very least, I believe the user should know about it and opt into it, as the documentation doesn't state anywhere that it happens. Furthermore, writing correct code around it is far from trivial, since you need to constantly check inside the callbacks that the connection you have is still valid.

o-santi avatar Jun 27 '25 13:06 o-santi