Don't fail on large requests
- etcd3-py version: 0.1.6
- Python version: 3.7
- Operating System: linux
Description
I did a large range request and got what looks like an overflow error. I don't see an obvious way to do pagination or anything similar; is there one I'm missing, or is/should etcd3-py be wrapping that up somehow?
What I Did
result = await self.db.range(key_prefix, prefix=True, keys_only=True)
  File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 32, in __modelize
    await self.client._raise_for_status(self._resp)
  File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 233, in _raise_for_status
    raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrUnknownError: <ErrUnknownError error:'grpc: received message larger than max (14757329 vs. 4194304)', code:8>
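The two figures in the error are byte counts: 4194304 is exactly 4 MiB, gRPC's default maximum receive-message size, and the range response was 14757329 bytes. A quick check of the minimum number of pages a paginated read would need, assuming (our assumption) pages of roughly equal size:

```python
import math

MAX_RECV = 4 * 1024 * 1024  # gRPC default max receive message size (4 MiB)
RESPONSE = 14757329         # response size reported in the error, in bytes

ratio = RESPONSE / MAX_RECV
# A lower bound only: real pages also carry headers and framing
min_pages = math.ceil(ratio)
print(round(ratio, 2), min_pages)  # 3.52 4
```

In practice `limit` would need plenty of headroom, since key and value sizes are rarely uniform.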
ref: https://github.com/tensorflow/serving/issues/1382#issuecomment-503375968
Since this library sends its requests through grpc-gateway, it probably can't set that gRPC option (the max message size) itself.
But you may be able to paginate by using the key and range_end parameters.
For example:
range('/foo/', prefix=True)
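# covers the same keys as the following, provided every key under /foo/ starts with a lowercase letter:
range('/foo/a', range_end='/foo/b')
range('/foo/b', range_end='/foo/c')
range('/foo/c', range_end='/foo/d')
range('/foo/d', range_end='/foo/e')
...
range('/foo/y', range_end='/foo/z')
range('/foo/z', range_end='/foo/{')  # '{' is the character right after 'z'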
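The split above can be generated rather than typed out. A minimal sketch; `letter_ranges` is a hypothetical helper, and like the hand-written list it only covers keys whose first character after the prefix is a lowercase letter (a key like /foo/A or /foo/1 would be missed). Each pair would be passed as `range(key, range_end=range_end)`, and a single slice can still exceed the 4 MiB limit if one letter holds many keys:

```python
import string


def letter_ranges(prefix):
    """Hypothetical helper: one (key, range_end) pair per lowercase first letter."""
    pairs = []
    for letter in string.ascii_lowercase:
        # The range end is the next character, so 'z' is paired with '{'
        pairs.append((prefix + letter, prefix + chr(ord(letter) + 1)))
    return pairs


for key, range_end in letter_ranges('/foo/')[:3]:
    print(key, range_end)
```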
Apparently the suggested way to do paging is with limit and range_end. Something like:
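def range_prefix(client, prefix, limit=1000):  # limit must be >= 2
    range_end = incr_last_byte(prefix)
    prefix_start = prefix
    result = []
    while True:
        subrange = client.range(prefix_start, range_end=range_end, limit=limit)
        kvs = subrange.kvs or []
        if len(kvs) < limit:
            # Last page: keep everything, including its final key
            result.extend(kvs)
            return result
        # Full page: hold back the last key, since the next page starts with it
        result.extend(kvs[:-1])
        prefix_start = kvs[-1].key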
This grabs 1000 entries at a time until it reaches range_end. It should probably also pin every request to the revision of the first response. The more I think about this, the more I think that range() should wrap all of this up.
I'll see if I can work up a PR.
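A minimal sketch of the limit/range_end loop that also pins every page after the first to the first page's revision, so the pages form one consistent snapshot. It resumes each page at the last key plus b'\0', the smallest key that sorts after it, so no key is read twice and even a page size of 1 makes progress. `FakeClient` is a hypothetical in-memory stand-in whose `range(key, range_end, limit, revision)` mirrors the parameters discussed above; it ignores `revision` except to record it, and keys are assumed to be bytes:

```python
from types import SimpleNamespace


class FakeClient:
    """Hypothetical in-memory stand-in for an etcd client's range() call."""

    def __init__(self, keys, revision=7):
        self.keys = sorted(keys)
        self.revision = revision
        self.calls = []  # the revision each request asked for

    def range(self, key, range_end, limit, revision=None):
        self.calls.append(revision)
        hits = [k for k in self.keys if key <= k < range_end][:limit]
        return SimpleNamespace(kvs=[SimpleNamespace(key=k) for k in hits],
                               header=SimpleNamespace(revision=self.revision))


def range_paged(client, start, range_end, limit=1000):
    result = []
    revision = None  # the first request reads the latest revision
    while True:
        page = client.range(start, range_end=range_end, limit=limit, revision=revision)
        result.extend(page.kvs)
        if len(page.kvs) < limit:
            return result
        if revision is None:
            # Pin the remaining pages to the first page's revision
            revision = page.header.revision
        # b'\0' is the smallest suffix, so this is the first possible key after the last one
        start = page.kvs[-1].key + b'\0'


client = FakeClient([b'/foo/%d' % i for i in range(10)])
keys = [kv.key for kv in range_paged(client, b'/foo/', b'/foo0', limit=3)]
print(len(keys), client.calls)  # 10 [None, 7, 7, 7]
```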
Yeah, maybe we can add a new stateful util, named something like "ranger util".
Thank you so much for contributing.
Here's what I ended up with. I can't quite figure out where to put it, so feel free to put it where you think it makes the most sense. I tried to keep as much of the normal range() functionality as I could:
from etcd3 import models
from etcd3.utils import incr_last_byte


async def big_range(
        kvapi,
        key=None,
        range_end=None,
        limit=1000,
        revision=None,
        keys_only=False,
        serializable=False,
        min_mod_revision=None,
        max_mod_revision=None,
        min_create_revision=None,
        max_create_revision=None,
        sort_order=models.RangeRequestSortOrder.NONE,
        sort_target=models.RangeRequestSortTarget.KEY,
        prefix=False,
        all=False
):
    # Paging resumes from the last key seen, so this assumes results come back
    # in ascending key order (the default sort_order and sort_target).
    if limit < 2:
        # With limit=1 every page would return only the key it started from
        raise ValueError('limit must be at least 2')
    method = '/kv/range'
    if all:
        key = range_end = '\0'
    if prefix:
        range_end = incr_last_byte(key)
    data = {
        "key": key,
        "range_end": range_end,
        "limit": limit,
        "revision": revision,
        "sort_order": sort_order,
        "sort_target": sort_target,
        "serializable": serializable,
        "keys_only": keys_only,
        "count_only": None,
        "min_mod_revision": min_mod_revision,
        "max_mod_revision": max_mod_revision,
        "min_create_revision": min_create_revision,
        "max_create_revision": max_create_revision
    }
    data = {k: v for k, v in data.items() if v is not None}
    result = await kvapi.call_rpc(method, data=data)
    if not result.kvs:
        return result
    # Pin every later page to the first page's revision so all pages come from one snapshot
    data.setdefault('revision', result.header.revision)
    subrange = result
    while len(subrange.kvs) == limit:
        # The next page starts at the last key already returned, so skip its first entry
        data['key'] = subrange.kvs[-1].key
        subrange = await kvapi.call_rpc(method, data=data)
        result.kvs.extend(subrange.kvs[1:])
    return result
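big_range turns prefix=True into a range_end with incr_last_byte. The rule is worth spelling out because a trailing 0xff byte can't simply be incremented. A standalone sketch of the rule etcd's Go client uses for prefix ranges; `prefix_range_end` is a hypothetical name, and whether etcd3-py's incr_last_byte handles the all-0xff case the same way is an assumption worth checking:

```python
def prefix_range_end(prefix: bytes) -> bytes:
    """Smallest key greater than every key that starts with prefix."""
    end = bytearray(prefix)
    for i in reversed(range(len(end))):
        if end[i] < 0xFF:
            end[i] += 1
            return bytes(end[:i + 1])  # drop any trailing 0xff bytes
    # Every byte is 0xff: no finite upper bound, so use b'\0' (etcd's "to the end")
    return b'\0'


print(prefix_range_end(b'/foo/'))  # b'/foo0'
```

The b'\0' fallback is the same convention big_range relies on for all=True, where range_end '\0' means "every key from key onward".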
I'd prefer adding a new stateful util,
because you have to make multiple requests during the process, which means you must keep state to know which page you are on (hence "stateful").
And I'd strongly advise against result.kvs.extend(subrange.kvs[1:]):
needing a "big range" at all means the response is very large, possibly too large to hold in memory at once.
I suggest this:
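class Ranger(object):
    def __init__(self, client, start, end, limit=1000):
        self.client = client
        self.start = start
        self.current = start  # next key to request
        self.end = end
        self.limit = limit
        self.finished = False

    def range(self):
        """Yield pairs page by page, so only one page is held in memory."""
        while not self.finished:
            buf = self.client.range(self.current, range_end=self.end, limit=self.limit)
            kvs = buf.kvs or []
            for pair in kvs:
                # Resume right after this key (assumes keys are bytes)
                self.current = pair.key + b'\0'
                yield pair
            if len(kvs) < self.limit:
                self.finished = True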
I was planning to write some higher level apis too, like get, get_prefix, delete_prefix, get_keys.
If you like, you can just write them as functions and put them in a file like /etcd3/apis/kv_extended.py.
Don't forget to test them on both py2 and py3.
One issue you're going to run into is writing them to work with both sync and async methods.
Yeah, a single function can't be both sync and async when it has to make several requests before returning, so I'd prefer to write two stateful utils (one sync, one async),
and make them accessible under the same name:
Client.Ranger and AioClient.Ranger
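One way the two stateful utils could share everything except the I/O: a base class holds the paging state and turns each page into pairs, while the sync class fetches pages with a plain call and the async one awaits them. A minimal Python 3 sketch; `_RangerBase`, `AioRanger` and the fake clients are hypothetical, and each fake `range(key, range_end, limit)` stands in for the real Client/AioClient range() call. Keys are assumed to be bytes:

```python
import asyncio
from types import SimpleNamespace


class _RangerBase(object):
    """Paging state shared by the sync and async rangers."""

    def __init__(self, client, start, end, limit=1000):
        self.client = client
        self.current = start  # key the next request starts from
        self.end = end
        self.limit = limit
        self.finished = False

    def _consume(self, page):
        """Yield a page's pairs, advancing current; a short page means we're done."""
        kvs = page.kvs or []
        for pair in kvs:
            self.current = pair.key + b'\0'  # next request starts right after this key
            yield pair
        if len(kvs) < self.limit:
            self.finished = True


class Ranger(_RangerBase):
    def __iter__(self):
        while not self.finished:
            page = self.client.range(self.current, range_end=self.end, limit=self.limit)
            yield from self._consume(page)


class AioRanger(_RangerBase):
    def __aiter__(self):
        return self._pairs()

    async def _pairs(self):
        while not self.finished:
            page = await self.client.range(self.current, range_end=self.end, limit=self.limit)
            for pair in self._consume(page):
                yield pair


# Hypothetical fake clients over a fixed, sorted key list
KEYS = [b'/foo/%d' % i for i in range(5)]


def _page(key, range_end, limit):
    hits = [k for k in KEYS if key <= k < range_end][:limit]
    return SimpleNamespace(kvs=[SimpleNamespace(key=k) for k in hits])


class FakeClient(object):
    def range(self, key, range_end, limit):
        return _page(key, range_end, limit)


class FakeAioClient(object):
    async def range(self, key, range_end, limit):
        return _page(key, range_end, limit)


async def collect():
    return [kv.key async for kv in AioRanger(FakeAioClient(), b'/foo/', b'/foo0', limit=2)]


sync_keys = [kv.key for kv in Ranger(FakeClient(), b'/foo/', b'/foo0', limit=2)]
async_keys = asyncio.run(collect())
print(sync_keys == async_keys == KEYS)  # True
```

Because `current` is updated before each pair is yielded, a caller that stops iterating early can later resume from the same Ranger object without re-reading or skipping keys.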