etcd3-py

Don't fail on large requests

Open pjz opened this issue 6 years ago • 9 comments

  • etcd3-py version: 0.1.6
  • Python version: 3.7
  • Operating System: linux

Description

I did a large range request and got what looks like a message-size overflow error. I don't see an obvious way to do pagination or the like; is there one I'm missing, or should etcd3-py be wrapping that up somehow?

What I Did

result = await self.db.range(key_prefix, prefix=True, keys_only=True)
File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 32, in __modelize
await self.client._raise_for_status(self._resp)
File "/usr/local/lib/python3.7/dist-packages/etcd3/aio_client.py", line 233, in _raise_for_status
raise get_client_error(error, code, status, resp)
etcd3.errors.go_etcd_rpctypes_error.ErrUnknownError: <ErrUnknownError error:'grpc: received message larger than max (14757329 vs. 4194304)', code:8>
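The two numbers in the error are the size of the message gRPC received (14757329 bytes, about 14 MiB) and gRPC's default receive limit of 4194304 bytes (4 MiB), so this response was roughly 3.5 times too large. If a caller wants to react to this error, for example by retrying with a smaller limit, a sketch like the one below could pull the sizes out of the error text. The regex assumes the exact wording shown in the traceback, and parse_size_error and shrink_limit are hypothetical helpers, not part of etcd3-py.

```python
import re

# Assumes the wording seen in the traceback; other gRPC versions may differ.
_SIZE_RE = re.compile(r"larger than max \((\d+) vs\. (\d+)\)")


def parse_size_error(message):
    """Return (received_bytes, max_bytes) from a gRPC size error, or None."""
    m = _SIZE_RE.search(message)
    if m is None:
        return None
    return int(m.group(1)), int(m.group(2))


def shrink_limit(limit, received_bytes, max_bytes, headroom=0.5):
    """Scale a page size so a similar page should fit under max_bytes.

    Assumes entries are roughly equal in size; `headroom` keeps a safety margin.
    """
    return max(1, int(limit * max_bytes * headroom / received_bytes))
```

With the numbers above, a page of 1000 entries would be scaled down to 142.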

pjz, Nov 02 '19 03:11

Issue-Label Bot is automatically applying the label bug to this issue, with a confidence of 0.66. Please mark this comment with :thumbsup: or :thumbsdown: to give our bot feedback!

Links: app homepage, dashboard and code for this bot.

issue-label-bot[bot], Nov 02 '19 03:11

ref: https://github.com/tensorflow/serving/issues/1382#issuecomment-503375968

Since this lib uses grpc-gateway to make requests, it's probably not possible to set that gRPC option from the client.

But maybe you can paginate using the key and range_end parameters.

For example:

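range('/foo/', prefix=True)

# covers the same keys as the following, provided every key under /foo/
# continues with a lowercase letter a-z:

range('/foo/a', range_end='/foo/b')
range('/foo/b', range_end='/foo/c')
range('/foo/c', range_end='/foo/d')
...
range('/foo/y', range_end='/foo/z')
range('/foo/z', range_end='/foo/{')  # '{' is the character right after 'z'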
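Splitting on a-z only covers keys whose next character is a lowercase letter. A sketch that covers every key under a byte-string prefix instead splits on all 256 possible next bytes, plus one piece for the key equal to the prefix itself. prefix_range_end does the same job as the incr_last_byte helper used later in this thread; both function names here are hypothetical, written for this illustration.

```python
def prefix_range_end(prefix: bytes) -> bytes:
    """Smallest key greater than every key that starts with `prefix`."""
    # Trailing 0xff bytes cannot be incremented, so drop them first.
    trimmed = prefix.rstrip(b"\xff")
    if not trimmed:
        return b"\x00"  # etcd reads a range_end of '\0' as "no upper bound"
    return trimmed[:-1] + bytes([trimmed[-1] + 1])


def byte_subranges(prefix: bytes):
    """Split the prefix range into contiguous [key, range_end) pieces."""
    pieces = [(prefix, prefix + b"\x00")]  # only the key equal to `prefix`
    for b in range(256):
        start = prefix + bytes([b])
        end = prefix + bytes([b + 1]) if b < 255 else prefix_range_end(prefix)
        pieces.append((start, end))
    return pieces
```

A caller would issue one range(start, range_end=end) per piece. That is 257 requests, and a piece can still exceed the limit if most keys share the same next byte, which is why the limit-based paging suggested next in the thread is usually the better tool.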

Revolution1, Nov 02 '19 10:11

Apparently the suggested way to do paging is with limit and range_end. Something like:

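from etcd3.utils import incr_last_byte  # the helper etcd3-py uses for prefix=True


def range_paged(client, prefix, limit=1000):  # illustrative name
    """Collect every key-value pair under `prefix`, `limit` entries per request."""
    range_end = incr_last_byte(prefix)
    start = prefix
    result = []
    first = True
    while True:
        subrange = client.range(start, range_end=range_end, limit=limit)
        kvs = subrange.kvs or []  # an empty page may come back without kvs
        # the start key is inclusive, so every page after the first repeats
        # the previous page's last key; skip it
        result.extend(kvs if first else kvs[1:])
        if len(kvs) < limit:  # short page: range_end reached (needs limit > 1)
            return result
        start = kvs[-1].key
        first = False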

which grabs 1000 entries at a time until it reaches range_end. It should probably also use the same revision as the first result. The more I think about this, the more I think that range() should wrap all this up. I'll see if I can work up a PR.
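To exercise paging logic like the loop above without a running etcd server, a small in-memory stand-in for client.range can mimic the range semantics this thread relies on. FakeKV is a hypothetical test helper, not part of etcd3-py; it assumes keys are bytes and does not model revisions.

```python
from types import SimpleNamespace


class FakeKV:
    """In-memory stand-in for client.range(), for testing paging code offline.

    Mimics the range semantics used in this thread: `key` is inclusive,
    `range_end` is exclusive, a single zero byte as range_end means
    "no upper bound", no range_end means "just this key", results come back
    in ascending key order, and at most `limit` entries are returned
    (0 or None means no limit). Revisions are not modelled.
    """

    def __init__(self, keys):
        self.keys = sorted(keys)
        self.calls = 0  # number of range() requests made so far

    def range(self, key, range_end=None, limit=None, **_ignored):
        self.calls += 1
        if range_end is None:
            selected = [k for k in self.keys if k == key]
        elif range_end == b"\0":
            selected = [k for k in self.keys if k >= key]
        else:
            selected = [k for k in self.keys if key <= k < range_end]
        if limit:
            selected = selected[:limit]
        kvs = [SimpleNamespace(key=k, value=b"") for k in selected]
        # assumption: an empty page is reported with kvs unset, hence None
        return SimpleNamespace(kvs=kvs or None, header=SimpleNamespace(revision=1))
```

A FakeKV filled with a few thousand keys can be passed to a paging function in place of a real client, and `calls` then shows how many requests the loop made.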

pjz, Nov 05 '19 00:11

Yeah, maybe we can add a new stateful util, named something like "ranger util".

Thank you so much for the contribution.

Revolution1, Nov 05 '19 04:11

Here's what I ended up with. I can't quite figure out where to put it, so feel free to put it where you think it makes the most sense. I tried to keep as much of the normal range() functionality as I could:

from etcd3 import models
from etcd3.utils import incr_last_byte


async def big_range(
    kvapi,
    key=None,
    range_end=None,
    limit=1000,
    revision=None,
    keys_only=False,
    serializable=False,
    min_mod_revision=None,
    max_mod_revision=None,
    min_create_revision=None,
    max_create_revision=None,
    sort_order=models.RangeRequestSortOrder.NONE,
    sort_target=models.RangeRequestSortTarget.KEY,
    prefix=False,
    all=False
):
    """Like range(), but fetches the result `limit` keys per request.

    Paging resumes from the last key of each page, so this assumes results
    come back in ascending key order (the default sort_order/sort_target).
    """
    if limit < 2:
        # every page after the first repeats one key, so limit=1 never advances
        raise ValueError("limit must be at least 2")
    method = '/kv/range'
    if all:
        key = range_end = '\0'
    if prefix:
        range_end = incr_last_byte(key)
    data = {
        "key": key,
        "range_end": range_end,
        "limit": limit,
        "revision": revision,
        "sort_order": sort_order,
        "sort_target": sort_target,
        "serializable": serializable,
        "keys_only": keys_only,
        "count_only": None,
        "min_mod_revision": min_mod_revision,
        "max_mod_revision": max_mod_revision,
        "min_create_revision": min_create_revision,
        "max_create_revision": max_create_revision
    }
    data = {k: v for k, v in data.items() if v is not None}
    result = await kvapi.call_rpc(method, data=data)
    if not result.kvs:
        return result
    # read every later page at the first response's revision, so all pages
    # come from one consistent snapshot
    data.setdefault('revision', result.header.revision)
    subrange = result
    while len(subrange.kvs) == limit:
        # the start key is inclusive: resume at the last key already fetched...
        data['key'] = subrange.kvs[-1].key
        subrange = await kvapi.call_rpc(method, data=data)
        # ...and drop that repeated first entry
        result.kvs.extend(subrange.kvs[1:])
    return result

pjz, Nov 07 '19 14:11

I prefer adding a new stateful util.

Because you have to make multiple requests during the process, you need state to track which page you are on (hence "stateful").

Also, result.kvs.extend(subrange.kvs[1:]) is not recommended. Needing a "big range" means the response is very big, possibly too big to hold in memory all at once.

I suggest this:

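class Ranger:
    """Iterate over keys in [start, end) one page at a time.

    Only the current page (`buf`) is held in memory. Assumes `start`, `end`
    and the returned keys are bytes, and that results come back in
    ascending key order.
    """

    def __init__(self, client, start, end, limit=1000):
        self.client = client
        self.start = start
        self.current = start  # next key to request (inclusive)
        self.buf = None       # the most recently fetched page
        self.end = end        # exclusive upper bound
        self.limit = limit
        self.finished = False

    def range(self):
        """Return an iterator over the key-value pairs, fetching lazily."""
        while self.current < self.end:
            self.buf = self.client.range(self.current, range_end=self.end, limit=self.limit)
            if not self.buf.kvs:
                break
            for pair in self.buf.kvs:
                self.current = pair.key  # remember the last key handed out
                yield pair
            # key + b'\0' is the smallest key after key, so the next request
            # starts just past the last pair already yielded
            self.current += b'\0'
        self.finished = True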

Revolution1, Nov 07 '19 15:11

I was planning to write some higher-level APIs too, like get, get_prefix, delete_prefix, and get_keys.

If you like, you can just stick to writing functions and put them in a file like /etcd3/apis/kv_extended.py.

Don't forget to test them on both py2 and py3.
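A minimal sketch of two of those helpers, assuming a synchronous etcd3-py client and using only the range() arguments already seen in this thread (prefix, keys_only). The function bodies are hypothetical, not existing etcd3-py APIs. Each makes a single request, so a large enough prefix still hits the 4 MiB limit; a paging version would build on a ranger instead. The code avoids py3-only syntax, so it should run on both py2 and py3.

```python
# Hypothetical contents for a module like /etcd3/apis/kv_extended.py.


def get_prefix(client, prefix):
    """Return the key-value pairs under `prefix` (one request, no paging)."""
    resp = client.range(prefix, prefix=True)
    return list(resp.kvs or [])  # kvs may be missing when nothing matches


def get_keys(client, prefix):
    """Return only the keys under `prefix` (one request, no paging)."""
    resp = client.range(prefix, prefix=True, keys_only=True)
    return [kv.key for kv in (resp.kvs or [])]
```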

Revolution1, Nov 07 '19 15:11

One issue you're going to run into is writing them to work with both sync and async methods.

pjz, Nov 07 '19 15:11

Yeah, one function can't be both sync and async when it makes multiple requests before returning, so I'd prefer to write two stateful utils (one sync, one async).

You can make both accessible under the same name: Client.Ranger and AioClient.Ranger.
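A sketch of the async half of this idea, assuming AioClient.range is awaitable (as in the original report's `await self.db.range(...)`) and that keys are bytes. AioRanger is a hypothetical name. It keeps the same paging loop as a sync ranger but awaits each request, so it becomes an async generator; async generators need Python 3.6+, so this one is py3-only. It also stops after a short page instead of issuing one more request that would come back empty.

```python
class AioRanger:
    """Async ranger: pages through [start, end), awaiting each request."""

    def __init__(self, client, start, end, limit=1000):
        self.client = client
        self.start = start
        self.current = start  # next key to request (inclusive)
        self.end = end        # exclusive upper bound
        self.limit = limit
        self.finished = False

    async def range(self):
        while self.current < self.end:
            page = await self.client.range(
                self.current, range_end=self.end, limit=self.limit)
            if not page.kvs:
                break
            for pair in page.kvs:
                yield pair
            # resume just after the last key seen (key + b"\x00" is the next key)
            self.current = page.kvs[-1].key + b"\x00"
            if len(page.kvs) < self.limit:
                break  # short page: nothing left before `end`
        self.finished = True
```

Usage would look like `async for kv in AioRanger(client, b"/foo/", b"/foo0").range(): ...`, where b"/foo0" is the range end for the prefix b"/foo/". Keeping the same attributes on both classes lets Client.Ranger and AioClient.Ranger expose one interface.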

Revolution1, Nov 07 '19 16:11