Add MQTT dispatcher
Description
This PR add's a MQTT dispatcher. Sending Tesla EV fleet telemetry (e.g., SoC) to an MQTT broker provides real-time, lightweight, and efficient data streaming. MQTT's publish-subscribe model scales well for growing fleets, enabling seamless integration with IoT platforms and data analytics systems for improved monitoring and management.
Type of change
Please select all options that apply to this change:
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] Documentation update
Checklist:
Confirm you have completed the following steps:
- [x] My code follows the style of this project.
- [x] I have performed a self-review of my code.
- [x] I have made corresponding updates to the documentation.
- [x] I have added/updated unit tests to cover my changes.
- [x] I have added/updated integration tests to cover my changes.
OMG, watching intently :)
The code hase been updated with the issues mentioned in the comments above.
- There is now support for MQTT topics: metrics, alerts and errors.
- Other minor changes and fixes.
Add detailed MQTT README.md explaining architecture, design choices, and configuration
@erwin314 is there something I can help with to get this fixed? I tried to build your code and currently fail to do so.
@erwin314 is there something I can help with to get this fixed? I tried to build your code and currently fail to do so.
Thanks for reaching out and offering to help! The code was working, but it's been in pull-request mode for a while now. To move forward, perhaps @agbpatro, @aaronpkahn, @ThomasAlxDmy, or @patrickdemers6 could provide feedback on whether this code has a chance of being merged. If there's potential, I’ll update it so it passes all tests again. If merging isn’t feasible, I’m happy to create a dedicated fork focused solely on MQTT.
Hi everyone, I've updated the code, and it should now pass all the tests. However, I'm still unsure if investing my time in this is worthwhile, as it's unclear whether it has a potential for being merged.
There is 1 suggestion I would like to make, currently you return a json object for each topic, I would rather see the value directly.
Currently:
Topic: telemetry/[VIN]/v/Locked
Value: {"value":"true"}
Better:
Topic: telemetry/VIN/v/Locked
Value: true
Why do you put the value inside the key value? It requires more processing on the consumer to actually read a value.
@erwin314 is there something I can help with to get this fixed? I tried to build your code and currently fail to do so.
Thanks for reaching out and offering to help! The code was working, but it's been in pull-request mode for a while now. To move forward, perhaps @agbpatro, @aaronpkahn, @ThomasAlxDmy, or @patrickdemers6 could provide feedback on whether this code has a chance of being merged. If there's potential, I’ll update it so it passes all tests again. If merging isn’t feasible, I’m happy to create a dedicated fork focused solely on MQTT.
Thank you, this is awesome. We should definitely get it in.
There is 1 suggestion I would like to make, currently you return a json object for each topic, I would rather see the value directly. ... Why do you put the value inside the key value? It requires more processing on the consumer to actually read a value.
Thank you for the suggestion and for taking the time to review my pull request! I appreciate your feedback.
If the system evolves to include additional features or richer data, using JSON ensures that existing consumers remain compatible as long as they continue extracting the known keys. In contrast, returning direct values can make maintaining backward compatibility more difficult.
By consistently returning a JSON object, we standardize the data format across all topics, regardless of the type or complexity of the value. This approach allows consumers to parse messages using uniform logic, reducing the likelihood of errors. If some topics return raw values while others return structured data, client-side code must accommodate both cases, leading to increased complexity.
Additionally, JSON maintains the data type of the value, helping to avoid misinterpretation. For example, {"value": "true"} clearly indicates a string representation instead of a boolean. This distinction is important because the type of a field may vary by vehicle type or even only by the software version running on it. This might be relevant to the receiving systems.
While it’s true that consumers need to perform an extra step to extract the value, this processing is typically negligible in practice. Parsing JSON is a lightweight operation, supported natively by nearly all programming languages and platforms. On the other hand, returning raw values requires consumers to handle data differently for each topic, which increases the overall complexity of the client-side implementation.
That said, I’m open to further discussion if you feel strongly about this point or have alternative suggestions that might achieve the same benefits with less overhead.
@erwin314 is there something I can help with to get this fixed? I tried to build your code and currently fail to do so.
Thanks for reaching out and offering to help! The code was working, but it's been in pull-request mode for a while now. To move forward, perhaps @agbpatro, @aaronpkahn, @ThomasAlxDmy, or @patrickdemers6 could provide feedback on whether this code has a chance of being merged. If there's potential, I’ll update it so it passes all tests again. If merging isn’t feasible, I’m happy to create a dedicated fork focused solely on MQTT.
Thank you, this is awesome. We should definitely get it in.
Thanks for the positive feedback—glad to hear there’s interest in getting this merged! Could you provide some guidance on the steps needed to move this forward?
Thanks for taking the time to answer.
Additionally, JSON maintains the data type of the value, helping to avoid misinterpretation. For example,
{"value": "true"}clearly indicates a string representation instead of a boolean.
Well, actually that is a secondary problem...
It is a Boolean and because you use a transformer from the logger it is casted back to a string, see here. Actually EVERYTHING is casted as a string with that transformer.
I agree with your explanation that datatypes should be consistent. But there is actually no need to always include a json with the value key
in the MQTT topic it's always stored as a string, you have to do a json parse anyhow.
And that fixes it is well.
Example in python:
>>> import json
>>> response='{"value": true}'
>>> json.loads(response)
{'value': True}
>>> response='true'
>>> json.loads(response)
True
As you can see both are correctly parsed as a boolean. AS far as I know this is the case in all languages.
So no need to wrap the json in the value key.
Just to make sure you see what I mean let's do it again but this time not as a boolean but as a string
>>> import json
>>> response='{"value": "true"}'
>>> json.loads(response)
{'value': 'true'}
>>> response='"true"'
>>> json.loads(response)
'true'
So also a string is correctly parsed as a string.
I would suggest to write your json as follows, that will write the json correctly, also with the correct types ( bool, string, int, float ... )
diff --git a/datastore/mqtt/mqtt_payload.go b/datastore/mqtt/mqtt_payload.go
index 3692c3e..595a9b9 100644
--- a/datastore/mqtt/mqtt_payload.go
+++ b/datastore/mqtt/mqtt_payload.go
@@ -19,7 +19,7 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
continue
}
mqttTopicName := fmt.Sprintf("%s/%s/v/%s", p.config.TopicBase, rec.Vin, key)
- jsonValue, err := json.Marshal(map[string]interface{}{"value": value})
+ jsonValue, err := json.Marshal(value)
if err != nil {
return tokens, fmt.Errorf("failed to marshal JSON for MQTT topic %s: %v", mqttTopicName, err)
}
Here is a screenshot of how that looks in the MQTT browser:
So to summarize:
- I think you are using the wrong approach by going to the transformers which seems to be invented for the simple console logging and casting everything to strings. It would make more sense to use the probuf types when creating the json data
- Do not wrap everything a map with the key
valueas you could see in the example above, it's not needed to preserve correct json parsing
... So to summarize:
- I think you are using the wrong approach by going to the transformers which seems to be invented for the simple console logging and casting everything to strings. It would make more sense to use the probuf types when creating the json data
- Do not wrap everything a map with the key
valueas you could see in the example above, it's not needed to preserve correct json parsing
- From what I understand, PayloadToMap() doesn’t inherently cast everything to a string. Instead, most fields seem to be using a string-based protobuf defined by the vehicle. However, I'll double-check to confirm. (note: the link you mentioned does not seem to show a string cast?)
- You’ve made a great point about the value being JSON-parsable. This definitely seems like an improvement worth exploring. While we would lose the flexibility to add additional fields, it’s fair to question whether retaining that flexibility might be over-engineering in this case. I'll make the code change and test if it's indeed an improvement a.s.a.p
@erwin314 it seems you are right! It seems Tesla is sending it as string, so that's correct, I played with the raw data and saw indeed that it is working as intended ( But I think that's a broken implementation from Tesla since everything seems to be a string)
An other small thing, what do you think of this:
--- a/datastore/mqtt/mqtt_payload.go
+++ b/datastore/mqtt/mqtt_payload.go
@@ -15,11 +15,11 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
var tokens []pahomqtt.Token
convertedPayload := transformers.PayloadToMap(payload, false, p.logger)
for key, value := range convertedPayload {
- if key == "Vin" || key == "CreatedAt" {
+ if key == "Vin" || key == "CreatedAt" || value == "<invalid>" {
My reasoning is, if the value is invalid then don't publish it... Not sure though if the message is then acked correctly to the car...
I have updated the way the MQTT values are formatted. They now directly contain the value as json, so they are no-longer wrapped in an extra value entry. And the Invalid field value is now translated to null in json. Even though "null" might not strictly be the same as "invalid", it seems the most straight forward approach.
I've added MQTT support for connectivity events, with one topic per VIN. I assume there's at most one connectivity (dis)connected at a time?
Regarding ConnectionID, it seems to change every time a car (re)connects. This suggests it’s not an ID for a connection to a specific telemetry server, but rather a random ID generated for each new connection. That's why I put it in the value and not in the topic. I'm not sure what happens if a car is connected to multiple telemetry servers (can it even do that?)
Let me know if there’s any clarification or adjustment needed for this approach!
@ThomasAlxDmy To me this looks feature complete now. Can you pull some strings to get this unblocked?
@patrickdemers6 is there something we can do to get this merged? Is there something missing which needs to be added?
@erwin314 for your fork, have you published any built docker images? Does your fork run? Debating putting in the effort to run it so I can just get set up with an MQTT dispatcher for telemetry.
Looks like there are conflicts to merge. If you can fix those, then let's ship. @erwin314
This is great work, thanks for all your hard work on it!
Thank you!
Looks like there are conflicts to merge. If you can fix those, then let's ship.
Where do you see the merge conflicts? @patrickdemers6 Because Github says: No conflicts with base branch - Changes can be cleanly merged.
Ah, had to switch to squash and merge instead of rebase.
The best way to learn if this implementation works is to get you and all the others that are interested using it.