gino
gino copied to clipboard
Multihost string example
- GINO version:1.0.1
- Python version: 3.7
- asyncpg version: 0.18.3
Description
Hello, i want to pass couple of hosts to connection string, is it possible to do so with Gino?
I want to achive same behavior as in psql for example - to connect to the first avaliable host.
What I Did
I Tried
g = Gino(user='user',database='db',host='host1,host2,host3', password='pass, ssl=True,port=1234)
db.init_app(app)
But i receive
python/gino/gino/ext/starlette.py in set_bind(self, bind, loop, **kwargs)
224 async def set_bind(self, bind, loop=None, **kwargs):
225 kwargs.setdefault("strategy", "starlette")
--> 226 return await super().set_bind(bind, loop=loop, **kwargs)
python/gino/gino/api.py in set_bind(self, bind, loop, **kwargs)
415 from . import create_engine
416
--> 417 bind = await create_engine(bind, loop=loop, **kwargs)
418 self.bind = bind
419 return bind
python/gino/gino/strategies.py in create(self, name_or_url, loop, **kwargs)
53 dialect = dialect_cls(**dialect_args)
54 pool_class = kwargs.pop("pool_class", None)
---> 55 pool = await dialect.init_pool(u, loop, pool_class=pool_class)
56
57 engine_args = dict(loop=loop)
python/gino/gino/dialects/asyncpg.py in init_pool(self, url, loop, pool_class)
463 if pool_class is None:
464 pool_class = Pool
--> 465 return await pool_class(url, loop, init=self.on_connect(), **self._pool_kwargs)
466
467 # noinspection PyMethodMayBeStatic
python/gino/gino/dialects/asyncpg.py in _init(self)
216 password=self._url.password,
217 )
--> 218 self._pool = await asyncpg.create_pool(**args)
219 return self
220
python/asyncpg/asyncpg/pool.py in _async__init__(self)
398 self._initializing = True
399 try:
--> 400 await self._initialize()
401 return self
402 finally:
python/asyncpg/asyncpg/pool.py in _initialize(self)
415 # speed up successive connection attempts.
416 first_ch = self._holders[-1] # type: PoolConnectionHolder
--> 417 await first_ch.connect()
418
419 if self._minsize > 1:
python/asyncpg/asyncpg/pool.py in connect(self)
123 'connection already exists')
124
--> 125 self._con = await self._pool._get_new_connection()
126 self._generation = self._pool._generation
127 self._maybe_cancel_inactive_callback()
python/asyncpg/asyncpg/pool.py in _get_new_connection(self)
461 loop=self._loop,
462 connection_class=self._connection_class,
--> 463 **self._connect_kwargs)
464
465 self._working_addr = con._addr
python/asyncpg/asyncpg/connection.py in connect(dsn, host, port, user, password, passfile, database, loop, timeout, statement_cache_size, max_cached_statement_lifetime, max_cacheable_statement_size, command_timeout, ssl, connection_class, server_settings)
1686 statement_cache_size=statement_cache_size,
1687 max_cached_statement_lifetime=max_cached_statement_lifetime,
-> 1688 max_cacheable_statement_size=max_cacheable_statement_size)
1689
1690
python/asyncpg/asyncpg/connect_utils.py in _connect(loop, timeout, connection_class, **kwargs)
549 timeout -= time.monotonic() - before
550
--> 551 raise last_error
552
553
python/asyncpg/asyncpg/connect_utils.py in _connect(loop, timeout, connection_class, **kwargs)
541 addr=addr, loop=loop, timeout=timeout,
542 params=params, config=config,
--> 543 connection_class=connection_class)
544 except (OSError, asyncio.TimeoutError, ConnectionError) as ex:
545 last_error = ex
python/asyncpg/asyncpg/connect_utils.py in _connect_addr(addr, loop, timeout, params, config, connection_class)
511 before = time.monotonic()
512 tr, pr = await asyncio.wait_for(
--> 513 connector, timeout=timeout, loop=loop)
514 timeout -= time.monotonic() - before
515
python3/src/Lib/asyncio/tasks.py in wait_for(fut, timeout, loop)
440
441 if fut.done():
--> 442 return fut.result()
443 else:
444 fut.remove_done_callback(cb)
python/asyncpg/asyncpg/connect_utils.py in _create_ssl_connection(protocol_factory, host, port, loop, ssl_context, ssl_is_advisory)
604 ssl=ssl_context,
605 server_hostname=host,
--> 606 ssl_is_advisory=ssl_is_advisory)
607
608
python/asyncpg/asyncpg/connect_utils.py in _negotiate_ssl_connection(host, port, conn_factory, loop, ssl, server_hostname, ssl_is_advisory)
558 # negotiation fails or the PostgreSQL user isn't permitted to use SSL,
559 # there's nothing that would attempt to reconnect with a non-SSL socket.
--> 560 reader, writer = await asyncio.open_connection(host, port, loop=loop)
561
562 tr = writer.transport
python3/src/Lib/asyncio/streams.py in open_connection(host, port, loop, limit, **kwds)
75 protocol = StreamReaderProtocol(reader, loop=loop)
76 transport, _ = await loop.create_connection(
---> 77 lambda: protocol, host, port, **kwds)
78 writer = StreamWriter(transport, protocol, reader, loop)
79 return reader, writer
python3/src/Lib/asyncio/base_events.py in create_connection(self, protocol_factory, host, port, ssl, family, proto, flags, sock, local_addr, server_hostname, ssl_handshake_timeout)
911 infos = await self._ensure_resolved(
912 (host, port), family=family,
--> 913 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
914 if not infos:
915 raise OSError('getaddrinfo() returned empty list')
python3/src/Lib/asyncio/base_events.py in _ensure_resolved(self, address, family, type, proto, flags, loop)
1288 else:
1289 return await loop.getaddrinfo(host, port, family=family, type=type,
-> 1290 proto=proto, flags=flags)
1291
1292 async def _create_server_getaddrinfo(self, host, port, family, flags):
python3/src/Lib/asyncio/base_events.py in getaddrinfo(self, host, port, family, type, proto, flags)
790
791 return await self.run_in_executor(
--> 792 None, getaddr_func, host, port, family, type, proto, flags)
793
794 async def getnameinfo(self, sockaddr, flags=0):
python3/src/Lib/concurrent/futures/thread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
python3/src/Lib/socket.py in getaddrinfo(host, port, family, type, proto, flags)
750 # and socket type values to enum constants.
751 addrlist = []
--> 752 for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
753 af, socktype, proto, canonname, sa = res
754 addrlist.append((_intenum_converter(af, AddressFamily),
gaierror: [Errno -2] Name or service not known
It's out of scope of an "ORM". I think you're looking for a load balancer like service or proxy.
psql
can work with such kind of connect string
django ORM
also support such feature and asyncpg
too
In sqlalchemy
you can also do such thing - by specifying your own creator like this
create_engine(..., creator=my_func)
and parsing URL string as you want
But in gino i cant find out a way to do so - because gino code in https://github.com/python-gino/gino/blob/master/src/gino/dialects/asyncpg.py#L518 invoking before creator function in sqlalchemy
Related topic in sqlalchemy https://github.com/sqlalchemy/sqlalchemy/issues/4392
in asyncpg i can do it like this:
DB_CONF = {
'host': [
'host1',
'host2',
'host3',
],
'port': 1234,
'user': 'user',
'password': 'pass',
'database': 'dn_name',
'ssl': ssl_context,
}
asyncpg.create_pool(**DB_CONFIG)
Thanks for letting me know. I'm surprised. If it's supported by both sqlalchemy and the underlying driver, I guess it shouldn't be too hard to come up with a fix, though personally I don't feel like this is the right place to handle it.
I'll mark it as feature request then.
I found a way around - if i pass
from gino.ext.starlette import Gino
from sqlalchemy.engine.url import URL
db = Gino(dsn=URL(
username='user',
password='pass',
host=['host1', 'host2'],
port=1234
database='db',
))
it will work correctly (except for unability to print(db.config)
).
the only thing i dont figure out yet -how to pass option target_session_attrs=read-write
in order to get master.
@Smosker is it better to create a DB URL string and pass this parameter to your db
object?
db = Gino()
async def init_app():
await db.set_bind(f"postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}?"
f"target_session_attrs={PG_TARGET_SESSION_ATTRS}")
@aragentum I checked code inside set_bind
and it looks like asyncpg ignore target_session_attrs
in get parameters - it parse only following https://github.com/MagicStack/asyncpg/blob/1d9457f0b44d58f5537b011725916301a573afe7/asyncpg/connect_utils.py#L251
I took a closer look.
The difference from SQLAlchemy's creator
is that SQLAlchemy maintain its own connection queue and GIno uses asyncpg's, so we can't easily do that.
To sum up, I think we'll need to wait for SQLAlchemy to support comma-separated multiple hosts url parsing. @Smosker's workaround seems work well before that.
For target_session_attrs
, I see the discussion here. To include master switch situation, there needs to be some changes in PostgreSQL itself.
I've found a solution
You could override _init method of gino.dialects.asyncpg.Pool
and pass dsn string directly to asyncpg
It works since gino original Pool pass hosts as an arguments to asyncpg
import asyncpg
from gino.dialects.asyncpg import Pool as BasePool
from gino import Gino
class Pool(BasePool):
"""
Pool allow to provide multiple postgresql hosts,
since BasePool couldn't do that because it sets hosts implicitly
"""
async def _init(self):
args = self._kwargs.copy()
args.update(
loop=self._loop,
)
dsn = str(self._url)
if dsn.startswith('postgresql+asyncpg') or dsn.startswith('postgres+asyncpg'):
dsn = dsn.replace('+asyncpg', '', 1)
self._pool = await asyncpg.create_pool(dsn, **args)
return self
db = Gino()
async def bind_db(dsn: str, ssl):
await db.set_bind(
dsn,
ssl=None,
statement_cache_size=0,
pool_class=Pool,
)