Winloop icon indicating copy to clipboard operation
Winloop copied to clipboard

TypeError: no default __reduce__ due to non-trivial __cinit__

Open allrobot opened this issue 1 year ago • 3 comments
trafficstars

https://aiomultiprocess.omnilib.dev/en/latest/guide.html#using-uvloop

code:

async def fetch_detail(url: str, pool: aiomultiprocess.Pool):
    async with aiohttp.ClientSession() .get(url) as response:
        html = await response.text()
        return html

async def aiomultiprocess_main():
    main_url = "http://am.adianshi.com:6805/"
    async with aiomultiprocess.Pool(loop_initializer=winloop.new_event_loop()) as pool:
        task = pool.apply(fetch_detail, args=(main_url, pool))
        html=await task
        print(html)

if __name__ == "__main__":
    asyncio.run(aiomultiprocess_main())

It report error:

C:\ProgramData\anaconda3\envs\python311\python.exe C:\Users\Administrator\Personal_scripts\pythonProject\temp.py 
Traceback (most recent call last):
  File "C:\Users\Administrator\Personal_scripts\pythonProject\temp.py", line 60, in <module>
    asyncio.run(aiomultiprocess_main())
  File "C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\Personal_scripts\pythonProject\temp.py", line 54, in aiomultiprocess_main
    async with aiomultiprocess.Pool(loop_initializer=winloop.new_event_loop()) as pool:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\pool.py", line 186, in __init__
    self.init()
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\pool.py", line 214, in init
    self.processes[self.create_worker(qid)] = qid
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\pool.py", line 261, in create_worker
    process.start()
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\core.py", line 156, in start
    return self.aio_process.start()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\context.py", line 336, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\popen_spawn_win32.py", line 95, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "<stringsource>", line 2, in winloop.loop.Loop.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

allrobot avatar Jun 07 '24 12:06 allrobot

@allrobot try running it with winloop.run() and see if it throws the same errors at you.

Vizonex avatar Jun 08 '24 19:06 Vizonex

https://github.com/Vizonex/Winloop#making-pull-requests

run code, it will report:

C:\ProgramData\anaconda3\envs\python311\python.exe "C:/Program Files/JetBrains/PyCharm Community Edition 2024.1/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --path C:\Users\Administrator\Personal_scripts\pythonProject\test.py 
Testing started at 下午3:28 ...
Launching pytest with arguments C:\Users\Administrator\Personal_scripts\pythonProject\test.py --no-header --no-summary -q in C:\Users\Administrator\Personal_scripts\pythonProject

============================= test session starts =============================
collecting ... collected 2 items

test.py::TestAioHTTP::test_aiohttp_basic_1 
test.py::TestAioHTTP::test_aiohttp_graceful_shutdown 

======================== 2 failed, 1 warning in 2.32s =========================
FAILED                        [ 50%]
test.py:22 (TestAioHTTP.test_aiohttp_basic_1)
self = <test.TestAioHTTP testMethod=test_aiohttp_basic_1>

    def test_aiohttp_basic_1(self):
        PAYLOAD = '<h1>It Works!</h1>' * 10000
    
        async def on_request(request):
            return aiohttp.web.Response(text=PAYLOAD)
    
        asyncio.set_event_loop(self.loop)
        app = aiohttp.web.Application()
        app.router.add_get('/', on_request)
    
        runner = aiohttp.web.AppRunner(app)
        self.loop.run_until_complete(runner.setup())
        site = aiohttp.web.TCPSite(runner, '0.0.0.0', '10000')
        self.loop.run_until_complete(site.start())
        port = site._server.sockets[0].getsockname()[1]
    
        async def test():
            # Make sure we're using the correct event loop.
            self.assertIs(asyncio.get_event_loop(), self.loop)
    
            for addr in (('localhost', port),
                         ('127.0.0.1', port)):
                async with aiohttp.ClientSession() as client:
                    async with client.get('http://{}:{}'.format(*addr)) as r:
                        self.assertEqual(r.status, 200)
                        result = await r.text()
                        self.assertEqual(result, PAYLOAD)
    
>       self.loop.run_until_complete(test())

test.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py:654: in run_until_complete
    return future.result()
test.py:46: in test
    async with client.get('http://{}:{}'.format(*addr)) as r:
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\client.py:1197: in __aenter__
    self._resp = await self._coro
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\client.py:608: in _request
    await resp.start(conn)
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\client_reqrep.py:976: in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiohttp.client_proto.ResponseHandler object at 0x00000240EE359B70>

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
>               await self._waiter
E               aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected

