pytest-twisted
pytest-twisted copied to clipboard
new reactor for each test, like pytest-asyncio
trafficstars
to allow the reactor to re-run you'd need something like this:
from zope.interfaces.interface import Method, Attribute
from twisted.internet import interfaces
from twisted.python.runtime import platform
def _default_reactor_factory(platform):
try:
if platform.isLinux():
try:
from twisted.internet.epollreactor import EPollReactor as Reactor
except ImportError:
from twisted.internet.pollreactor import PollReactor as Reactor
elif platform.getType() == "posix" and not platform.isMacOSX():
from twisted.internet.pollreactor import PollReactor as Reactor
else:
from twisted.internet.selectreactor import SelectReactor as Reactor
except ImportError:
from twisted.internet.selectreactor import SelectReactor as Reactor
return Reactor
class ReactorProxy(object):
def __init__(self, reactor_factory=_default_reactor_factory(platform)):
self.__reactor = None
self.__reactor_factory = reactor_factory
def __get_reactor(self):
if self.__reactor is not None:
return self.__reactor
self.__reactor = self.__reactor_factory()
return self.__reactor
def reset_reactor(self):
if self.__reactor is None:
return
if self.__reactor.running:
raise Exception("cannot reset running reactor")
self.__reactor = None
@classmethod
def setup(cls):
def instrument(name):
def do(self, *args, **kwargs):
return getattr(self.__get_reactor(), name)(*args, **kwargs)
return do
def makeprop(name):
def set_(self, attr):
setattr(self.__get_reactor(), name, attr)
def get(self):
return getattr(self.__get_reactor(), name)
return property(get, set_)
for iname, interface in vars(interfaces).items():
if not iname.startswith("IReactor"):
continue
for attr_name, attr_type in interface.namesAndDescriptions(True):
if isinstance(attr_name, Method):
setattr(cls, attr_name, instrument(attr_name))
elif isinstance(attr_name, Attribute): # Method inherits Attribute
setattr(cls, attr_name, makeprop(attr_name))
reset_reactor = ReactorProxy.reset_reactor
del ReactorProxy.reset_reactor
_setup_reactor_proxy = ReactorProxy.setup
del ReactorProxy.setup
_setup_reactor_proxy()
def install(**kwargs):
p = ReactorProxy(**kwargs)
from twisted.internet.main import installReactor
installReactor(p)
def run_until_complete(fn, *args, **kwargs):
from twisted.internet import reactor, defer
if not isinstance(reactor, ReactorProxy):
raise Exception("Wrong reactor installed")
if reactor.running:
raise Exception("run_until_complete is not reentrant")
class ExtractResult(object):
def __init__(self):
self.result = None
@defer.inlineCallbacks
def go(self):
try:
self.result = (None, (yield fn(*args, **kwargs)))
except Exception as e:
self.result = (e, None)
finally:
reactor.stop()
er = ExtractResult()
reactor.callWhenRunning(er.go)
try:
reactor.run()
finally:
reset_reactor(reactor)
e, r = er.result
if e is not None:
raise e
return r
I've got a new version of this util now: https://gist.github.com/graingert/6dbde2a302c669e75c6fd57c1bc35944#file-twisted-py
I use it for my test suite and it took a 22 minute build to ~15 minutes because we were filling our one global eventloop with all sorts of readers/writers/LoopingCalls/threads
also when I hit ctrl+c it stops the test suite because the reactor isn't configured to swallow all the signals