apache/incubator-pegasus

feat(Python-client): add scan filter support

Open WJSGDBZ opened this issue 3 months ago • 0 comments

What problem does this PR solve?

Add support for hashkey and sortkey scan filters

What is changed and how does it work?

  1. In the original logic, the `generate_next_bytes` function has two problems:
     a. The input `buff` (i.e. the hash key) can be either a `str` or a `bytearray`. If it is a `str`, an in-place modification such as `buff[pos] += 1` does not work, because strings are immutable.
     b. The `pos` variable was initialized to a fixed index (`len(buff) - 1`), which is counterintuitive and could lead to an infinite loop.
     Original code: https://github.com/apache/incubator-pegasus/blob/44400f6e3ca1fb1ce48d04d1c9145aad7ac4e991/python-client/pypegasus/pgclient.py#L613-L624

Checklist

Tests
  • Manual test (add detailed scripts or steps below): the test script is shown below.
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor

from pypegasus.pgclient import *
from pypegasus.rrdb.ttypes import filter_type

@inlineCallbacks
def test_prefix_match_with_boundaries_full_scan():
    """Check hash-key and sort-key filters on a full-table unordered scan.

    Seeds table 'full_scan_test' with hash keys 'hkey1' and 'hkey2'. Each holds
    sort keys 'skey0'..'skey109' plus 'aa', 'b' and 'z'. The whole table is then
    scanned with ``get_unordered_scanners`` under several filter combinations,
    and the returned sort keys are compared with the expected ones.

    Hash-key patterns appear both as ``str`` and as ``bytes``, so both input
    types are exercised.

    Raises:
        RuntimeError: if the client cannot connect to the cluster.
        AssertionError: if any case returns an unexpected multiset of sort keys.
    """
    print("Start test_prefix_match_with_boundaries_full_scan")

    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'full_scan_test')

    # Raise instead of stopping the reactor here: the caller owns the reactor,
    # and stopping it twice raises ReactorNotRunning.
    suc = yield c.init()
    if not suc:
        raise RuntimeError("Failed to connect to Pegasus server")

    # The same data goes under two hash keys so hash-key filtering is observable.
    for hkey in ('hkey1', 'hkey2'):
        for i in range(110):
            yield c.set(hkey, f'skey{i}', str(i), 0)
        for skey in ('aa', 'b', 'z'):
            yield c.set(hkey, skey, skey, 0)

    test_cases = [
        # Prefix-match tests
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': 'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99'],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'',
            'batch_size': 2,
            'expected': 2 * ['aa', 'b', 'z'] + 2 * [f'skey{i}' for i in range(0, 110)],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': 'hkey2',
            'batch_size': 2,
            'expected': [f'skey{i}' for i in range(0, 110)],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey10',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104', 'skey105', 'skey106', 'skey107', 'skey108', 'skey109'],
        },
        # Postfix (suffix) match tests
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey99', 'skey109'],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'z',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['z'],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['aa', 'b', 'z'] + [f'skey{i}' for i in range(0, 110)],
        },
        # Match-anywhere tests
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99', 'skey109'],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'key',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': [f'skey{i}' for i in range(110)],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'99',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey99'],
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'xyz',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': [],
        },
    ]

    for idx, case in enumerate(test_cases, start=1):
        print(f"\nRunning test case {idx}...")

        o = ScanOptions()
        o.sortkey_filter_type = case['skey_filter_type']
        o.sortkey_filter_pattern = case['skey_pattern']
        o.batch_size = case['batch_size']
        o.hashkey_filter_type = case['hkey_filter_type']
        o.hashkey_filter_pattern = case['hkey_pattern']

        results = []
        for s in c.get_unordered_scanners(3, o):
            # Let scan errors propagate so the test fails with the real cause,
            # and always close the scanner, even on error.
            try:
                while True:
                    ret = yield s.get_next()
                    if not ret:
                        break
                    results.append(ret)
            finally:
                s.close()

        # Each result unpacks as (key, value); key[1] is the sort key.
        # The scanners are unordered, so compare sorted lists (multisets).
        # That still catches duplicated or missing entries, which a set check would not.
        actual_keys = sorted(k[1] for k, _ in results)
        expected_keys = sorted(case['expected'])
        assert actual_keys == expected_keys, \
            f"Test case {idx} failed: Expected {expected_keys} \n got {actual_keys}"

        print(f"✅ Test case {idx} passed: {len(actual_keys)} keys matched")

    print("All test cases passed!")


