incubator-pegasus

feat(python-client): add DNS resolution support

Open WJSGDBZ opened this issue 3 months ago • 2 comments

What problem does this PR solve?

Add DNS resolution support for meta server addresses in the Python client.

What is changed and how does it work?

When a Pegasus operation times out, the client attempts to fetch the latest topology from the meta servers. If none of the meta servers respond, it re-resolves the meta server hostnames via DNS to obtain their new IP addresses.

Checklist

Tests
  • Manual test (detailed steps below)

Objective: validate the DNS-resolution failover mechanism in the resolve_all_ips function.

Steps:

  1. Initial setup
    Modify resolve_all_ips so that the first DNS resolution is forced to return the onebox address (127.0.0.1):
    def resolve_all_ips(domain):
        """Test-only stub: force the first resolution to the onebox address.

        On the first call it returns ["127.0.0.1"] regardless of ``domain``,
        so the client starts out talking to the onebox. Later calls are meant
        to perform a real DNS lookup of ``domain``.

        NOTE(review): ``first_call`` is not defined in this snippet, and the
        real-resolution branch for later calls is elided. Presumably it is a
        flag flipped after the first call; confirm against the patched
        pgclient before reusing this.
        """
        if first_call: 
            return ["127.0.0.1"]  # Force Onebox resolution
    
  2. Stop the onebox service to trigger client timeouts:
    ./run.sh stop_onebox
  3. Failover trigger: after the client fails to query the servers 5 times, DNS re-resolution is triggered.
  4. On the second call, resolve_all_ips performs a real DNS lookup of the hostname, and the client starts querying the real servers.
  5. In local tests, the meta addresses were successfully re-resolved:
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:375 resolved hostname xxxxxxx to IP type:host_port_types.kHostTypeIpv4, addr:['xx.xx.xx.xx']
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:389 removed stale server: 127.0.0.1:xxxxx
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:389 removed stale server: 127.0.0.1:xxxxx
2025-10-16 14:51:09 [134627073400832] [INFO] pgclient.py:389 removed stale server: 127.0.0.1:xxxxx

The test script used is as follows:

from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor, task
from pypegasus.pgclient import *
from twisted.internet.defer import inlineCallbacks, Deferred

from pypegasus.pgclient import Pegasus

def _sleep(seconds):
    """Return a Deferred that fires after ``seconds`` without blocking the reactor."""
    return task.deferLater(reactor, seconds, lambda: None)


@inlineCallbacks
def _drain_scanner(s):
    """Print every batch from scanner ``s`` until it is exhausted, then close it.

    A failing ``get_next`` is printed and ends the scan instead of propagating.
    """
    try:
        while True:
            try:
                ret = yield s.get_next()
                print('get_next ret: ', ret)
            except Exception as e:
                print(e)
                break

            if not ret:
                break
    finally:
        # Release the scanner even if iteration raises unexpectedly.
        s.close()


