fastapi-mqtt
fastapi-mqtt copied to clipboard
Prevent throwing exceptions when not connected
- The methods
publish
andunsubscribe
will throw an exception if the client is not currently connected. This adds a check to ensure that the clientis_connected
before attempting to call the underlying methods. - The
MQTTConfig
model uses anint
for the type for the version but did not document what the supported values are. This updates the type to be aLiteral
with supported values or4
or5
.
@sabuhish Any thoughts on this PR?
@azogue You seem to be committing a lot, any thoughts on this PR?
@azogue I think I see your point.
I will explain my reason behind this PR and perhaps there is a better solution I am not seeing or perhaps there is something this PR could still do it to allow the workflow I was considering.
Since FastAPI introduced the new lifespan
feature to replace the startup
and shutdown
I implemented using this extension by creating startup
and shutdown
functions that will be called before and after the yield
in the lifespace
functions.
What I was trying to do was allow the developer to not be concerned if MQTT is enabled or not but allow them to import and use the mqtt
object to wrap functions with subscribe
or call publish
directly in a route and the MQTT parts would just be enabled or disabled whether MQTT was configured or not.
"""
mqtt.py
"""
from config import settings
if settings.MQTT_BROKER_CONFIG is not None:
logger.info("MQTT broker enabled")
logger.debug("MQTT broker config: %s", settings.MQTT_BROKER_CONFIG)
mqtt = MQTT(config=settings.MQTT_BROKER_CONFIG, mqtt_logger=logger)
async def start():
await mqtt.mqtt_startup()
logger.info("MQTT broker started")
async def stop():
await mqtt.mqtt_shutdown()
logger.info("MQTT broker stopped")
else:
# These are no-op functions that will do nothing if MQTT is disabled but allows you to
# use the decorator throughout the app without caring if MQTT is enabled or not
mqtt = MQTT(config=MQTTConfig())
async def start():
pass
async def stop():
pass
"""
lifespan.py
"""
from contextlib import asynccontextmanager
from core.lifespan_tasks import mqtt as mqtt_task
@asynccontextmanager
async def lifespan(app: FastAPI):
await mqtt_task.start()
yield
await mqtt_task.stop()
In this way, if the no-ops are used, then the developer does not need to wrap every call with
if mqtt.is_connect:
mqtt.publish(...)
However, I think part of my reason for this PR was because I didn't see that calling mqtt.subscribe
as a decorator does not care if the client is connected or not as it just adds callbacks to a dict or similar. And afterwards I decided I could solve most of my desired pattern of use by just wrapping FastMQTT in my own class.
class MQTT(FastMQTT):
"""
A Simple wrapper around FastMQTT to prevent the throwing of exception when the MQTT Client is not yet connected.
"""
def publish(self, message_or_topic: str, payload: Any = None, qos: int = 0, retain: bool = False, **kwargs) -> None:
if not self.client.is_connected:
logger.info("MQTT is not connected. Can't publishi to %s", message_or_topic)
return None
try:
return super().publish(message_or_topic, payload, qos, retain, **kwargs)
except AttributeError:
logger.info("MQTT might not be connected, unable to publish to %s", message_or_topic)
return None
def unsubscribe(self, topic: str, **kwargs):
"""
This will remove the subscription, but catch if MQTT is not yet connected.
"""
try:
return super().unsubscribe(topic, **kwargs)
except AttributeError:
return None
And thus with this, I could do something like what you said to just add message to a queue and add a callback that would process all message once connected if I wanted.
For my use case I was considering MQTT as a feature that could be toggled on or off and would just an extra.
Thanks for you well written and thought-out comments. Perhaps I could just submit the PR to update the MQTTConfig model as you suggested and remove the rest
Hi @RobertDeRose
I see your point, and I think I understand it, but in reality there are multiple time windows where the MQTT client could be disconnected, like in
- short network errors, which the
gmqtt
client usually handles very well, with quick retries - long disconnections, which have to be managed separately, and need a full reconnection + re-subscription
- anything happening in setup/shutdown phases (:= before / after the yield in the lifespan), if it happens before the first connection or after closing it.
So, in the end, you would need 'multiple flags' to check in the app before calling publish anyway, and how to manage that complexity is out of scope of this simple wrapper, which is just the glue between the gmqtt client and fastapi
In my own usage of this library, for an app running on-board in an embedded platform, where the MQTT connection is also optional (and shaky), I have to do checks like this:
if (
self.settings.mqtt_connection
and self.app_is_running
and (mqtt_handler := self._app_state.get(MQTTHandler)).active
):
before triggering the publish, and after that condition, that active mqtt_handler
will also check if it's .connected
before the pub, doing other things if not π
But all of that is business logic which has not a place here, IMO. Here, the library has flags to check, and raises exceptions, to expose the 2 ways of work: if-else, or try/except.
As the .publish(...)
method returns None
, the only way of signalling an error is with the exception, that's why I was so strongly opposed to it π
Thanks for you well written and thought-out comments.
Thanks βΊοΈ, same to you
Perhaps I could just submit the PR to update the MQTTConfig model as you suggested and remove the rest
That would be great π
The MQTTConfig model uses an int for the type for the version but did not document what the supported values are. This updates the type to be a Literal with supported values or 4 or 5.
The discussed fix for this lack of validation is already included in #94 β
I'm closing this one, @RobertDeRose π