tornado icon indicating copy to clipboard operation
tornado copied to clipboard

Is connect_timeout not used for the AsyncHTTPClient stream connect timeout?

Open FireKAKA opened this issue 4 years ago • 1 comments

In the AsyncHTTPClient source code.

    def _on_timeout(self, info: str = None) -> None:
        self._timeout = None
        error_message = "Timeout {0}".format(info) if info else "Timeout"
        if self.final_callback is not None:
            self._handle_exception(
                HTTPTimeoutError, HTTPTimeoutError(error_message), None
            )

    def _handle_exception(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        """Deliver an error to the pending final callback as a 599 response.

        When ``self.final_callback`` is still set, the error is wrapped in
        an ``HTTPResponse`` with code 599 and passed to ``_run_callback``;
        afterwards ``self.stream`` is closed if that attribute exists.

        :arg typ: exception class (not read in the lines shown here).
        :arg value: exception instance; a ``StreamClosedError`` is replaced
            by its ``real_error`` or, when that is ``None``, by a generic
            ``HTTPStreamClosedError("Stream closed")``.
        :arg tb: traceback (not read in the lines shown here).

        NOTE(review): the signature declares ``bool`` but no ``return``
        statement appears in this excerpt; the quoted snippet looks
        truncated -- confirm the return values against the full source.
        """
        if self.final_callback:
            # presumably cancels the pending request timeout -- the helper
            # is not shown here; confirm.
            self._remove_timeout()
            if isinstance(value, StreamClosedError):
                # Prefer the underlying cause of the closed stream when
                # one was recorded; otherwise use a generic error.
                if value.real_error is None:
                    value = HTTPStreamClosedError("Stream closed")
                else:
                    value = value.real_error
            self._run_callback(
                HTTPResponse(
                    self.request,
                    599,
                    error=value,
                    # Elapsed time on the IOLoop clock since start_time.
                    request_time=self.io_loop.time() - self.start_time,
                    start_time=self.start_wall_time,
                )
            )

            # ``stream`` may not have been assigned yet (for example while
            # the connection attempt is still in progress), hence hasattr.
            if hasattr(self, "stream"):
                # TODO: this may cause a StreamClosedError to be raised
                # by the connection's Future.  Should we cancel the
                # connection more gracefully?
                self.stream.close()
        timeout = min(self.request.connect_timeout, self.request.request_timeout)
            if timeout:
                self._timeout = self.io_loop.add_timeout(
                    self.start_time + timeout,
                    functools.partial(self._on_timeout, "while connecting"),
                )
                stream = await self.tcp_client.connect(
                    host,
                    port,
                    af=af,
                    ssl_options=ssl_options,
                    max_buffer_size=self.max_buffer_size,
                    source_ip=source_ip,
                )
    async def connect(
        self,
        host: str,
        port: int,
        af: socket.AddressFamily = socket.AF_UNSPEC,
        ssl_options: Union[Dict[str, Any], ssl.SSLContext] = None,
        max_buffer_size: int = None,
        source_ip: str = None,
        source_port: int = None,
        timeout: Union[float, datetime.timedelta] = None,
    ) -> IOStream:
        """Resolve ``host``/``port`` and start a connection via ``_Connector``.

        ``timeout`` may be a real number of seconds or a
        ``datetime.timedelta``; either form is converted to an absolute
        deadline on the ``IOLoop.current().time()`` clock. That deadline is
        handed to ``gen.with_timeout`` around the resolver call and then to
        ``connector.start`` as ``connect_timeout``. With ``timeout=None``
        this code applies no deadline of its own.

        Raises ``TypeError`` if ``timeout`` is neither a real number nor a
        ``timedelta``.

        NOTE(review): this excerpt ends right after ``connector.start``;
        ``ssl_options`` is not used and the ``IOStream`` return is not
        visible in the lines shown -- see the full source for the rest.
        """
        if timeout is not None:
            # Normalize a relative timeout into an absolute deadline.
            if isinstance(timeout, numbers.Real):
                timeout = IOLoop.current().time() + timeout
            elif isinstance(timeout, datetime.timedelta):
                timeout = IOLoop.current().time() + timeout.total_seconds()
            else:
                raise TypeError("Unsupported timeout %r" % timeout)
        if timeout is not None:
            # From here on ``timeout`` is the absolute deadline computed above.
            addrinfo = await gen.with_timeout(
                timeout, self.resolver.resolve(host, port, af)
            )
        else:
            addrinfo = await self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo,
            # Stream factory with the buffer size and source address
            # pre-bound; the connector supplies the remaining arguments.
            functools.partial(
                self._create_stream,
                max_buffer_size,
                source_ip=source_ip,
                source_port=source_port,
            ),
        )
        # Only ``connect_timeout`` is passed; it is ``None`` when the caller
        # gave no ``timeout``.
        af, addr, stream = await connector.start(connect_timeout=timeout)
........

    def start(
        self,
        timeout: float = _INITIAL_CONNECT_TIMEOUT,
        connect_timeout: Union[float, datetime.timedelta] = None,
    ) -> "Future[Tuple[socket.AddressFamily, Any, IOStream]]":
        """Begin connecting and return the future tracking the outcome.

        :arg timeout: value passed to ``set_timeout``; defaults to
            ``_INITIAL_CONNECT_TIMEOUT``.
        :arg connect_timeout: when not ``None``, passed to
            ``set_connect_timeout``; when ``None`` no connect timeout is
            registered by this method.
        :returns: ``self.future``.
        """
        # Start with the primary address list.
        primary_candidates = iter(self.primary_addrs)
        self.try_connect(primary_candidates)

        self.set_timeout(timeout)

        # The connect timeout is optional; skip it unless one was given.
        wants_connect_timeout = connect_timeout is not None
        if wants_connect_timeout:
            self.set_connect_timeout(connect_timeout)

        return self.future

When the connection times out, _on_timeout does let us get a result back. But in the call to self.tcp_client.connect, timeout and connect_timeout are both None. So if the client times out while connecting to the server, our application does get a response, but it seems the socket stays blocked and is not closed here, because at this point self has no stream yet. How can I set a value for connect_timeout without changing the source code?

FireKAKA avatar Mar 16 '20 09:03 FireKAKA

I think you're right that the IOStream will not be closed immediately if tcp_client.connect() times out. I think closing it immediately might not actually be possible, because the future has not yet resolved into the IOStream object.

So what happens is that _on_timeout() calls _handle_exception(), which calls _run_callback(), which schedules final_callback and then sets self.final_callback = None. That final_callback returns execution to your code (raising an exception there). The TCP connect should finish or fail on its own eventually (a couple of minutes in the worst case on most operating systems?), and then the _HTTPConnection notices that final_callback was already cleared, and the IOStream is cleaned up asynchronously:

                stream = await self.tcp_client.connect(
                    host,
                    port,
                    af=af,
                    ssl_options=ssl_options,
                    max_buffer_size=self.max_buffer_size,
                    source_ip=source_ip,
                )

                if self.final_callback is None:
                    # final_callback is cleared if we've hit our timeout.
                    stream.close()
                    return

So yes, the timed-out tcp connection is not closed immediately, but I think it should be cleaned up asynchronously within a couple minutes at most. Hopefully :)

ploxiln avatar Mar 18 '20 19:03 ploxiln