
Joining on an already exited coroutine blocks indefinitely

Status: Open. wezzman opened this issue 11 years ago • 3 comments

I am trying to use a looping and sleeping coroutine as a synchronization primitive, and it works fairly well for the synchronous and timed cases. But in the asynchronous case, when there is a long wait before I try to get the result, the looping coroutine may already have exited. If it has, the future-like coroutine I'm using blocks indefinitely while trying to join on the looping coroutine.

import bluelet
import random

class TestProxy:
    _suspeneded = {}
    _messages = {}
    _continue_exec = True

    def init(self):
        print "Initializing"
        while TestProxy._continue_exec:
            print "Checking for messages"
            for k in TestProxy._messages.keys():
                if random.randint(0,100) < 50:
                    if k in TestProxy._suspeneded:
                        TestProxy._messages[k] = 0
                        print "Message sent {}".format(k)
                        yield bluelet.kill(TestProxy._suspeneded[k])
                        del TestProxy._suspeneded[k]
                        print "Killed {}".format(k)
            yield bluelet.sleep(1)
        print "Shutting down"

    def call_sync(self, num):
        # Setup
        TestProxy._messages[self] = None
        print "{} was passed".format(num)

        def inner_call_sync(self):
            while TestProxy._messages[self] is None:
                print "Sleeping {}".format(self)
                yield bluelet.sleep(600)

        # Create wait coroutine
        coro = inner_call_sync(self)
        # Store reference to wait coroutine
        TestProxy._suspeneded[self] = coro
        # Start and block on wait coroutine
        yield coro

        # Get the message and clean up
        print "Message Received {}".format(TestProxy._messages[self])
        message = TestProxy._messages.pop(self)
        yield bluelet.end(message)

    def call_timeout(self, num, timeout):
        # Setup
        TestProxy._messages[self] = None
        print "{} was passed".format(num)

        def inner_call_timeout(self, timeout):
            print "Sleeping {} for {}".format(self, timeout)
            yield bluelet.sleep(timeout)

        # Create wait coroutine
        coro = inner_call_timeout(self, timeout)
        # Store reference to wait coroutine
        TestProxy._suspeneded[self] = coro
        # Start and block on wait coroutine
        yield coro

        # Get the message and clean up
        print "Message Received {}".format(TestProxy._messages[self])
        message = TestProxy._messages.pop(self)
        yield bluelet.end(message)

    def call_async(self, num):
        # Setup
        TestProxy._messages[self] = None
        print "{} was passed".format(num)

        def inner_call_async(self):
            while TestProxy._messages[self] is None:
                print "Sleeping {}".format(self)
                yield bluelet.sleep(600)

        # Create wait coroutine
        coro = inner_call_async(self)
        # Store reference to wait coroutine
        TestProxy._suspeneded[self] = coro
        # Start wait coroutine
        yield bluelet.spawn(coro)

        def inner_finish(self, coro):
            print "Waiting on future"
            yield bluelet.join(coro)
            # Get the message and clean up
            print "Message Received {}".format(TestProxy._messages[self])
            message = TestProxy._messages.pop(self)
            yield bluelet.end(message)

        # Returning future
        yield bluelet.end(inner_finish(self, coro))


    def shutdown(self):
        TestProxy._continue_exec = False

def test(num):
    print "Number is {}".format(num)

    tp = TestProxy()

    ret = yield tp.call_sync(num)

    print "Returned {}".format(ret)

def test2(num):
    print "Number is {}".format(num)

    tp = TestProxy()

    ret = yield tp.call_timeout(num, 2)

    if ret is None:
        print "Call timed out"
    else:
        print "Returned {}".format(ret)

def test3(num):
    print "Number is {}".format(num)

    tp = TestProxy()

    fut = yield tp.call_async(num)

    yield bluelet.sleep(60)

    print "Async called"

    ret = yield fut

    print "Returned {}".format(ret)

def main():
    yield bluelet.spawn(TestProxy().init())
    #yield bluelet.spawn(test(42))
    #yield bluelet.spawn(test(43))
    #yield bluelet.spawn(test2(44))
    #yield bluelet.spawn(test2(45))
    yield bluelet.spawn(test3(46))
    yield bluelet.spawn(test3(47))
    yield bluelet.sleep(300)
    TestProxy().shutdown()

bluelet.run(main())
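
Assuming, as this report and the fix proposed below suggest, that `bluelet.join` always suspends the caller until the target's completion is reported, the hang in `test3` is an ordering problem: the `init` loop almost certainly kills the spawned wait coroutine during the 60-second sleep, so when the future later runs `bluelet.join(coro)`, no completion is left to wake it. This hypothetical toy model (the class and method names are invented; it is not bluelet's code) reproduces just that ordering:

```python
class NaiveJoinTable:
    """Hypothetical toy model: a joiner is woken only when its target finishes."""

    def __init__(self):
        self.waiters = {}  # target name -> names of coroutines joined on it
        self.woken = []    # joiners that have been resumed

    def join(self, joiner, target):
        # Naive join: always park the joiner, without asking
        # whether the target has already finished.
        self.waiters.setdefault(target, []).append(joiner)

    def finished(self, target):
        # Resume every joiner that is currently parked on the target.
        self.woken.extend(self.waiters.pop(target, []))


table = NaiveJoinTable()
table.finished("inner_call_async")              # killed during test3's sleep
table.join("inner_finish", "inner_call_async")  # the future joins afterwards
print(table.woken)  # [] : inner_finish is never resumed
```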

I think I know how to fix this, though. If join checks whether the target coroutine is still scheduled before suspending, joining a finished coroutine could return immediately. This doesn't account for coroutines that have not been spawned yet, but that can most likely be solved with a weakref.WeakKeyDictionary that records every previously scheduled coroutine that is still referenced elsewhere.
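A minimal sketch of such a weak history, assuming coroutines are plain generator objects (which support weak references in CPython). `JoinHistory`, `record_finished`, and `has_finished` are hypothetical names, not bluelet API. Because the dictionary holds its keys weakly, an entry disappears once nothing else refers to the coroutine, so the history cannot grow without bound:

```python
import gc
import weakref


class JoinHistory:
    """Hypothetical record of finished coroutines, keyed weakly."""

    def __init__(self):
        # Maps a finished generator to its return value (None if it had none).
        # Entries vanish automatically once the generator is garbage-collected.
        self._finished = weakref.WeakKeyDictionary()

    def record_finished(self, coro, value=None):
        self._finished[coro] = value

    def has_finished(self, coro):
        return coro in self._finished

    def __len__(self):
        return len(self._finished)


def waiter():
    yield "sleep"


history = JoinHistory()
coro = waiter()
history.record_finished(coro)
print(history.has_finished(coro))  # True: a join could return immediately

del coro      # drop the last strong reference (CPython frees it at once)
gc.collect()  # harmless in CPython; needed on runtimes without refcounting
print(len(history))  # 0: the entry went away with the coroutine
```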

wezzman, Sep 16 '14 19:09

Ah yes, that does make sense: join should return immediately for finished coroutines. The weak-reference idea seems like the right one, unless we want to simplify the story and say that joining an unspawned coroutine also returns immediately. That would probably be fine, if slightly confusing on its face, since it's typically not hard to ensure that you spawn before joining.
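The simplified rule amounts to treating "not currently scheduled" as "done", which needs no history at all. A hypothetical sketch of that decision (the function and the `scheduled` set are illustrative assumptions, not bluelet internals):

```python
def join_should_suspend(target, scheduled):
    """Simplified rule: a join suspends only while its target is scheduled.

    Finished and never-spawned coroutines are treated alike: both let
    the join return immediately.
    """
    return target in scheduled


def worker():
    yield


running, done, unspawned = worker(), worker(), worker()
scheduled = {running, done}  # both were spawned...
scheduled.discard(done)      # ...and `done` has since finished

print(join_should_suspend(running, scheduled))    # True: wait for it
print(join_should_suspend(done, scheduled))       # False: return at once
print(join_should_suspend(unspawned, scheduled))  # False: also returns at once
```

The last line is the confusing case mentioned above: a join issued before the matching spawn silently succeeds.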

sampsyo, Sep 16 '14 20:09

Reading it over again, I think I explained my logic wrong. I meant the weak-ref idea for the history of finished coroutines, and mixed it up with non-spawned coroutines when I typed it. Let me try again.

When handling a JoinEvent, we check whether the target coroutine is currently scheduled. If it is, we suspend. If not, we check a weak-ref history to see whether it has already finished, and return from the join immediately if it has. If it is neither scheduled nor in the history, it has not been spawned yet, and the join should block.
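The three cases can be sketched as a toy scheduler. Everything here is hypothetical (`ToyScheduler` and its methods are not bluelet's API), and "resuming" a joiner just means appending it to a list; the point is which branch each join takes:

```python
import weakref


class ToyScheduler:
    """Hypothetical join bookkeeping following the three cases above."""

    def __init__(self):
        self.scheduled = set()                       # spawned, not yet finished
        self.finished = weakref.WeakKeyDictionary()  # weak history of finished coroutines
        self.waiters = {}                            # target -> parked joiners
        self.resumed = []                            # joiners allowed to continue

    def spawn(self, coro):
        self.scheduled.add(coro)

    def finish(self, coro):
        self.scheduled.discard(coro)
        self.finished[coro] = True
        # Wake everyone parked on this coroutine, including early joiners.
        self.resumed.extend(self.waiters.pop(coro, []))

    def join(self, joiner, target):
        if target in self.scheduled:
            # Case 1: still running, so suspend until it finishes.
            self.waiters.setdefault(target, []).append(joiner)
        elif target in self.finished:
            # Case 2: already finished, so the join completes at once.
            self.resumed.append(joiner)
        else:
            # Case 3: never spawned; block until it is spawned and finishes,
            # so a missing spawn shows up as a hang rather than a silent pass.
            self.waiters.setdefault(target, []).append(joiner)


def task():
    yield


sched = ToyScheduler()
early, late = task(), task()

sched.spawn(early)
sched.finish(early)
sched.join("future", early)  # case 2
print(sched.resumed)         # ['future']

sched.join("parent", late)   # case 3: not spawned yet, so it parks
sched.spawn(late)
sched.finish(late)
print(sched.resumed)         # ['future', 'parent']
```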

In my opinion, joining a non-spawned coroutine is likely a logic error, and it should block so the programmer knows where to look. If it isn't a logic error, it is most likely a parent coroutine joining a coroutine that it created but that was spawned by another, independent coroutine.

I've already forked the repo and I'm working on a fix now. When I'm done, I'll send a pull request.

wezzman, Sep 16 '14 23:09

Yes, your idea is obviously the less error-prone one—thanks for taking a look! I'll be happy to merge.

sampsyo, Sep 16 '14 23:09