@inlineCallbacks
def _exercise_client(c):
    """Run one round of every client operation against ``c``, printing results.

    Except for the first ``set`` (whose failure is printed and tolerated),
    errors raised by client calls propagate to the caller.
    """
    # set
    try:
        ret = yield c.set('hkey1', 'skey1', 'value', 0, 500)
        print('set ret: ', ret)
    except Exception as e:
        print(e)

    # exist
    ret = yield c.exist('hkey1', 'skey1')
    print('exist ret: ', ret)

    ret = yield c.exist('hkey1', 'skey2')
    print('exist ret: ', ret)

    # get
    ret = yield c.get('hkey1', 'skey1')
    print('get ret: ', ret)

    ret = yield c.get('hkey1', 'skey2')
    print('get ret: ', ret)

    # ttl: write with a TTL of 123, wait 2 seconds, then read it back
    yield c.set('hkey2', 'skey1', 'value', 123)
    yield _sleep(2)

    ret = yield c.ttl('hkey2', 'skey1')
    print('ttl ret: ', ret)

    # remove
    ret = yield c.remove('hkey2', 'skey1')
    print('remove ret: ', ret)

    # multi_set
    kvs = {'skey1': 'value1', 'skey2': 'value2', 'skey3': 'value3'}
    ret = yield c.multi_set('hkey3', kvs, 999)
    print('multi_set ret: ', ret)

    # multi_get
    ks = set(kvs.keys())
    ret = yield c.multi_get('hkey3', ks)
    print('multi_get ret: ', ret)

    # Page through the keys one at a time. Work on a copy so that `ks` still
    # holds every sort key for the multi_get / multi_del calls below.
    remaining = set(ks)
    ret = yield c.multi_get('hkey3', remaining, 1)
    print('multi_get ret: ', ret)
    while ret[0] == 7 and ret[1]:   # has more data
        # `.keys()` results are not indexable on Python 3, so take the first
        # returned key through an iterator instead of `.keys()[0]`.
        remaining.remove(next(iter(ret[1])))
        ret = yield c.multi_get('hkey3', remaining, 1)
        print('multi_get ret: ', ret)

    ret = yield c.multi_get('hkey3', ks, 100, 10000, True)
    print('multi_get ret: ', ret)

    # sort_key_count
    ret = yield c.sort_key_count('hkey3')
    print('sort_key_count ret: ', ret)

    # get_sort_keys
    ret = yield c.get_sort_keys('hkey3', 100, 10000)
    print('get_sort_keys ret: ', ret)

    # multi_del
    ret = yield c.multi_del('hkey3', ks)
    print('multi_del ret: ', ret)

    # scan
    o = ScanOptions()
    o.batch_size = 1
    yield _drain_scanner(c.get_scanner('hkey3', '1', '7', o))

    # scan all: seed hash keys '0'..'6', then read them back through 3
    # unordered scanners, drained one after another.
    for hash_key in range(7):
        yield c.multi_set(str(hash_key), kvs, 999)

    for s in c.get_unordered_scanners(3, o):
        yield _drain_scanner(s)


@inlineCallbacks
def run_test():
    """Initialise the client, then exercise it in an endless request loop.

    Each round is independent. A failed round (expected while the onebox is
    stopped) is printed and retried after a short pause, so requests keep
    flowing while the client falls back to re-resolving the meta servers.
    The reactor is stopped if initialisation fails or the loop itself dies.
    """
    # 'hostname' is a placeholder for the real meta-server domain name; the
    # other two entries are local (127.0.0.1) meta addresses.
    c = Pegasus(['hostname:34601', '127.0.0.1:34602', '127.0.0.1:34603'],
                'table_test')

    try:
        suc = yield c.init()
    except Exception as e:
        print('init failed: ', e)
        reactor.stop()
        return
    if not suc:
        print('init failed')
        reactor.stop()
        return

    @inlineCallbacks
    def periodic_request():
        # Rounds run back-to-back forever. The returned Deferred only fires if
        # something escapes the per-round handler below.
        while True:
            try:
                yield _exercise_client(c)
            except Exception as e:
                print('request round failed: ', e)
                # Pause so a round that fails synchronously cannot spin the
                # reactor in a tight loop.
                yield _sleep(1)

    def report_stop(failure):
        # Surface the failure instead of leaving an unhandled-error Deferred.
        print('request loop stopped: ', failure.getErrorMessage())
        reactor.stop()

    periodic_request().addErrback(report_stop)
        

def _main():
    """Queue run_test for reactor start-up, then enter the event loop."""
    reactor.callWhenRunning(run_test)
    reactor.run()


if __name__ == "__main__":
    _main()

WJSGDBZ avatar Oct 16 '25 07:10 WJSGDBZ

@WJSGDBZ Could you please add some simple tests and fix the conflicts?

acelyc111 avatar Nov 11 '25 16:11 acelyc111

Sure, I've already added the test process above.

WJSGDBZ avatar Nov 27 '25 02:11 WJSGDBZ