Add Async Client to be able to make asynchronous requests

Open StefanBogdan opened this issue 3 years ago • 26 comments

StefanBogdan avatar Dec 15 '21 09:12 StefanBogdan

We'll need to do this very soon for our use case and are happy to contribute something back.

A lot of projects have found that maintaining both sync and async versions of a client or library in Python is tricky (Elasticsearch, httpx, Mongo/Motor, to name a few). Any thoughts on the preferred implementation?

From our point of view, the easiest thing to do is "fake" async by just running requests in a separate threadpool and awaiting them from the event loop, which is what Motor/Mongo does. This results in minimal code duplication and maintenance overhead with a small performance penalty. The major downside from our point of view is that without better support for variadic generics it's impossible to infer types for MyPy users via existing types in the sync version.
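
As a rough illustration of the thread-pool approach, here is a minimal sketch that uses only the standard library. `blocking_get`, `ThreadedAsyncClient` and `fetch_many` are hypothetical names invented for this example; they are not part of the Weaviate client, and `time.sleep` stands in for the real network call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def blocking_get(object_id: str, delay: float = 0.05) -> dict:
    """Hypothetical stand-in for a synchronous client call."""
    time.sleep(delay)  # simulates waiting on the network
    return {"id": object_id}


class ThreadedAsyncClient:
    """Exposes the blocking call as a coroutine by running it in a thread pool."""

    def __init__(self, max_workers: int = 8) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    async def get(self, object_id: str) -> dict:
        loop = asyncio.get_running_loop()
        # The blocking call runs in a worker thread; the event loop stays free.
        return await loop.run_in_executor(self._pool, partial(blocking_get, object_id))

    def close(self) -> None:
        self._pool.shutdown(wait=True)


async def fetch_many(ids: list[str]) -> list[dict]:
    """Issue several wrapped calls concurrently from one event loop."""
    client = ThreadedAsyncClient()
    try:
        return await asyncio.gather(*(client.get(i) for i in ids))
    finally:
        client.close()
```

The sync code stays the single source of truth, which is where the low maintenance cost comes from. The price is one worker thread per in-flight request, plus the typing limitation mentioned above.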

Elasticsearch-py uses code generation (unasync), but that can add a lot of flakiness and complexity and would also require a fairly large refactor.

The other solution is to accept some level of duplication while trying to minimize the number of places in the code & API where async vs sync actually makes a difference. This would require a larger refactor and possibly breaking API changes, but it is, for example, more or less the solution httpx ended up with. This works better when anticipating a major release or designing something from scratch.

Just thought I would get your thoughts on any preferences here before starting something!

reppertj avatar Feb 25 '22 17:02 reppertj

cc: @michaverhagen and @StefanBogdan

bobvanluijt avatar Mar 08 '22 22:03 bobvanluijt

Hi @reppertj, we are going to use the aiohttp library for the AsyncClient because its request methods are very similar to those of requests. The way I want to structure it is to have an abstract class for each class that makes HTTP requests to Weaviate, then implement sync and async versions of it. All the pre-processing is done either in the abstract class or in a function that is used by both the sync and async classes, so the sync/async classes will only have to handle the errors.
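
A minimal sketch of what that structure could look like, under stated assumptions: the class names, the `_prepare_create` helper and the injected `send` callables are all hypothetical and only illustrate the shape of the design, not the actual implementation. The transport is passed in so the example runs without a server.

```python
import asyncio
import json
from abc import ABC
from typing import Any, Awaitable, Callable


class BaseObjectRequests(ABC):
    """Shared, transport-agnostic logic: validation and request building."""

    def _prepare_create(self, class_name: str, properties: dict[str, Any]) -> tuple[str, str]:
        if not class_name:
            raise ValueError("class_name must be a non-empty string")
        body = json.dumps({"class": class_name, "properties": properties})
        return "/v1/objects", body  # path assumed for illustration


class SyncObjectRequests(BaseObjectRequests):
    def __init__(self, send: Callable[[str, str], dict]) -> None:
        self._send = send  # blocking transport, e.g. built on requests

    def create(self, class_name: str, properties: dict[str, Any]) -> dict:
        path, body = self._prepare_create(class_name, properties)
        return self._send(path, body)  # error handling would go here


class AsyncObjectRequests(BaseObjectRequests):
    def __init__(self, send: Callable[[str, str], Awaitable[dict]]) -> None:
        self._send = send  # awaitable transport, e.g. built on aiohttp

    async def create(self, class_name: str, properties: dict[str, Any]) -> dict:
        path, body = self._prepare_create(class_name, properties)
        return await self._send(path, body)  # error handling would go here
```

Only the last line of each `create` differs between the two variants, which is the point of pushing the pre-processing into the base class.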

Version 4.0.0 is currently about 80% done, but not tested.

StefanBogdan avatar Mar 09 '22 13:03 StefanBogdan

@StefanBogdan @bobvanluijt any news/plans for this?

creatorrr avatar Mar 05 '23 09:03 creatorrr

Cc @dirkkul

bobvanluijt avatar Mar 05 '23 15:03 bobvanluijt

No concrete plans, sorry. This is something I'd like to explore at some point, but we need to see when we'll find the time.

dirkkul avatar Mar 06 '23 09:03 dirkkul

Gotcha. It can be a big blocker for a lot of people (including us) because the code currently blocks during the request and when the cluster is struggling with too many requests it could lock up the handler processes. Please see if you can prioritize this at some point. :)
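
To make the blocking problem concrete, here is a small self-contained sketch (standard library only, all names hypothetical). A background "heartbeat" task stands in for other requests the server should be handling; `time.sleep` stands in for a synchronous client call made inside an async handler, and `asyncio.sleep` for an awaited one.

```python
import asyncio
import time


async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Record a timestamp every `interval` seconds while the loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_handler(delay: float) -> None:
    time.sleep(delay)  # stands in for a synchronous HTTP request


async def cooperative_handler(delay: float) -> None:
    await asyncio.sleep(delay)  # stands in for an awaited HTTP request


async def ticks_during(handler, delay: float = 0.2) -> int:
    """Count heartbeat ticks that happen while `handler` is running."""
    ticks: list = []
    task = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    before = len(ticks)
    await handler(delay)
    during = len(ticks) - before
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return during
```

`asyncio.run(ticks_during(blocking_handler))` returns 0 because nothing else can run while the handler holds the event loop, whereas the cooperative variant lets the heartbeat keep ticking. A slow cluster makes each blocking call longer, so the stall grows with the load.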

creatorrr avatar Mar 07 '23 03:03 creatorrr

Is there no other way around this?

kubre avatar Mar 15 '23 12:03 kubre

Any news on this? A lot of requests to my service are being slowed down because of this :/

netapy avatar May 02 '23 13:05 netapy

@netapy Nope, I decided to simply write this part using the aiohttp and aiographql clients. It's not as clean as it might have been with the library itself, but wrapping all the connection and querying code inside a single class would help you a lot.

kubre avatar May 02 '23 15:05 kubre

@kubre I'm sorry, could you show me some code? I'm just trying out a basic example, but it doesn't seem to handle the queries concurrently.

@app.get("/search/{query_string}")
async def read_search(query_string: str):

    url = 'http://api.example.tech:8080/v1/graphql'
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer MYTOKEN',
    }

    query = '''
        query {
            Get {
                Article(
                    hybrid: {
                        query: "xxxxxx"
                        alpha: 0.7
                    }) {
                    titre
                    pathTitle
                    texte
                    code
                }
            }
        }
    '''.replace("xxxxxx", query_string)

    await asyncio.sleep(3)

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=json.dumps({'query': query})) as response:
            return await response.json()
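
One fragile spot in the snippet above is the `.replace("xxxxxx", query_string)` substitution: a search string that contains a double quote or a backslash would produce an invalid query. Below is a minimal sketch of one way around that, assuming that JSON string escaping is acceptable inside a GraphQL string literal. `QUERY_TEMPLATE`, `build_hybrid_query` and `build_payload` are illustrative names, and the field list is shortened.

```python
import json

# %s is replaced by an already quoted and escaped string literal.
QUERY_TEMPLATE = """
{
  Get {
    Article(hybrid: {query: %s, alpha: 0.7}) {
      titre
      texte
    }
  }
}
"""


def build_hybrid_query(user_text: str) -> str:
    # json.dumps adds the surrounding quotes and escapes ", \\ and control characters.
    return QUERY_TEMPLATE % json.dumps(user_text)


def build_payload(user_text: str) -> str:
    """Return the JSON request body to pass as `data=` to session.post()."""
    return json.dumps({"query": build_hybrid_query(user_text)})
```

The result of `build_payload(query_string)` could then replace `json.dumps({'query': query})` in the handler.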

netapy avatar May 03 '23 09:05 netapy

@netapy Do you mean your entire thread is blocked until it finishes this request? I can't see any issues with the above code. Just calling the API with aiohttp should stop it from blocking the main thread. Even while the code below is executing, any other request coming into the server is processed without issues, including DB queries such as reading objects while others are being inserted.

  async with self.session.post(
      f"{self.url}/v1/batch/objects?consistency_level=ALL",
      json={"objects": batch},
  ) as response:
      await response.json()

kubre avatar May 03 '23 10:05 kubre

Hi people, is there any update on this?

barbu110 avatar Sep 10 '23 20:09 barbu110

@barbu110 I don't think they have integrated async into the client yet. However, I managed to make it work using a classic HTTP call with aiohttp.

async def search(query_string: str):   

    query = '''
        {
            Get{
                Article(
                    limit: 20
                    hybrid: {
                        query: "xxxxxx"
                        alpha: 0.75
                        fusionType: relativeScoreFusion
                    }){
                        texte
                        title
                        _additional {
                            score
                        }
                    }
            }
        }
    '''.replace("xxxxxx", query_string)

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, data=json.dumps({'query': query})) as response:
            return await response.json()

This doesn't block the main thread and works flawlessly. I don't use the client in production anymore, I find it useful only for data batch upload.

I hope this helps.

netapy avatar Sep 11 '23 05:09 netapy

I'm successfully using aiohttp as well, and we saw throughput improve from around 40 rps to 130 rps. Of course, our service does more than just query Weaviate, but it really helps.

Some details to note:

  • create aiohttp.ClientSession only once for all future weaviate queries
  • you can re-use the weaviate python client as a query builder
# example
query = weaviate_client.query.get(class_name=class_name, properties=properties).with_limit(1).build()
async with aiohttp_client_session.post(url, headers=headers, data=json.dumps({"query": request})) as response:
    result = await response.json()
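
The first note above (one shared session for all queries) can be sketched as a small wrapper class. This is an illustration under assumptions, not the client's API: `WeaviateGraphQL` is a hypothetical name, the `session` argument is expected to behave like an `aiohttp.ClientSession`, and the URL in `main` is a placeholder.

```python
import asyncio
import json
from typing import Any, Optional


class WeaviateGraphQL:
    """Thin async wrapper that reuses one HTTP session (hypothetical helper)."""

    def __init__(self, session: Any, base_url: str, token: Optional[str] = None) -> None:
        self._session = session  # expected to behave like aiohttp.ClientSession
        self._url = f"{base_url.rstrip('/')}/v1/graphql"
        self._headers = {"Content-Type": "application/json"}
        if token:
            self._headers["Authorization"] = f"Bearer {token}"

    async def query(self, graphql: str) -> Any:
        payload = json.dumps({"query": graphql})
        async with self._session.post(self._url, headers=self._headers, data=payload) as resp:
            resp.raise_for_status()
            return await resp.json()


async def main() -> None:
    import aiohttp  # third-party; imported here so the class has no hard dependency

    # The session is created once and shared by every query.
    async with aiohttp.ClientSession() as session:
        client = WeaviateGraphQL(session, "http://localhost:8080")
        results = await asyncio.gather(
            client.query("{ Get { Article(limit: 1) { title } } }"),
            client.query("{ Get { Article(limit: 2) { title } } }"),
        )
        print(results)
```

Running `asyncio.run(main())` requires aiohttp and a reachable Weaviate instance. The string returned by the query builder's `build()` call is what would be passed to `query()`.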

andersfylling avatar Sep 18 '23 07:09 andersfylling

Any reason you don't commit to a pure async implementation, and then have a lightweight sync wrapper around it?
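
To show what I mean, a minimal sketch using only the standard library. `AsyncClient`, `SyncClient` and `get_object` are made-up names, and `asyncio.sleep` stands in for the real awaited HTTP call. The sync facade owns a private event loop on a background thread, so it also works when the caller is not inside an event loop of its own.

```python
import asyncio
import threading
from typing import Any


class AsyncClient:
    """Stand-in for a natively async client (hypothetical, no real I/O)."""

    async def get_object(self, uuid: str) -> dict[str, Any]:
        await asyncio.sleep(0.01)  # placeholder for an awaited HTTP call
        return {"id": uuid}


class SyncClient:
    """Blocking facade that drives AsyncClient on a private event loop."""

    def __init__(self) -> None:
        self._async = AsyncClient()
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def _run(self, coro: Any) -> Any:
        # Submit the coroutine to the background loop and block for its result.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def get_object(self, uuid: str) -> dict[str, Any]:
        return self._run(self._async.get_object(uuid))

    def close(self) -> None:
        # Stop the background loop, wait for its thread, then release the loop.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()
```

Each sync method is then a one-line delegation, so the request logic exists only once, in the async class.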

andersfylling avatar Sep 18 '23 09:09 andersfylling

@dirkkul Would you accept design proposals/PRs? We have a patched private fork of the Weaviate Python client with async capability. This is quite dangerous and hacky so I would rather just work on upstreaming it :)

Also, for what it's worth I think we can start with just an async client for reads. This is the use case that most people seem to want when they talk about wanting async support

plv avatar Sep 26 '23 23:09 plv

@andersfylling what is the url value here?

# example
query = weaviate_client.query.get(class_name=class_name, properties=properties).with_limit(1).build()
async with aiohttp_client_session.post(url, headers=headers, data=json.dumps({"query": request})) as response:
    result = await response.json()

kranthi419 avatar Oct 31 '23 19:10 kranthi419

@kranthi419 the url looks like 'http://api.example.tech:8080/v1/graphql'.

mohit-sarvam avatar Nov 01 '23 17:11 mohit-sarvam

Would love an update on this!

TweedBeetle avatar Nov 13 '23 22:11 TweedBeetle

@andersfylling so how do you use the query built with weaviate_client.query? Later on you use json.dumps({"query": request}), not the query variable.

rubywwwilde avatar Dec 03 '23 08:12 rubywwwilde

we are currently working on bringing v4 out of beta as fast as possible - async is the next item right afterwards

dirkkul avatar Dec 13 '23 07:12 dirkkul

@dirkkul Loving v4! Any news on async functionality? :D 👉👈

TweedBeetle avatar Feb 26 '24 09:02 TweedBeetle

It is on the roadmap for this quarter

dirkkul avatar Feb 27 '24 09:02 dirkkul

Any news on this ? :)

netapy avatar Apr 17 '24 21:04 netapy

There is a draft PR but it is not ready yet - we have some other important things to work on, but it is on its way

dirkkul avatar Apr 17 '24 21:04 dirkkul

Hi all, we have released v4.7.0b0 here as the first beta release of the new async implementation!

We intend to run this community beta test for a few weeks to get feedback and uncover any bugs we may have missed before we fully release it in v4.7.0. If you're happy to get involved, we'd love to hear your feedback! Cheers 😁

tsmith023 avatar Jun 10 '24 17:06 tsmith023

@TweedBeetle @netapy

It took longer than we thought, but we would be very happy to get your feedback and if everything is working for you, thx! :)

dirkkul avatar Jun 11 '24 07:06 dirkkul

Closed by https://github.com/weaviate/weaviate-python-client/pull/1007

tsmith023 avatar Jul 23 '24 20:07 tsmith023