pytest-twisted icon indicating copy to clipboard operation
pytest-twisted copied to clipboard

new reactor for each test, like pytest-asyncio

Open graingert opened this issue 6 years ago • 2 comments
trafficstars

graingert avatar Nov 04 '19 12:11 graingert

To allow the reactor to be re-run, you'd need something like this:

from zope.interface.interface import Method, Attribute

from twisted.internet import interfaces
from twisted.python.runtime import platform


def _default_reactor_factory(platform):
    """Return the reactor class to use by default on the given platform.

    ``platform`` must provide ``isLinux()``, ``getType()`` and
    ``isMacOSX()``.  Selection order:

    * Linux: ``EPollReactor``, or ``PollReactor`` if epoll cannot be imported.
    * Any other "posix" platform except macOS: ``PollReactor``.
    * Everything else: ``SelectReactor``.

    An ``ImportError`` raised anywhere during that selection falls back to
    ``SelectReactor``; if even that import fails, the ``ImportError``
    propagates to the caller.

    The return value is the reactor *class*, not an instance.
    """
    try:
        if platform.isLinux():
            try:
                from twisted.internet.epollreactor import EPollReactor as chosen
            except ImportError:
                # A failure here is still inside the outer ``try`` and so
                # ends up at the SelectReactor fallback below.
                from twisted.internet.pollreactor import PollReactor as chosen
            return chosen

        use_poll = platform.getType() == "posix" and not platform.isMacOSX()
        if use_poll:
            from twisted.internet.pollreactor import PollReactor as chosen
        else:
            from twisted.internet.selectreactor import SelectReactor as chosen
        return chosen
    except ImportError:
        from twisted.internet.selectreactor import SelectReactor as fallback

        return fallback


class ReactorProxy(object):
    """Forward reactor calls to a lazily created, replaceable real reactor.

    The proxy holds at most one underlying reactor, built on first use by
    calling ``reactor_factory``.  ``reset_reactor`` drops that reactor so the
    next access builds a new one.  The forwarding methods and properties are
    not written out by hand; ``setup`` generates them from the ``IReactor*``
    interfaces found in ``twisted.internet.interfaces``.
    """

    def __init__(self, reactor_factory=_default_reactor_factory(platform)):
        """Store the factory; no reactor is created until it is needed.

        :param reactor_factory: zero-argument callable returning a new
            reactor.  The default is computed once, when this class body is
            executed, by ``_default_reactor_factory(platform)``.
        """
        self.__reactor = None
        self.__reactor_factory = reactor_factory

    def __get_reactor(self):
        """Return the current underlying reactor, creating it if absent."""
        if self.__reactor is None:
            self.__reactor = self.__reactor_factory()
        return self.__reactor

    def reset_reactor(self):
        """Forget the underlying reactor so the next use creates a new one.

        Does nothing if no reactor has been created yet.

        :raises Exception: if the underlying reactor is still running.
        """
        if self.__reactor is None:
            return
        if self.__reactor.running:
            raise Exception("cannot reset running reactor")
        self.__reactor = None

    @classmethod
    def setup(cls):
        """Install forwarding members on ``cls`` for every reactor interface.

        Every name in ``twisted.internet.interfaces`` starting with
        ``IReactor`` is inspected: each interface method becomes a method
        that calls the same-named method on the underlying reactor, and each
        plain interface attribute becomes a read/write property backed by
        the underlying reactor.
        """

        # These helpers are defined lexically inside the class, so
        # ``self.__get_reactor`` is name-mangled exactly as in the methods
        # above and resolves to the private accessor.
        def instrument(name):
            def do(self, *args, **kwargs):
                return getattr(self.__get_reactor(), name)(*args, **kwargs)

            return do

        def makeprop(name):
            def set_(self, attr):
                setattr(self.__get_reactor(), name, attr)

            def get(self):
                return getattr(self.__get_reactor(), name)

            return property(get, set_)

        for iname, interface in vars(interfaces).items():
            if not iname.startswith("IReactor"):
                continue
            for attr_name, attr_type in interface.namesAndDescriptions(True):
                # Dispatch on the description object, not on the name: the
                # name is a plain string and would never match either type,
                # leaving the proxy without any forwarded members.
                if isinstance(attr_type, Method):
                    setattr(cls, attr_name, instrument(attr_name))
                elif isinstance(attr_type, Attribute):  # Method inherits Attribute
                    setattr(cls, attr_name, makeprop(attr_name))


# Keep a module-level handle on reset_reactor, then remove it from the class
# so it is no longer reachable as an attribute of ReactorProxy instances.
# Callers use it as ``reset_reactor(proxy)``.
reset_reactor = ReactorProxy.reset_reactor
del ReactorProxy.reset_reactor

# Same treatment for the one-off setup hook: grab it (already bound to the
# class, since it is a classmethod), remove it from the class, then call it
# once so its setattr loop runs at import time.
_setup_reactor_proxy = ReactorProxy.setup
del ReactorProxy.setup
_setup_reactor_proxy()


def install(**kwargs):
    """Create a ``ReactorProxy`` and install it as the Twisted reactor.

    Keyword arguments are passed straight through to ``ReactorProxy``.
    The proxy is constructed before Twisted's installer is imported.
    """
    proxy = ReactorProxy(**kwargs)
    from twisted.internet.main import installReactor as _install_reactor

    _install_reactor(proxy)


def run_until_complete(fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)`` inside the reactor and return its result.

    The call is scheduled with ``callWhenRunning``, the reactor is run until
    the call (and any Deferred it returns) has finished, and the proxy is
    then reset so that the next use gets a new underlying reactor.

    :param fn: callable to invoke once the reactor is running; it may return
        a plain value or a Deferred.
    :returns: the value ``fn`` produced.
    :raises Exception: if the installed reactor is not a ``ReactorProxy``,
        if the reactor is already running, or if the reactor stopped before
        ``fn`` produced an outcome.  An ``Exception`` raised by ``fn`` is
        re-raised unchanged.
    """
    from twisted.internet import reactor, defer

    if not isinstance(reactor, ReactorProxy):
        raise Exception("Wrong reactor installed")

    if reactor.running:
        raise Exception("run_until_complete is not reentrant")

    class ExtractResult(object):
        """Holder for the outcome, as an ``(exception, value)`` pair."""

        def __init__(self):
            # Stays None until ``go`` has recorded an outcome.
            self.result = None

        @defer.inlineCallbacks
        def go(self):
            try:
                self.result = (None, (yield fn(*args, **kwargs)))
            except Exception as e:
                self.result = (e, None)
            finally:
                # Stop the reactor whichever way the call ended, so that
                # ``reactor.run()`` below returns.
                reactor.stop()

    er = ExtractResult()
    reactor.callWhenRunning(er.go)
    try:
        reactor.run()
    finally:
        reset_reactor(reactor)

    if er.result is None:
        # The reactor was stopped by something other than ``go`` finishing,
        # so there is no outcome to unpack.  Fail with a clear message
        # instead of a TypeError from unpacking None.
        raise Exception("reactor stopped before run_until_complete finished")

    e, r = er.result
    if e is not None:
        raise e
    return r

graingert avatar Nov 04 '19 14:11 graingert

I've got a new version of this util now: https://gist.github.com/graingert/6dbde2a302c669e75c6fd57c1bc35944#file-twisted-py

I use it for my test suite, and it took a 22-minute build down to ~15 minutes; previously we were filling our one global event loop with all sorts of readers/writers/LoopingCalls/threads.

Also, when I hit Ctrl+C it stops the test suite, because the reactor isn't configured to swallow all the signals.

graingert avatar Nov 21 '19 10:11 graingert