handling grpc._channel._MultiThreadedRendezvous

Open brunoonovais opened this issue 3 years ago • 6 comments

Hi Folks,

On a subscribe_stream call, we may receive the grpc._channel._MultiThreadedRendezvous exception. For some reason, this exception is not caught by the script that uses pygnmi, but it can be caught inside the enqueue_updates call, for example:

            try:
                for update in subscription:
                    self._updates.put(update)
            except grpc.RpcError as e:
                print(f"{e = }")

I couldn't figure out why, so I'm opening this issue to see whether we can fix this and close the channel when such an error is seen; otherwise the error can't be handled. Here is an example of my code that never reaches the except block:

    try:
        with gNMIclient(target=host, username=username, password=password, insecure=True, debug=True) as gc:
            telemetry_stream = gc.subscribe_stream(subscribe=subscriptionlist)
            for telemetry_entry in telemetry_stream:
                log.info(f"telemetry_entry={telemetry_entry}")
    except grpc.RpcError as rpc_error:
        print(f'RPC failed with code {rpc_error.code()}: {rpc_error}')

A simple way to hit this is by subscribing to a path that doesn't exist.

brunoonovais avatar Nov 10 '22 16:11 brunoonovais

It seems the issue is that subscribe_stream runs in its own thread. I wonder whether using the concurrent.futures module with a future would let us handle this better, by capturing the thread's result.
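
To illustrate the idea, here is a minimal sketch of how a future carries a worker thread's exception back to the caller. It does not use pygnmi or gRPC: `FakeRpcError`, `failing_stream`, and `consume` are hypothetical stand-ins for `grpc.RpcError`, the gRPC response iterator, and the enqueueing function.

```python
import concurrent.futures


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError so the sketch runs without gRPC."""


def failing_stream():
    """Hypothetical stand-in for the gRPC response iterator: one update, then an error."""
    yield {"update": 1}
    raise FakeRpcError("stream terminated")


def consume(stream, sink):
    # Runs in the worker thread; an exception raised here is stored on the Future.
    for update in stream:
        sink.append(update)


received = []
caught = None
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(consume, failing_stream(), received)
    try:
        future.result()  # re-raises the worker's exception in the calling thread
    except FakeRpcError as err:
        caught = err

print(f"received={received}, caught={caught!r}")
```

With a plain `threading.Thread`, the same exception would only be printed by the thread's default handler and the main script would never see it.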

brunoonovais avatar Nov 10 '22 17:11 brunoonovais

Hey @brunoonovais ,

The idea of running subscribe_stream in a separate thread was for scalability, to ensure that multiple threads can coexist. We can look into whether there would be any benefit in changing the structure.

Best, Anton

akarneliuk avatar Nov 13 '22 13:11 akarneliuk

Sounds good, Anton! The point here is that if a subscription closes (e.g. a process restarts, a box reloads, or any other issue occurs), one can handle the condition if the thread result is passed down. If not, the main script won't catch any exception and basically gets stuck.
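
One way to "pass the thread result down" while keeping the separate thread is to put the exception on the same queue as the updates and re-raise it in the consumer. This is only a sketch under assumptions, not pygnmi's actual code: `FakeRpcError`, `failing_stream`, `iter_updates`, and the `_DONE` sentinel are hypothetical, and `enqueue_updates` here merely borrows the name used earlier in this thread.

```python
import queue
import threading


class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def failing_stream():
    """Hypothetical stream: two updates, then the device closes the session."""
    yield "update-1"
    yield "update-2"
    raise FakeRpcError("device closed the stream")


_DONE = object()  # hypothetical sentinel marking a clean end of stream


def enqueue_updates(stream, updates):
    # Worker thread: forward updates, and forward the error instead of losing it.
    try:
        for update in stream:
            updates.put(update)
    except Exception as err:
        updates.put(err)
    else:
        updates.put(_DONE)


def iter_updates(updates):
    # Consumer side: re-raise any exception the worker forwarded.
    while True:
        item = updates.get()
        if item is _DONE:
            return
        if isinstance(item, Exception):
            raise item
        yield item


updates = queue.Queue()
worker = threading.Thread(
    target=enqueue_updates, args=(failing_stream(), updates), daemon=True
)
worker.start()

seen = []
error = None
try:
    for entry in iter_updates(updates):
        seen.append(entry)
except FakeRpcError as err:
    error = err  # the main script can now close the channel or reconnect

print(f"seen={seen}, error={error!r}")
```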

brunoonovais avatar Nov 13 '22 19:11 brunoonovais

I encountered a similar problem while using subscribe2. It generally works well, but when there are issues with the session to the device, such as network problems, a node reboot, or service interruptions, I am unable to receive errors through exceptions. The subscribe session stops sending updates and returns nothing, and I am unsure whether there is another way to handle this. It would be helpful to have a method to check whether the gRPC session to the device is still healthy while receiving updates from the subscription, or perhaps a way to reconnect the existing session.
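
Until errors are propagated, a possible stopgap is a watchdog on the consumer side that treats a long silence as a dead session. The sketch below is an assumption-laden illustration, not a pygnmi feature: `StreamStalled` and `iter_with_watchdog` are hypothetical names, and the queue stands in for whatever buffer holds the received updates.

```python
import queue


class StreamStalled(Exception):
    """Hypothetical error raised when no update arrives within the timeout."""


def iter_with_watchdog(updates, timeout_s):
    # Yield updates as they arrive; give up if the queue stays empty too long.
    while True:
        try:
            yield updates.get(timeout=timeout_s)
        except queue.Empty:
            raise StreamStalled(f"no update for {timeout_s} s") from None


updates = queue.Queue()
updates.put("update-1")  # simulate one update followed by silence

seen = []
stalled = False
try:
    for entry in iter_with_watchdog(updates, timeout_s=0.2):
        seen.append(entry)
except StreamStalled:
    stalled = True  # a real script could tear down and reopen the session here

print(f"seen={seen}, stalled={stalled}")
```

The timeout only makes sense for subscriptions that are expected to send data regularly (for example sampled ones); an on-change subscription can be legitimately quiet for a long time.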

theblues25 avatar Mar 03 '23 12:03 theblues25

I'm encountering a similar issue when trying to subscribe more times than the device's limit allows:

    subscription_dict = {'subscription': [{'path': 'openconfig-interfaces:interfaces/', 'mode': 'sample'}], 'mode': 'poll', 'encoding': 'json'}
    subscription_ids = []
    for i in range(2):
        subscription_id = client.subscribe_poll(subscribe=subscription_dict)
        subscription_ids.append(subscription_id)
        time.sleep(2)

It throws the error below, but I'm unable to catch it:

Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 937, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 885, in run
    self._target(*self._args, **self._kwargs)
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/pygnmi/client.py", line 999, in enqueue_updates
    for update in subscription:
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/ws/sampsamp-bgl/venv/lib64/python3.6/site-packages/grpc/_channel.py", line 827, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.RESOURCE_EXHAUSTED
        details = "gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"
        debug_error_string = "UNKNOWN:Error received from peer ipv4:xx.xx.xx.xx:57400 {created_time:"2023-11-17T17:28:33.376998151+05:30", grpc_status:8, grpc_message:"gNMI: subscribe: exceeded the maximum number of streaming gRPCs per user: 1"}"
>

Is there any fix? We tried to catch the exception in enqueue_updates, but even then I'm unable to catch it:

    def enqueue_updates():
        stub = gNMIStub(channel)
        subscription = stub.Subscribe(_client_stream, metadata=metadata)
        for update in subscription:
            try:
                self._updates.put(update)
            except Exception as error:
                print(str(error))
                raise Exception(error)
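
A likely reason the try block above never fires: the traceback points at the `for update in subscription:` line, so the error is raised by the iterator's `__next__`, outside the try body. The following sketch shows the difference between the two placements; `FakeRpcError` and `failing_stream` are hypothetical stand-ins so that it runs without gRPC.

```python
class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc.RpcError."""


def failing_stream():
    """Hypothetical stream: one update, then the iterator itself raises."""
    yield "update-1"
    raise FakeRpcError("RESOURCE_EXHAUSTED")


def try_inside_loop(stream, sink):
    for update in stream:  # the error is raised here, outside the try body
        try:
            sink.append(update)
        except Exception as error:
            sink.append(f"caught: {error}")


def try_around_loop(stream, sink):
    try:
        for update in stream:  # now the iteration itself is protected
            sink.append(update)
    except Exception as error:
        sink.append(f"caught: {error}")


inside, around = [], []
escaped = False
try:
    try_inside_loop(failing_stream(), inside)
except FakeRpcError:
    escaped = True  # the inner try never saw the error

try_around_loop(failing_stream(), around)

print(f"escaped={escaped}, inside={inside}, around={around}")
```

Even with the try wrapped around the loop, the exception is still caught inside the worker thread, so it has to be handed to the main thread explicitly for the calling script to react to it.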

sai1274 avatar Nov 17 '23 12:11 sai1274

@akarneliuk Can you please help us with this? We are interested in capturing the error.

jhanm12 avatar Nov 17 '23 14:11 jhanm12