incubator-pegasus
incubator-pegasus copied to clipboard
feat(python-client): add DNS resolution support
What problem does this PR solve?
Add DNS resolution support to the Python client.
What is changed and how does it work?
When a Pegasus operation times out, the client attempts to re-fetch the topology from the meta servers. If none of the meta servers respond, it re-resolves their hostnames via DNS to obtain new IP addresses.
Checklist
Tests
- Manual test (add detailed scripts or steps below)
Objective: Validate the DNS re-resolution failover mechanism in the
`resolve_all_ips` function.
Steps:
-
Initial Setup
Modify `resolve_all_ips` to force the initial DNS resolution to Onebox (127.0.0.1):
```python
def resolve_all_ips(domain):
    if first_call:
        return ["127.0.0.1"]  # Force Onebox resolution
```
- Stop the Onebox service to trigger client timeouts:
./run.sh stop_onebox
- Failover trigger: after the client fails to query the server 5 times, DNS re-resolution is triggered.
- On the second call, `resolve_all_ips` performs a real DNS lookup, resolving the hostname to its actual IP address, and the client starts querying the real server.
- In local tests, the meta server addresses were successfully re-resolved:
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:375 resolved hostname xxxxxxx to IP type:host_port_types.kHostTypeIpv4, addr:['xx.xx.xx.xx']
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:389 removed stale server: 127.0.0.1:xxxxx
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:389 removed stale server: 127.0.0.1:xxxxx
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:389 removed stale server: 127.0.0.1:xxxxx
The full test script is as follows:
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor, task
from pypegasus.pgclient import *
from twisted.internet.defer import inlineCallbacks, Deferred
from pypegasus.pgclient import Pegasus
@inlineCallbacks
def run_test():
    """Run a continuous Pegasus workload to observe DNS-based meta failover.

    Builds a client for table 'table_test' from a mix of hostname and
    literal meta server addresses, initializes it, and then starts an
    endless request loop (set/exist/get/ttl/remove/multi_*/scan). The
    printed results show how the client behaves while servers are down
    and while addresses are re-resolved.

    The reactor is stopped if ``c.init()`` reports failure or raises.
    """
    # NOTE(review): '127.0.0.1.1' is not a valid IPv4 literal; confirm these
    # entries are intentionally unreachable rather than typos for '127.0.0.1'.
    c = Pegasus(['hostname:34601', '127.0.0.1.1:34602', '127.0.0.1.1:34603'], 'table_test')
    try:
        suc = yield c.init()
    except Exception as e:
        # Surface the cause instead of shutting down silently.
        print('init error: ', e)
        reactor.stop()
        return
    if not suc:
        print('init failed')
        reactor.stop()
        return

    @inlineCallbacks
    def periodic_request():
        """Issue rounds of requests forever; only ends if a request raises."""
        while True:
            # set: errors (e.g. timeouts while servers are down) are printed
            # and the round continues.
            try:
                ret = yield c.set('hkey1', 'skey1', 'value', 0, 500)
                print('set ret: ', ret)
            except Exception as e:
                print(e)

            # exist
            ret = yield c.exist('hkey1', 'skey1')
            print('exist ret: ', ret)
            ret = yield c.exist('hkey1', 'skey2')
            print('exist ret: ', ret)

            # get
            ret = yield c.get('hkey1', 'skey1')
            print('get ret: ', ret)
            ret = yield c.get('hkey1', 'skey2')
            print('get ret: ', ret)

            # ttl: write with ttl argument 123, wait 2 seconds, then read it back
            yield c.set('hkey2', 'skey1', 'value', 123)
            d = Deferred()
            reactor.callLater(2, d.callback, 'ok')  # 2 seconds later
            yield d
            ret = yield c.ttl('hkey2', 'skey1')
            print('ttl ret: ', ret)

            # remove
            ret = yield c.remove('hkey2', 'skey1')
            print('remove ret: ', ret)

            # multi_set
            kvs = {'skey1': 'value1', 'skey2': 'value2', 'skey3': 'value3'}
            ret = yield c.multi_set('hkey3', kvs, 999)
            print('multi_set ret: ', ret)

            # multi_get
            ks = set(kvs.keys())
            ret = yield c.multi_get('hkey3', ks)
            print('multi_get ret: ', ret)
            ret = yield c.multi_get('hkey3', ks, 1)
            print('multi_get ret: ', ret)
            while ret[0] == 7:  # has more data
                # dict.keys() is not indexable on Python 3; next(iter(...))
                # yields the same first key on both Python 2 and 3.
                ks.remove(next(iter(ret[1])))
                ret = yield c.multi_get('hkey3', ks, 1)
                print('multi_get ret: ', ret)
            ret = yield c.multi_get('hkey3', ks, 100, 10000, True)
            print('multi_get ret: ', ret)

            # sort_key_count
            ret = yield c.sort_key_count('hkey3')
            print('sort_key_count ret: ', ret)

            # get_sort_keys
            ret = yield c.get_sort_keys('hkey3', 100, 10000)
            print('get_sort_keys ret: ', ret)

            # multi_del
            ret = yield c.multi_del('hkey3', ks)
            print('multi_del ret: ', ret)

            # scan
            o = ScanOptions()
            o.batch_size = 1
            s = c.get_scanner('hkey3', '1', '7', o)
            while True:
                try:
                    ret = yield s.get_next()
                    print('get_next ret: ', ret)
                except Exception as e:
                    print(e)
                    break
                if not ret:
                    break
            s.close()

            # scan all: populate hash keys '0'..'6', then drain 3 unordered scanners
            for hash_key in ('0', '1', '2', '3', '4', '5', '6'):
                yield c.multi_set(hash_key, kvs, 999)
            ss = c.get_unordered_scanners(3, o)
            for s in ss:
                while True:
                    try:
                        ret = yield s.get_next()
                        print('get_next ret: ', ret)
                    except Exception as e:
                        print(e)
                        break
                    if not ret:
                        break
                s.close()

    def on_request_loop_error(failure):
        # periodic_request only finishes by raising; report the failure instead
        # of leaving it unhandled on the LoopingCall's Deferred.
        print('periodic_request stopped: ', failure)

    loop = task.LoopingCall(periodic_request)
    # LoopingCall waits for the Deferred returned by periodic_request before
    # scheduling the next call; since that function loops internally, the
    # 200-second interval is effectively unused.
    loop.start(200).addErrback(on_request_loop_error)
if __name__ == "__main__":
    # Schedule run_test to start once the event loop is live, then block in
    # reactor.run() until reactor.stop() is called (run_test does so when
    # client initialization fails).
    reactor.callWhenRunning(run_test)
    reactor.run()
@WJSGDBZ Could you please add some simple tests and fix the conflicts?
Sure, I've already added the test process above.