ocpp
Best way for a client to handle loss of connection to the server
Hi,
This is a question seeking advice on how you would recommend handling the loss of the connection to the server.
The example given uses
async with websockets.connect(...) as ws:
    cp = ChargePoint('xxx', ws)
In a real-world implementation the client must handle the loss of the websocket connection to the server and keep trying to re-connect (with various backoff times).
How would you see this being handled?
One approach would be to wrap the async with block in a loop that retries if the connection fails, but that would result in the ChargePoint being instantiated on every connection, which would lose any state held within it. Would this matter?
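while True:
    try:
        async with websockets.connect(...) as ws:
            cp = ChargePoint('xxx', ws)
            await asyncio.gather(
                cp.start(),
                cp.send_boot_notification(),
            )
    # OSError covers e.g. a refused connection; ConnectionClosed covers a dropped socket.
    except (OSError, websockets.exceptions.ConnectionClosed):
        await asyncio.sleep(backoff_time)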
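The "various backoff times" mentioned above could come from a small generator. This is only a sketch: the name backoff_delays and its default values (1 second base, doubling, 60 second cap) are assumptions for illustration, not anything defined by the ocpp library or the OCPP spec.

```python
import itertools
import random


def backoff_delays(base=1.0, factor=2.0, cap=60.0, jitter=0.0):
    """Yield an endless series of reconnect delays in seconds.

    The delay grows by `factor` on each attempt and is capped at `cap`.
    `jitter` is the maximum fraction of each delay that may be randomly
    added; 0.0 disables it and keeps the series deterministic.
    """
    delay = base
    while True:
        yield min(delay, cap) * (1.0 + random.uniform(0.0, jitter))
        delay = min(delay * factor, cap)


# First six delays without jitter.
first_six = list(itertools.islice(backoff_delays(), 6))
```

In the loop above, backoff_time would then be next(delays), with a fresh generator created after each successful connection so that the delay starts from the base again.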
Another would be to subclass the ChargePoint and override some of its methods to stop it attempting to read or write from the socket if it is disconnected.
There would be a coroutine that regularly tries to reconnect and then sets the _connection property of the ChargePoint.
It might look something like
await asyncio.gather(
    cp.connect_to_server(),
    cp.start(),
    cp.send_boot_notification(),
)
where the updated start is something like
# part of __init__
self._connected = asyncio.Event()
self._need_connection = asyncio.Event()
self._need_connection.set()

async def start(self):
    while True:
        # Block until connect_to_server() reports a live connection.
        await self._connected.wait()
        try:
            await super().start()
        except websockets.exceptions.ConnectionClosed:
            self._connected.clear()
            self._need_connection.set()
and connect_to_server is something like
async def connect_to_server(self):
    while True:
        await self._need_connection.wait()
        try:
            self._connection = await websockets.connect(
                'ws://localhost:9500/' + my_name,
                subprotocols=['ocpp2.0']
            )
            self._connected.set()
            self._need_connection.clear()
        except (OSError, websockets.exceptions.InvalidHandshake):
            # Expected failures (server down, handshake rejected): retry after the backoff.
            # Anything else is a real problem and propagates to the caller.
            pass
        await asyncio.sleep(backoff_time)
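To check that the two-event handshake behaves as intended without a real server, it can be run against a stand-in connection. Everything here is hypothetical scaffolding: FakeLink, Client, connector, reader and demo are illustration names, and ConnectionError stands in for whatever the real websocket raises when it drops.

```python
import asyncio


class FakeLink:
    """Hypothetical stand-in for a websocket: recv() fails after `good_reads` reads."""

    def __init__(self, good_reads):
        self._left = good_reads

    async def recv(self):
        await asyncio.sleep(0)  # yield to the event loop like real I/O would
        if self._left == 0:
            raise ConnectionError("link dropped")
        self._left -= 1
        return "msg"


class Client:
    def __init__(self):
        self._connected = asyncio.Event()
        self._need_connection = asyncio.Event()
        self._need_connection.set()  # start out needing a connection
        self._link = None
        self.connects = 0
        self.received = 0

    async def connector(self):
        while True:
            await self._need_connection.wait()
            self._link = FakeLink(good_reads=2)
            self.connects += 1
            self._need_connection.clear()
            self._connected.set()

    async def reader(self):
        while True:
            await self._connected.wait()
            try:
                await self._link.recv()
                self.received += 1
            except ConnectionError:
                # Hand control back to the connector.
                self._connected.clear()
                self._need_connection.set()


async def demo():
    client = Client()
    tasks = [asyncio.create_task(client.connector()),
             asyncio.create_task(client.reader())]
    while client.connects < 3:  # let the link drop and recover twice
        await asyncio.sleep(0)
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return client.connects, client.received
```

Running asyncio.run(demo()) shows the reader surviving two dropped links without the Client being re-created, which is the property the subclassing approach is after.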
Handling of send is more difficult. We cannot simply do the same as with recv and wait for the reconnect before sending, as this could result in sending a response to a call that was received before the websocket went down. It might also cause the business logic to block while waiting for a response to a call.
The OCPP spec often specifies that messages be stored and sent once the connection is restored, so the business logic needs to be aware of the connection state.
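The store-and-send-later requirement could be sketched as a small outbound queue. This is an assumption-laden illustration: OutboundQueue, submit and flush are made-up names, the send callable is a placeholder for whatever actually writes to the websocket, and real OCPP queuing rules (which messages to keep, retry limits) are not modelled.

```python
import asyncio
import collections


class OutboundQueue:
    """Hold messages while offline and send them in order after a reconnect."""

    def __init__(self, send):
        self._send = send  # async callable taking one message (placeholder)
        self._pending = collections.deque()
        self.connected = False

    async def submit(self, message):
        """Send now if connected, otherwise store. Returns True if sent."""
        if self.connected:
            await self._send(message)
            return True
        self._pending.append(message)
        return False

    async def flush(self):
        """Call after reconnecting: drain stored messages, then go live."""
        while self._pending:
            await self._send(self._pending[0])
            self._pending.popleft()  # remove only after a successful send
        # Set last, so anything submitted during the drain keeps its place in line.
        self.connected = True


async def demo():
    sent = []

    async def fake_send(message):
        sent.append(message)

    queue = OutboundQueue(fake_send)
    first = await queue.submit("TransactionEvent-1")   # offline: stored
    second = await queue.submit("TransactionEvent-2")  # offline: stored
    await queue.flush()                                # connection restored
    third = await queue.submit("Heartbeat")            # online: sent directly
    return sent, (first, second, third)
```

The boolean returned by submit is one way for the business logic to learn that a message was queued rather than delivered.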
Perhaps override the call method.
async def call(self, payload):
    if self._connected.is_set():
        try:
            # await here so a dropped connection is raised inside the try block
            return await super().call(payload)
        except websockets.exceptions.ConnectionClosed:
            self._connected.clear()
            self._need_connection.set()
            return "lost connection payload"
    else:
        return "no connection payload"
These returns could be raises if that is more convenient for the business logic.
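A raising variant might look like the sketch below. The exception names NotConnectedError and ConnectionLostError and the helper guarded_call are invented for illustration, and ConnectionError stands in for the websocket library's own exception.

```python
import asyncio


class NotConnectedError(Exception):
    """The call was not attempted because there is no connection."""


class ConnectionLostError(Exception):
    """The connection dropped while the call was in flight."""


async def guarded_call(connected, do_call, payload):
    """Run do_call(payload) only while `connected` (an asyncio.Event) is set."""
    if not connected.is_set():
        raise NotConnectedError(payload)
    try:
        return await do_call(payload)
    except ConnectionError as exc:
        connected.clear()
        raise ConnectionLostError(payload) from exc


async def demo():
    connected = asyncio.Event()

    async def ok(payload):
        return {"status": "Accepted"}

    async def broken(payload):
        raise ConnectionError("socket closed")

    outcomes = []
    try:
        await guarded_call(connected, ok, "Heartbeat")
    except NotConnectedError:
        outcomes.append("not connected")
    connected.set()
    outcomes.append(await guarded_call(connected, ok, "Heartbeat"))
    try:
        await guarded_call(connected, broken, "Heartbeat")
    except ConnectionLostError:
        outcomes.append("lost")
    return outcomes, connected.is_set()
```

Two distinct exception types let the business logic tell "never sent" apart from "sent, but the outcome is unknown", which matters when deciding whether to queue the message again.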
It is possible that the client gets a request from the server but loses the websocket before returning the result. This is quite likely, as producing the result might take some time in the business logic.
Perhaps override the handler
async def _handle_call(self, payload):
    if self._connected.is_set():
        try:
            return await super()._handle_call(payload)
        except websockets.exceptions.ConnectionClosed:
            self._connected.clear()
            self._need_connection.set()
I see no need to make the business logic aware of this event other than in a log message.
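A related case is a reconnect that completes while the handler is still working, so the response would go out on a different connection from the one the call arrived on. One possible guard, sketched here with made-up names (Responder, epoch, on_reconnect), is to count connections and drop a response whose connection no longer exists.

```python
import asyncio


class Responder:
    """Tie each response to the connection its call arrived on."""

    def __init__(self):
        self.epoch = 0     # incremented on every reconnect
        self.sent = []     # stands in for writing to the websocket
        self.dropped = []  # stands in for a log message

    def on_reconnect(self):
        self.epoch += 1

    async def handle_call(self, call_id, work):
        epoch_at_receipt = self.epoch
        result = await work()  # business logic, may take a while
        if epoch_at_receipt != self.epoch:
            # The original connection is gone: do not answer on the new one.
            self.dropped.append(call_id)
            return
        self.sent.append((call_id, result))


async def demo():
    responder = Responder()

    async def quick():
        return "Accepted"

    async def slow():
        responder.on_reconnect()  # simulate a drop and reconnect mid-request
        return "Accepted"

    await responder.handle_call("1", quick)
    await responder.handle_call("2", slow)
    return responder.sent, responder.dropped
```

This keeps the business logic unaware of the event, in line with the point above, while still avoiding a stale reply.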
I was wondering whether the tenacity library (https://github.com/jd/tenacity) would also do the trick. Something like:
from logging import getLogger

import websockets
from tenacity import (retry, retry_if_exception_type,
                      RetryError, stop_after_attempt, wait_exponential)

LOGGER = getLogger(__name__)

def log_retry(error, logger=None):
    """Log a retry from tenacity, used as after argument in retry function"""
    error_str = f'{error}, %s'
    logger = logger or getLogger(__name__)
    def _retry(retry_state):
        exc = retry_state.outcome.exception()
        # Pass the exception itself, since this runs outside an except block.
        logger.exception(error_str, exc, exc_info=exc)
    return _retry

# retry decorates a function, so the connection code is wrapped in a coroutine.
@retry(retry=retry_if_exception_type(websockets.exceptions.ConnectionClosed),
       wait=wait_exponential(multiplier=2),
       stop=stop_after_attempt(5),
       after=log_retry('Connection Attempt Expired', LOGGER))
async def connect():
    async with websockets.connect(...) as ws:
        cp = ChargePoint('xxx', ws)
        await cp.start()  # assumed here so that a dropped socket triggers the retry
Thanks for that input.
It is difficult to see how tenacity will help in this situation as OCPP has requirements for the behaviour of a charging station when it cannot communicate with the CSMS. They mean that the charging station has to continue normal operation and queue messages until communications are restored, so a simple "loop until reconnected" is not viable.
OCPP makes this particularly difficult since all messages are actually RPC calls which expect a reply. This means that the code has to be aware that it might get a "message is queued" reply rather than an OCPP RPC reply and handle the situation appropriately.
I guess an asyncio coroutine could use tenacity to loop trying to reconnect, but the remainder of the charging station would still have to be aware of the situation and queue messages.
I had not come across tenacity before. It sounds like a useful tool, and I might use it in the project for reconnecting to IO, since that is outside OCPP and a simple loop should be possible.
Although this is valid, I would expect the implementation of the websockets connection to be outside the scope of OCPP. In addition, this has gone stale, so I will close this for now.