@inlineCallbacks
def test_prefix_match_with_boundaries():
    """Check sort-key prefix filtering combined with start/stop bounds.

    Seeds hash key 'hkey1' in table 'scan_test' with sort keys 'skey0'..'skey109'
    plus 'aa', 'b' and 'z'. It then runs ``get_scanner`` with an FT_MATCH_PREFIX
    sort-key filter under different start/stop keys and inclusivity flags, and
    compares the returned sort keys with the expected ones. Cases whose stop key
    is empty expect every matching key from the start key onward.

    Raises:
        RuntimeError: if the client cannot connect to the cluster.
        AssertionError: if any case returns an unexpected multiset of sort keys.
    """
    print("Start test_prefix_match_with_boundaries")

    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'scan_test')

    # Raise instead of stopping the reactor here: the caller owns the reactor,
    # and stopping it twice raises ReactorNotRunning.
    suc = yield c.init()
    if not suc:
        raise RuntimeError("Failed to connect to Pegasus server")

    hkey = 'hkey1'

    # Write test data: skey0 ~ skey109, plus 'aa', 'b' and 'z'.
    for i in range(110):
        yield c.set(hkey, f'skey{i}', str(i), 0)
    for skey in ('aa', 'b', 'z'):
        yield c.set(hkey, skey, skey, 0)

    test_cases = [
        {
            'pattern': b'',
            'start': b'aa',
            'stop': b'b',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['aa', 'b'],
        },
        {
            'pattern': b'skey9',
            'start': b'skey50',
            'stop': b'skey99',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99'],
        },
        {
            'pattern': b'skey9',
            'start': b'skey50',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': False,
            'batch_size': 10,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98'],
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': True,
            'stop_inclusive': False,
            'batch_size': 50,
            'expected': ['skey9'] + [f'skey{i}' for i in range(90, 99)],
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)],
        },
        {
            'pattern': b'skey',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)],
        },
        {
            'pattern': b'skey9',
            'start': b'skey100',
            'stop': b'skey105',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 10,
            'expected': [],
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey9',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9'],
        },
        {
            # Start sorts after stop: the range is empty.
            'pattern': b'skey9',
            'start': b'skey91',
            'stop': b'skey9',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [],
        },
        {
            'pattern': b'',
            'start': b'skey91',
            'stop': b'',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['z'] + [f'skey{i}' for i in range(91, 100)],
        },
        {
            'pattern': b'skey',
            'start': b'skey88',
            'stop': b'skey98',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9'] + [f'skey{i}' for i in range(88, 99)],
        },
        {
            'pattern': b'skey',
            'start': b'skey88',
            'stop': b'skey98',
            'start_inclusive': False,
            'stop_inclusive': False,
            'batch_size': 1,
            'expected': ['skey9'] + [f'skey{i}' for i in range(89, 98)],
        },
        {
            'pattern': b'skey1',
            'start': b'aa',
            'stop': b'',
            'start_inclusive': True,
            'stop_inclusive': False,
            'batch_size': 1,
            'expected': ['skey1', 'skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104',
                         'skey105', 'skey106', 'skey107', 'skey108', 'skey109', 'skey11', 'skey12', 'skey13',
                         'skey14', 'skey15', 'skey16', 'skey17', 'skey18', 'skey19'],
        },
    ]

    for idx, case in enumerate(test_cases, start=1):
        print(f"\nRunning test case {idx}...")

        o = ScanOptions()
        o.sortkey_filter_type = filter_type.FT_MATCH_PREFIX
        o.sortkey_filter_pattern = case['pattern']
        o.start_inclusive = case['start_inclusive']
        o.stop_inclusive = case['stop_inclusive']
        o.batch_size = case['batch_size']

        s = c.get_scanner(hkey, case['start'], case['stop'], o)

        # Let scan errors propagate so the test fails with the real cause,
        # and always close the scanner, even on error.
        results = []
        try:
            while True:
                ret = yield s.get_next()
                if not ret:
                    break
                results.append(ret)
        finally:
            s.close()

        # Each result unpacks as (key, value); key[1] is the sort key.
        # Compare sorted lists (multisets) so duplicated or missing entries are
        # caught, which a plain set comparison would miss.
        actual_keys = sorted(k[1] for k, _ in results)
        expected_keys = sorted(case['expected'])
        assert actual_keys == expected_keys, \
            f"Test case {idx} failed: \n Expected {expected_keys} \n got {actual_keys}"

        print(f"✅ Test case {idx} passed: {len(actual_keys)} keys matched")

    print("All test cases passed!")

if __name__ == "__main__":
    # Script-only dependencies, imported here so the driver is self-contained
    # (the original relied on `defer` leaking out of a star import).
    import sys
    import traceback

    from twisted.internet.error import ReactorNotRunning

    tests = [
        test_prefix_match_with_boundaries,
        test_prefix_match_with_boundaries_full_scan,
    ]

    # Names of tests that raised, and names of tests that ran to completion.
    # Both are used to compute the process exit status after the reactor stops.
    failed = []
    completed = []

    @inlineCallbacks
    def run_all():
        """Run every test one after another, recording failures instead of propagating them."""
        # Run the suites sequentially so each suite's "Running test case N" output
        # stays contiguous instead of interleaving with the other suite's output.
        for test in tests:
            try:
                yield test()
            except Exception:
                # Top-level boundary: report the failure and continue with the next suite.
                failed.append(test.__name__)
                traceback.print_exc()
            completed.append(test.__name__)

    def stop_reactor(_):
        # A test may already have stopped the reactor itself; a second stop()
        # raises ReactorNotRunning, which is harmless here.
        try:
            reactor.stop()
        except ReactorNotRunning:
            pass

    run_all().addBoth(stop_reactor)
    reactor.run()

    # Tests that never finished, e.g. because the reactor was stopped mid-run.
    unfinished = [t.__name__ for t in tests if t.__name__ not in completed]
    if failed:
        print(f"❌ Failed: {', '.join(failed)}")
    if unfinished:
        print(f"❌ Did not finish: {', '.join(unfinished)}")
    # Non-zero exit status so scripts/CI can detect failures.
    sys.exit(1 if failed or unfinished else 0)

WJSGDBZ avatar Oct 16 '25 03:10 WJSGDBZ