Multihost string example

Open Smosker opened this issue 4 years ago • 9 comments

  • GINO version:1.0.1
  • Python version: 3.7
  • asyncpg version: 0.18.3

Description

Hello, I want to pass several hosts in the connection string. Is it possible to do so with Gino?

I want to achieve the same behavior as in psql, for example: connecting to the first available host.

What I Did

I tried

db = Gino(user='user', database='db', host='host1,host2,host3', password='pass', ssl=True, port=1234)

db.init_app(app)

But I receive

python/gino/gino/ext/starlette.py in set_bind(self, bind, loop, **kwargs)
    224     async def set_bind(self, bind, loop=None, **kwargs):
    225         kwargs.setdefault("strategy", "starlette")
--> 226         return await super().set_bind(bind, loop=loop, **kwargs)

python/gino/gino/api.py in set_bind(self, bind, loop, **kwargs)
    415             from . import create_engine
    416
--> 417             bind = await create_engine(bind, loop=loop, **kwargs)
    418         self.bind = bind
    419         return bind

python/gino/gino/strategies.py in create(self, name_or_url, loop, **kwargs)
     53         dialect = dialect_cls(**dialect_args)
     54         pool_class = kwargs.pop("pool_class", None)
---> 55         pool = await dialect.init_pool(u, loop, pool_class=pool_class)
     56
     57         engine_args = dict(loop=loop)

python/gino/gino/dialects/asyncpg.py in init_pool(self, url, loop, pool_class)
    463         if pool_class is None:
    464             pool_class = Pool
--> 465         return await pool_class(url, loop, init=self.on_connect(), **self._pool_kwargs)
    466
    467     # noinspection PyMethodMayBeStatic

python/gino/gino/dialects/asyncpg.py in _init(self)
    216             password=self._url.password,
    217         )
--> 218         self._pool = await asyncpg.create_pool(**args)
    219         return self
    220

python/asyncpg/asyncpg/pool.py in _async__init__(self)
    398         self._initializing = True
    399         try:
--> 400             await self._initialize()
    401             return self
    402         finally:

python/asyncpg/asyncpg/pool.py in _initialize(self)
    415             # speed up successive connection attempts.
    416             first_ch = self._holders[-1]  # type: PoolConnectionHolder
--> 417             await first_ch.connect()
    418
    419             if self._minsize > 1:

python/asyncpg/asyncpg/pool.py in connect(self)
    123                 'connection already exists')
    124
--> 125         self._con = await self._pool._get_new_connection()
    126         self._generation = self._pool._generation
    127         self._maybe_cancel_inactive_callback()

python/asyncpg/asyncpg/pool.py in _get_new_connection(self)
    461                 loop=self._loop,
    462                 connection_class=self._connection_class,
--> 463                 **self._connect_kwargs)
    464
    465             self._working_addr = con._addr

python/asyncpg/asyncpg/connection.py in connect(dsn, host, port, user, password, passfile, database, loop, timeout, statement_cache_size, max_cached_statement_lifetime, max_cacheable_statement_size, command_timeout, ssl, connection_class, server_settings)
   1686         statement_cache_size=statement_cache_size,
   1687         max_cached_statement_lifetime=max_cached_statement_lifetime,
-> 1688         max_cacheable_statement_size=max_cacheable_statement_size)
   1689
   1690

python/asyncpg/asyncpg/connect_utils.py in _connect(loop, timeout, connection_class, **kwargs)
    549             timeout -= time.monotonic() - before
    550
--> 551     raise last_error
    552
    553

python/asyncpg/asyncpg/connect_utils.py in _connect(loop, timeout, connection_class, **kwargs)
    541                 addr=addr, loop=loop, timeout=timeout,
    542                 params=params, config=config,
--> 543                 connection_class=connection_class)
    544         except (OSError, asyncio.TimeoutError, ConnectionError) as ex:
    545             last_error = ex

python/asyncpg/asyncpg/connect_utils.py in _connect_addr(addr, loop, timeout, params, config, connection_class)
    511     before = time.monotonic()
    512     tr, pr = await asyncio.wait_for(
--> 513         connector, timeout=timeout, loop=loop)
    514     timeout -= time.monotonic() - before
    515

python3/src/Lib/asyncio/tasks.py in wait_for(fut, timeout, loop)
    440
    441         if fut.done():
--> 442             return fut.result()
    443         else:
    444             fut.remove_done_callback(cb)

python/asyncpg/asyncpg/connect_utils.py in _create_ssl_connection(protocol_factory, host, port, loop, ssl_context, ssl_is_advisory)
    604         ssl=ssl_context,
    605         server_hostname=host,
--> 606         ssl_is_advisory=ssl_is_advisory)
    607
    608

python/asyncpg/asyncpg/connect_utils.py in _negotiate_ssl_connection(host, port, conn_factory, loop, ssl, server_hostname, ssl_is_advisory)
    558     # negotiation fails or the PostgreSQL user isn't permitted to use SSL,
    559     # there's nothing that would attempt to reconnect with a non-SSL socket.
--> 560     reader, writer = await asyncio.open_connection(host, port, loop=loop)
    561
    562     tr = writer.transport

python3/src/Lib/asyncio/streams.py in open_connection(host, port, loop, limit, **kwds)
     75     protocol = StreamReaderProtocol(reader, loop=loop)
     76     transport, _ = await loop.create_connection(
---> 77         lambda: protocol, host, port, **kwds)
     78     writer = StreamWriter(transport, protocol, reader, loop)
     79     return reader, writer

python3/src/Lib/asyncio/base_events.py in create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout)
    911             infos = await self._ensure_resolved(
    912                 (host, port), family=family,
--> 913                 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
    914             if not infos:
    915                 raise OSError('getaddrinfo() returned empty list')

python3/src/Lib/asyncio/base_events.py in _ensure_resolved(self, address, family, type, proto, flags, loop)
   1288         else:
   1289             return await loop.getaddrinfo(host, port, family=family, type=type,
-> 1290                                           proto=proto, flags=flags)
   1291
   1292     async def _create_server_getaddrinfo(self, host, port, family, flags):

python3/src/Lib/asyncio/base_events.py in getaddrinfo(self, host, port, family, type, proto, flags)
    790
    791         return await self.run_in_executor(
--> 792             None, getaddr_func, host, port, family, type, proto, flags)
    793
    794     async def getnameinfo(self, sockaddr, flags=0):

python3/src/Lib/concurrent/futures/thread.py in run(self)
     55
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

python3/src/Lib/socket.py in getaddrinfo(host, port, family, type, proto, flags)
    750     # and socket type values to enum constants.
    751     addrlist = []
--> 752     for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
    753         af, socktype, proto, canonname, sa = res
    754         addrlist.append((_intenum_converter(af, AddressFamily),

gaierror: [Errno -2] Name or service not known

Smosker avatar Jun 22 '20 14:06 Smosker

It's out of scope for an "ORM". I think you're looking for a load-balancer-like service or a proxy.

wwwjfy avatar Jun 22 '20 14:06 wwwjfy

psql can work with this kind of connection string; the Django ORM also supports this feature, and so does asyncpg.

In SQLAlchemy you can also do this by specifying your own creator, like this:

create_engine(..., creator=my_func), and then parsing the URL string however you want.

But in Gino I can't find a way to do so, because the Gino code at https://github.com/python-gino/gino/blob/master/src/gino/dialects/asyncpg.py#L518 is invoked before the creator function in SQLAlchemy.

Related topic in SQLAlchemy: https://github.com/sqlalchemy/sqlalchemy/issues/4392

In asyncpg I can do it like this:

DB_CONFIG = {
    'host': [
        'host1',
        'host2',
        'host3',
    ],
    'port': 1234,
    'user': 'user',
    'password': 'pass',
    'database': 'db_name',
    'ssl': ssl_context,
}
asyncpg.create_pool(**DB_CONFIG)
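
A minimal sketch of bridging the two forms: it splits the comma-separated host string that psql accepts into the list form used in the dict above. `split_hosts` is a hypothetical helper introduced only for illustration; it is not part of GINO or asyncpg, and the credentials are placeholders.

```python
def split_hosts(host_spec):
    """Split a comma-separated host string into a list of host names."""
    # Strip whitespace and drop empty entries (e.g. from a trailing comma).
    return [h.strip() for h in host_spec.split(",") if h.strip()]


# Same shape as the dict above, with the host list built from one string.
DB_CONFIG = {
    "host": split_hosts("host1,host2,host3"),
    "port": 1234,
    "user": "user",
    "password": "pass",
    "database": "db",
}
```

Passing the unsplit string is presumably what produced the gaierror in the original report, since the whole string ends up being resolved as a single host name.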

Smosker avatar Jun 22 '20 15:06 Smosker

Thanks for letting me know. I'm surprised. If it's supported by both sqlalchemy and the underlying driver, I guess it shouldn't be too hard to come up with a fix, though personally I don't feel like this is the right place to handle it.

I'll mark it as feature request then.

wwwjfy avatar Jun 22 '20 15:06 wwwjfy

I found a workaround. If I pass

from gino.ext.starlette import Gino
from sqlalchemy.engine.url import URL

db = Gino(dsn=URL(
            username='user',
            password='pass',
            host=['host1', 'host2'],
            port=1234,
            database='db',
        ))

it works correctly (except for being unable to print(db.config)).

Smosker avatar Jun 22 '20 16:06 Smosker

The only thing I haven't figured out yet is how to pass the option target_session_attrs=read-write in order to get the master.

Smosker avatar Jun 22 '20 17:06 Smosker

@Smosker wouldn't it be better to build a DB URL string and pass this parameter to your db object?

db = Gino()

async def init_app():
    await db.set_bind(f"postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}?"
                      f"target_session_attrs={PG_TARGET_SESSION_ATTRS}")

aragentum avatar Jun 26 '20 15:06 aragentum

@aragentum I checked the code inside set_bind, and it looks like asyncpg ignores target_session_attrs in the query parameters; it parses only the following: https://github.com/MagicStack/asyncpg/blob/1d9457f0b44d58f5537b011725916301a573afe7/asyncpg/connect_utils.py#L251
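
To see which query parameters a DSN actually carries, and so compare them against the ones the driver recognizes, something like the following can help. This is only a sketch using the standard library: `dsn_query_params` is a hypothetical helper, and it does not reproduce asyncpg's own parsing.

```python
from urllib.parse import parse_qs, urlsplit


def dsn_query_params(dsn):
    """Return the query parameters of a DSN as a plain dict."""
    query = urlsplit(dsn).query
    # parse_qs maps each key to a list of values; keep the last one given.
    return {key: values[-1] for key, values in parse_qs(query).items()}


params = dsn_query_params(
    "postgresql://user:pass@host1:1234/db?target_session_attrs=read-write"
)
print(params)  # {'target_session_attrs': 'read-write'}
```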

Smosker avatar Jun 26 '20 15:06 Smosker

I took a closer look.

The difference from SQLAlchemy's creator is that SQLAlchemy maintains its own connection queue while GINO uses asyncpg's, so we can't easily do that.

To sum up, I think we'll need to wait for SQLAlchemy to support parsing URLs with multiple comma-separated hosts. Until then, @Smosker's workaround seems to work well.

For target_session_attrs, I see the discussion here. To cover the master-switch situation, some changes are needed in PostgreSQL itself.

wwwjfy avatar Jun 28 '20 13:06 wwwjfy

I've found a solution.

You can override the _init method of gino.dialects.asyncpg.Pool and pass the DSN string directly to asyncpg. This works because it bypasses GINO's original Pool, which passes the hosts to asyncpg as a separate argument.

import asyncpg
from gino.dialects.asyncpg import Pool as BasePool
from gino import Gino

class Pool(BasePool):
    """
    Pool that allows multiple PostgreSQL hosts to be provided,
    which BasePool cannot do because it sets the hosts implicitly.
    """
    async def _init(self):
        args = self._kwargs.copy()
        args.update(
            loop=self._loop,
        )

        dsn = str(self._url)
        if dsn.startswith('postgresql+asyncpg') or dsn.startswith('postgres+asyncpg'):
            dsn = dsn.replace('+asyncpg', '', 1)

        self._pool = await asyncpg.create_pool(dsn, **args)
        return self

db = Gino()


async def bind_db(dsn: str, ssl):
    await db.set_bind(
        dsn,
        ssl=ssl,  # forward the caller's SSL setting instead of discarding it
        statement_cache_size=0,
        pool_class=Pool,
    )
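
The driver-suffix handling in `_init` can be checked in isolation, without a database. The sketch below pulls it out into a standalone function; `strip_driver_suffix` is a hypothetical name used only here, and the DSNs are placeholders.

```python
def strip_driver_suffix(dsn):
    """Remove a '+asyncpg' driver suffix from the scheme of a DSN, if present."""
    for scheme in ("postgresql+asyncpg", "postgres+asyncpg"):
        if dsn.startswith(scheme):
            # Replace only the first occurrence, which is the one in the scheme.
            return dsn.replace("+asyncpg", "", 1)
    # Any other DSN is returned unchanged.
    return dsn


print(strip_driver_suffix("postgresql+asyncpg://user:pass@host1,host2/db"))
# postgresql://user:pass@host1,host2/db
```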

snaffi avatar Mar 06 '21 08:03 snaffi