C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\streams.py:640: ServerDisconnectedError
FAILED              [100%]
test.py:53 (TestAioHTTP.test_aiohttp_graceful_shutdown)
self = <test.TestAioHTTP testMethod=test_aiohttp_graceful_shutdown>

    def test_aiohttp_graceful_shutdown(self):
        async def websocket_handler(request):
            ws = aiohttp.web.WebSocketResponse()
            await ws.prepare(request)
            request.app['websockets'].add(ws)
            try:
                async for msg in ws:
                    await ws.send_str(msg.data)
            finally:
                request.app['websockets'].discard(ws)
            return ws
    
        async def on_shutdown(app):
            for ws in set(app['websockets']):
                await ws.close(
                    code=aiohttp.WSCloseCode.GOING_AWAY,
                    message='Server shutdown')
    
        asyncio.set_event_loop(self.loop)
        app = aiohttp.web.Application()
        app.router.add_get('/', websocket_handler)
        app.on_shutdown.append(on_shutdown)
        app['websockets'] = weakref.WeakSet()
    
        runner = aiohttp.web.AppRunner(app)
        self.loop.run_until_complete(runner.setup())
        site = aiohttp.web.TCPSite(runner, '0.0.0.0', '10000')
>       self.loop.run_until_complete(site.start())

test.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py:654: in run_until_complete
    return future.result()
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\web_runner.py:119: in start
    self._server = await loop.create_server(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ProactorEventLoop running=False closed=False debug=False>
protocol_factory = <aiohttp.web_server.Server object at 0x00000240EE86EFD0>
host = '0.0.0.0', port = '10000'

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a TCP server.
    
        The host parameter can be a string, in that case the TCP server is
        bound to host and port.
    
        The host parameter can also be a sequence of strings and in that case
        the TCP server is bound to all hosts of the sequence. If a host
        appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to that
        host.
    
        Return a Server object which can be used to stop the service.
    
        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
    
        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')
    
        if ssl_shutdown_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')
    
        if sock is not None:
            _check_ssl_socket(sock)
    
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')
    
            if reuse_address is None:
                reuse_address = os.name == "posix" and sys.platform != "cygwin"
            sockets = []
            if host == '':
                hosts = [None]
            elif (isinstance(host, str) or
                  not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host
    
            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs)
            infos = set(itertools.chain.from_iterable(infos))
    
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    if reuse_port:
                        _set_reuseport(sock)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        msg = ('error while attempting '
                               'to bind on address %r: %s'
                               % (sa, err.strerror.lower()))
                        if err.errno == errno.EADDRNOTAVAIL:
                            # Assume the family is not enabled (bpo-30945)
                            sockets.pop()
                            sock.close()
                            if self._debug:
                                logger.warning(msg)
                            continue
>                       raise OSError(err.errno, msg) from None
E                       OSError: [Errno 10048] error while attempting to bind on address ('0.0.0.0', 10000): Typically each socket address (protocol/network address/port) is only allowed to be used once.

C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py:1536: OSError

https://github.com/Vizonex/Winloop#how-to-use-winloop-with-fastapi

it report

C:\ProgramData\anaconda3\envs\python311\python.exe "C:/Program Files/JetBrains/PyCharm Community Edition 2024.1/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --path C:\Users\Administrator\Personal_scripts\pythonProject\test.py 
Testing started at 下午3:26 ...
Launching pytest with arguments C:\Users\Administrator\Personal_scripts\pythonProject\test.py --no-header --no-summary -q in C:\Users\Administrator\Personal_scripts\pythonProject

============================= test session starts =============================
collecting ... collected 2 items

test.py::test_get_request SKIPPED (async def function and no async
plugin installed (see warnings))                                         [ 50%]
Skipped: async def function and no async plugin installed (see warnings)

test.py::test_dynamic_response PASSED                                    [100%]

================== 1 passed, 1 skipped, 4 warnings in 0.39s ===================

allrobot avatar Jun 09 '24 00:06 allrobot

@allrobot I added winloop to aiomultiprocess's testsuite as well so that hopefully any future issues can be resolved.

Vizonex avatar Jul 06 '24 22:07 Vizonex

I tested it and it works fine now so I am closing this issue up

Vizonex avatar Aug 20 '24 14:08 Vizonex