feat(Python-client): add scan filter support
What problem does this PR solve?
Add support for hashkey and sortkey scan filters
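For illustration, here is a minimal usage sketch of the new filter options, using the same field names exercised in the test script below; the scan call itself is only indicated in a comment, since it needs an initialized client:

```python
from pypegasus.pgclient import *          # provides ScanOptions, Pegasus, ...
from pypegasus.rrdb.ttypes import filter_type

o = ScanOptions()
o.batch_size = 100
# Sort-key filter: only return records whose sort key starts with b'skey9'.
o.sortkey_filter_type = filter_type.FT_MATCH_PREFIX
o.sortkey_filter_pattern = b'skey9'
# Hash-key filter: only return records whose hash key starts with b'hkey1'.
o.hashkey_filter_type = filter_type.FT_MATCH_PREFIX
o.hashkey_filter_pattern = b'hkey1'
# The options are then passed to a scan, e.g.
#     scanners = c.get_unordered_scanners(3, o)
# where c is an initialized Pegasus client (see the test script below).
```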
What is changed and how does it work?
- In the original logic, the `generate_next_bytes` function has two problems:
  a. The input `buff` (i.e. the hashkey) can be either `str` or `bytearray`. If it is a `str`, an in-place modification like `buff[pos] += 1` won't work, since Python strings are immutable.
  b. The `pos` variable was initialized to a fixed index (`len(buff) - 1`), which is counterintuitive and could lead to an infinite loop.
  https://github.com/apache/incubator-pegasus/blob/44400f6e3ca1fb1ce48d04d1c9145aad7ac4e991/python-client/pypegasus/pgclient.py#L613-L624
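For reference, a minimal sketch of how `generate_next_bytes` could be fixed under these two points, assuming its job is to return the smallest byte string strictly greater than every string with the given prefix (the actual change in this PR may differ in its details):

```python
def generate_next_bytes(buff):
    # Point (a): accept both str and bytearray; copy into a mutable bytearray
    # instead of mutating an immutable str in place.
    buff = bytearray(buff, 'utf-8') if isinstance(buff, str) else bytearray(buff)
    # Point (b): walk from the last byte towards the front with carry handling,
    # so the loop always terminates instead of spinning on a fixed index.
    pos = len(buff) - 1
    while pos >= 0:
        if buff[pos] < 0xFF:
            buff[pos] += 1
            return bytes(buff[:pos + 1])
        pos -= 1  # this byte is 0xFF: drop it and carry into the previous byte
    return None   # every byte was 0xFF: no upper bound of this form exists
```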
Checklist
Tests
- Manual test (add detailed scripts or steps below): the test script is as follows.
from twisted.internet.defer import inlineCallbacks
from twisted.internet import defer, reactor  # defer is needed for maybeDeferred/gatherResults below
from pypegasus.pgclient import *
from pypegasus.rrdb.ttypes import filter_type
@inlineCallbacks
def test_prefix_match_with_boundaries_full_scan():
    print("Start test_prefix_match_with_boundaries_full_scan")
    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'full_scan_test')
    try:
        suc = yield c.init()
        if not suc:
            print("Failed to connect to Pegasus server")
            reactor.stop()
            return
    except Exception as e:
        print(f"Init failed: {e}")
        reactor.stop()
        return

    # Write test data under two hash keys: skey0 ~ skey109 plus a few extra sort keys.
    hkey = 'hkey1'
    for i in range(110):
        skey = f'skey{i}'
        yield c.set(hkey, skey, str(i), 0)
    yield c.set(hkey, 'aa', 'aa', 0)
    yield c.set(hkey, 'b', 'b', 0)
    yield c.set(hkey, 'z', 'z', 0)

    hkey = 'hkey2'
    for i in range(110):
        skey = f'skey{i}'
        yield c.set(hkey, skey, str(i), 0)
    yield c.set(hkey, 'aa', 'aa', 0)
    yield c.set(hkey, 'b', 'b', 0)
    yield c.set(hkey, 'z', 'z', 0)

    test_cases = [
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': 'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'',
            'batch_size': 2,
            'expected': 2 * ['aa', 'b', 'z'] + 2 * [f'skey{i}' for i in range(0, 110)]
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': 'hkey2',
            'batch_size': 2,
            'expected': [f'skey{i}' for i in range(0, 110)]
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_PREFIX,
            'skey_pattern': b'skey10',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104', 'skey105', 'skey106', 'skey107', 'skey108', 'skey109']
        },
        # Postfix match tests
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey99', 'skey109']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'z',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['z']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_POSTFIX,
            'skey_pattern': b'',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['aa', 'b', 'z'] + [f'skey{i}' for i in range(0, 110)]
        },
        # Anywhere match tests
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'9',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey9', 'skey19', 'skey29', 'skey39', 'skey49', 'skey59', 'skey69', 'skey79', 'skey89', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99', 'skey109']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'key',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': [f'skey{i}' for i in range(110)]
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'99',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': ['skey99']
        },
        {
            'skey_filter_type': filter_type.FT_MATCH_ANYWHERE,
            'skey_pattern': b'xyz',
            'hkey_filter_type': filter_type.FT_MATCH_PREFIX,
            'hkey_pattern': b'hkey1',
            'batch_size': 2,
            'expected': []
        }
    ]

    for idx, case in enumerate(test_cases):
        print(f"\nRunning test case {idx + 1}...")
        o = ScanOptions()
        o.sortkey_filter_type = case['skey_filter_type']
        o.sortkey_filter_pattern = case['skey_pattern']
        o.batch_size = case['batch_size']
        o.hashkey_filter_type = case['hkey_filter_type']
        o.hashkey_filter_pattern = case['hkey_pattern']
        # Run a full scan over all partitions and collect every returned record.
        ss = c.get_unordered_scanners(3, o)
        results = []
        for s in ss:
            while True:
                try:
                    ret = yield s.get_next()
                    if not ret:
                        break
                    results.append(ret)
                except Exception as e:
                    print(f"Exception during scan: {e}")
                    break
            s.close()
        # Keep only the sort key from each ((hashkey, sortkey), value) result.
        actual_keys = [k[1] for k, _ in results]
        expected_keys = case['expected']
        assert len(actual_keys) == len(expected_keys) and set(actual_keys) == set(expected_keys), \
            f"Test case {idx + 1} failed: Expected {expected_keys} \n got {actual_keys}"
        print(f"✅ Test case {idx + 1} passed: {len(actual_keys)} keys matched")
@inlineCallbacks
def test_prefix_match_with_boundaries():
    print("Start test_prefix_match_with_boundaries")
    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'], 'scan_test')
    try:
        suc = yield c.init()
        if not suc:
            print("Failed to connect to Pegasus server")
            reactor.stop()
            return
    except Exception as e:
        print(f"Init failed: {e}")
        reactor.stop()
        return

    hkey = 'hkey1'
    # Write test data: skey0 ~ skey109 plus a few extra sort keys.
    for i in range(110):
        skey = f'skey{i}'
        yield c.set(hkey, skey, str(i), 0)
    yield c.set(hkey, 'aa', 'aa', 0)
    yield c.set(hkey, 'b', 'b', 0)
    yield c.set(hkey, 'z', 'z', 0)

    test_cases = [
        {
            'pattern': b'',
            'start': b'aa',
            'stop': b'b',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['aa', 'b']
        },
        {
            'pattern': b'skey9',
            'start': b'skey50',
            'stop': b'skey99',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98', 'skey99']
        },
        {
            'pattern': b'skey9',
            'start': b'skey50',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': False,
            'batch_size': 10,
            'expected': ['skey9', 'skey90', 'skey91', 'skey92', 'skey93', 'skey94', 'skey95', 'skey96', 'skey97', 'skey98']
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': True,
            'stop_inclusive': False,
            'batch_size': 50,
            'expected': ['skey9'] + [f'skey{i}' for i in range(90, 99)]
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)]
        },
        {
            'pattern': b'skey',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)]
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey99',
            'start_inclusive': False,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': [f'skey{i}' for i in range(90, 100)]
        },
        {
            'pattern': b'skey9',
            'start': b'skey100',
            'stop': b'skey105',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 10,
            'expected': []
        },
        {
            'pattern': b'skey9',
            'start': b'skey9',
            'stop': b'skey9',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9']
        },
        {
            'pattern': b'skey9',
            'start': b'skey91',
            'stop': b'skey9',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': []
        },
        {
            'pattern': b'',
            'start': b'skey91',
            'stop': b'',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['z'] + [f'skey{i}' for i in range(91, 100)]
        },
        {
            'pattern': b'skey',
            'start': b'skey88',
            'stop': b'skey98',
            'start_inclusive': True,
            'stop_inclusive': True,
            'batch_size': 1,
            'expected': ['skey9'] + [f'skey{i}' for i in range(88, 99)]
        },
        {
            'pattern': b'skey',
            'start': b'skey88',
            'stop': b'skey98',
            'start_inclusive': False,
            'stop_inclusive': False,
            'batch_size': 1,
            'expected': ['skey9'] + [f'skey{i}' for i in range(89, 98)]
        },
        {
            'pattern': b'skey1',
            'start': b'aa',
            'stop': b'',
            'start_inclusive': True,
            'stop_inclusive': False,
            'batch_size': 1,
            'expected': ['skey1', 'skey10', 'skey100', 'skey101', 'skey102', 'skey103', 'skey104',
                         'skey105', 'skey106', 'skey107', 'skey108', 'skey109', 'skey11', 'skey12', 'skey13',
                         'skey14', 'skey15', 'skey16', 'skey17', 'skey18', 'skey19']
        }
    ]

    for idx, case in enumerate(test_cases):
        print(f"\nRunning test case {idx + 1}...")
        o = ScanOptions()
        o.sortkey_filter_type = filter_type.FT_MATCH_PREFIX
        o.sortkey_filter_pattern = case['pattern']
        o.start_inclusive = case['start_inclusive']
        o.stop_inclusive = case['stop_inclusive']
        o.batch_size = case['batch_size']
        # Scan a single hashkey within the given sortkey boundaries.
        s = c.get_scanner(hkey, case['start'], case['stop'], o)
        results = []
        while True:
            try:
                ret = yield s.get_next()
                if not ret:
                    break
                results.append(ret)
            except Exception as e:
                print(f"Exception during scan: {e}")
                break
        s.close()
        # Keep only the sort key from each ((hashkey, sortkey), value) result.
        actual_keys = [k[1] for k, _ in results]
        expected_keys = case['expected']
        assert set(actual_keys) == set(expected_keys), \
            f"Test case {idx + 1} failed: \n Expected {expected_keys} \n got {actual_keys}"
        print(f"✅ Test case {idx + 1} passed: {len(actual_keys)} keys matched")
    print("All test cases passed!")
if __name__ == "__main__":
    tests = [
        test_prefix_match_with_boundaries,
        test_prefix_match_with_boundaries_full_scan
    ]
    deferreds = [defer.maybeDeferred(test) for test in tests]
    d = defer.gatherResults(deferreds, consumeErrors=False)
    d.addBoth(lambda _: reactor.stop())
    reactor.run()