fleet-telemetry

Add MQTT dispatcher

Open erwin314 opened this issue 1 year ago • 1 comments

Description

This PR adds an MQTT dispatcher. Sending Tesla EV fleet telemetry (e.g., SoC) to an MQTT broker provides real-time, lightweight, and efficient data streaming. MQTT's publish-subscribe model scales well for growing fleets, enabling seamless integration with IoT platforms and data analytics systems for improved monitoring and management.

Type of change

Please select all options that apply to this change:

  • [x] New feature (non-breaking change which adds functionality)
  • [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • [ ] Bug fix (non-breaking change which fixes an issue)
  • [ ] Documentation update

Checklist:

Confirm you have completed the following steps:

  • [x] My code follows the style of this project.
  • [x] I have performed a self-review of my code.
  • [x] I have made corresponding updates to the documentation.
  • [x] I have added/updated unit tests to cover my changes.
  • [x] I have added/updated integration tests to cover my changes.

erwin314 avatar Oct 03 '24 12:10 erwin314

OMG, watching intently :)

iainwhyte avatar Oct 03 '24 21:10 iainwhyte

The code has been updated to address the issues mentioned in the comments above.

  • There is now support for MQTT topics: metrics, alerts and errors.
  • Other minor changes and fixes.

erwin314 avatar Oct 07 '24 10:10 erwin314

Add detailed MQTT README.md explaining architecture, design choices, and configuration

erwin314 avatar Oct 09 '24 08:10 erwin314

@erwin314 is there something I can help with to get this fixed? I tried to build your code and currently fail to do so.

netdata-be avatar Nov 25 '24 09:11 netdata-be

Thanks for reaching out and offering to help! The code was working, but it's been in pull-request mode for a while now. To move forward, perhaps @agbpatro, @aaronpkahn, @ThomasAlxDmy, or @patrickdemers6 could provide feedback on whether this code has a chance of being merged. If there's potential, I’ll update it so it passes all tests again. If merging isn’t feasible, I’m happy to create a dedicated fork focused solely on MQTT.

erwin314 avatar Nov 25 '24 13:11 erwin314

Hi everyone, I've updated the code, and it should now pass all the tests. However, I'm still unsure if investing my time in this is worthwhile, as it's unclear whether it has any chance of being merged.

erwin314 avatar Nov 26 '24 10:11 erwin314

I would like to make one suggestion: currently you publish a JSON object for each topic, and I would rather see the value directly.

Currently:

Topic: telemetry/[VIN]/v/Locked
Value: {"value":"true"}

Better:

Topic: telemetry/[VIN]/v/Locked
Value: true

Why do you wrap the value under a "value" key? It requires extra processing on the consumer side to read the value.
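To make the consumer-side difference concrete, here is a minimal Python sketch. It assumes the topic layout shown above with "telemetry" as the topic base; the helper names and the VIN are made up for illustration and are not part of the PR.

```python
import json

def split_topic(topic, base="telemetry"):
    """Split '<base>/<VIN>/v/<field>' into (vin, field); return None if it does not match."""
    parts = topic.split("/")
    if len(parts) != 4 or parts[0] != base or parts[2] != "v":
        return None
    return parts[1], parts[3]

def read_wrapped(payload):
    # Current format: the value sits under a "value" key.
    return json.loads(payload)["value"]

def read_direct(payload):
    # Suggested format: the payload is the JSON value itself.
    return json.loads(payload)

# The VIN below is a placeholder, not a real vehicle.
vin, field = split_topic("telemetry/TESTVIN0000000001/v/Locked")
print(vin, field, read_wrapped('{"value":"true"}'), read_direct("true"))
```

Both readers need a JSON parse; the wrapped format adds one key lookup on top of that.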

netdata-be avatar Nov 26 '24 13:11 netdata-be

Thank you, this is awesome. We should definitely get it in.

ThomasAlxDmy avatar Nov 27 '24 18:11 ThomasAlxDmy

There is 1 suggestion I would like to make, currently you return a json object for each topic, I would rather see the value directly. ... Why do you put the value inside the key value? It requires more processing on the consumer to actually read a value.

Thank you for the suggestion and for taking the time to review my pull request! I appreciate your feedback.

If the system evolves to include additional features or richer data, using JSON ensures that existing consumers remain compatible as long as they continue extracting the known keys. In contrast, returning direct values can make maintaining backward compatibility more difficult.

By consistently returning a JSON object, we standardize the data format across all topics, regardless of the type or complexity of the value. This approach allows consumers to parse messages using uniform logic, reducing the likelihood of errors. If some topics return raw values while others return structured data, client-side code must accommodate both cases, leading to increased complexity.

Additionally, JSON maintains the data type of the value, helping to avoid misinterpretation. For example, {"value": "true"} clearly indicates a string representation instead of a boolean. This distinction is important because the type of a field may vary by vehicle type or even just by the software version running on it. This might be relevant to the receiving systems.

While it’s true that consumers need to perform an extra step to extract the value, this processing is typically negligible in practice. Parsing JSON is a lightweight operation, supported natively by nearly all programming languages and platforms. On the other hand, returning raw values requires consumers to handle data differently for each topic, which increases the overall complexity of the client-side implementation.
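The compatibility argument can be sketched in a few lines of Python. The extra keys "unit" and "timestamp" are invented here purely for illustration; nothing in this PR adds them.

```python
import json

def locked_state(payload):
    """Read only the key this consumer knows about; ignore anything else."""
    message = json.loads(payload)
    return message["value"]

today = '{"value": "true"}'
# Hypothetical future message: "unit" and "timestamp" are invented for illustration.
later = '{"value": "true", "unit": null, "timestamp": "2024-11-28T12:00:00Z"}'

# The consumer keeps working when fields are added around the known key.
assert locked_state(today) == locked_state(later) == "true"
```

With a bare value as the payload there is no place to add such fields later without changing the payload shape, which is the trade-off under discussion.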

That said, I’m open to further discussion if you feel strongly about this point or have alternative suggestions that might achieve the same benefits with less overhead.

erwin314 avatar Nov 28 '24 12:11 erwin314

Thanks for the positive feedback—glad to hear there’s interest in getting this merged! Could you provide some guidance on the steps needed to move this forward?

erwin314 avatar Nov 28 '24 12:11 erwin314

Thanks for taking the time to answer.

Additionally, JSON maintains the data type of the value, helping to avoid misinterpretation. For example, {"value": "true"} clearly indicates a string representation instead of a boolean.

Well, actually that is a secondary problem...

It is a boolean, and because you use a transformer from the logger it is cast back to a string, see here. Actually EVERYTHING is cast to a string by that transformer.

I agree with your explanation that data types should be consistent. But there is no need to always wrap the payload in a JSON object with a "value" key: the MQTT payload is always stored as a string, so you have to parse it as JSON anyhow, and that takes care of the typing as well.

Example in python:

>>> import json
>>> response='{"value": true}'
>>> json.loads(response)
{'value': True}
>>> response='true'
>>> json.loads(response)
True

As you can see, both are correctly parsed as a boolean. As far as I know, this is the case in all languages.

So there is no need to wrap the value in a "value" key.

Just to make sure you see what I mean, let's do it again, this time with a string instead of a boolean:

>>> import json
>>> response='{"value": "true"}'
>>> json.loads(response)
{'value': 'true'}
>>> response='"true"'
>>> json.loads(response)
'true'

So also a string is correctly parsed as a string.

I would suggest marshalling the JSON as follows; this writes valid JSON with the correct types (bool, string, int, float, ...):

diff --git a/datastore/mqtt/mqtt_payload.go b/datastore/mqtt/mqtt_payload.go
index 3692c3e..595a9b9 100644
--- a/datastore/mqtt/mqtt_payload.go
+++ b/datastore/mqtt/mqtt_payload.go
@@ -19,7 +19,7 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
                        continue
                }
                mqttTopicName := fmt.Sprintf("%s/%s/v/%s", p.config.TopicBase, rec.Vin, key)
-               jsonValue, err := json.Marshal(map[string]interface{}{"value": value})
+               jsonValue, err := json.Marshal(value)
                if err != nil {
                        return tokens, fmt.Errorf("failed to marshal JSON for MQTT topic %s: %v", mqttTopicName, err)
                }

Here is a screenshot of how that looks in the MQTT browser: Screenshot
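As a rough Python analogue of the Go change above (not the PR's code), the sketch below contrasts the two encodings and checks that the bare encoding round-trips each sample with its type intact. Python's json.dumps adds a space after the colon that Go's json.Marshal does not, so the wrapped strings differ cosmetically from the Go output.

```python
import json

def wrapped(value):
    # Mirrors json.Marshal(map[string]interface{}{"value": value}) from the diff.
    return json.dumps({"value": value})

def direct(value):
    # Mirrors json.Marshal(value) from the diff.
    return json.dumps(value)

for sample in (True, "true", 42, 3.5):
    payload = direct(sample)
    decoded = json.loads(payload)
    # The bare payload decodes back to the same value and the same type.
    assert decoded == sample and type(decoded) is type(sample)
    # The wrapped payload carries the same value one level down.
    assert json.loads(wrapped(sample))["value"] == sample
    print(f"{wrapped(sample):22} vs {payload}")
```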

So to summarize:

  1. I think you are using the wrong approach by going to the transformers, which seem to have been designed for simple console logging and cast everything to strings. It would make more sense to use the protobuf types when creating the JSON data.
  2. Do not wrap everything in a map with the key "value". As the example above shows, it is not needed for correct JSON parsing.

netdata-be avatar Nov 28 '24 13:11 netdata-be

... So to summarize:

  1. I think you are using the wrong approach by going to the transformers, which seem to have been designed for simple console logging and cast everything to strings. It would make more sense to use the protobuf types when creating the JSON data.
  2. Do not wrap everything in a map with the key "value". As the example above shows, it is not needed for correct JSON parsing.

  1. From what I understand, PayloadToMap() doesn’t inherently cast everything to a string. Instead, most fields seem to be using a string-based protobuf defined by the vehicle. However, I'll double-check to confirm. (note: the link you mentioned does not seem to show a string cast?)
  2. You’ve made a great point about the value being JSON-parsable. This definitely seems like an improvement worth exploring. While we would lose the flexibility to add additional fields, it’s fair to question whether retaining that flexibility might be over-engineering in this case. I'll make the code change as soon as possible and test whether it is indeed an improvement.

erwin314 avatar Nov 28 '24 15:11 erwin314

@erwin314 it seems you are right! Tesla is indeed sending the values as strings, so that's correct. I played with the raw data and saw that it is working as intended (though I think that's a broken implementation on Tesla's side, since everything seems to be a string).

Another small thing, what do you think of this:

--- a/datastore/mqtt/mqtt_payload.go
+++ b/datastore/mqtt/mqtt_payload.go
@@ -15,11 +15,11 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
        var tokens []pahomqtt.Token
        convertedPayload := transformers.PayloadToMap(payload, false, p.logger)
        for key, value := range convertedPayload {
-               if key == "Vin" || key == "CreatedAt" {
+               if key == "Vin" || key == "CreatedAt" || value == "<invalid>" {

My reasoning is, if the value is invalid then don't publish it... Not sure though if the message is then acked correctly to the car...
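The filtering idea from the diff can be sketched in Python as follows. The sample payload and field names are made up, and the sketch says nothing about how or whether the record is acknowledged to the vehicle.

```python
SKIPPED_KEYS = {"Vin", "CreatedAt"}
INVALID = "<invalid>"  # marker string taken from the diff above

def publishable_fields(converted_payload):
    """Return the fields that would be published, dropping metadata keys and invalid values."""
    return {
        key: value
        for key, value in converted_payload.items()
        if key not in SKIPPED_KEYS and value != INVALID
    }

# Made-up sample of what a converted payload might look like.
sample = {
    "Vin": "TESTVIN0000000001",
    "CreatedAt": "2024-11-28T16:00:00Z",
    "Locked": "true",
    "Soc": "<invalid>",
}
print(publishable_fields(sample))
```

One consequence of skipping is that a retained or last-seen value on the broker would stay as it was, so subscribers cannot tell that the field became invalid.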

netdata-be avatar Nov 28 '24 16:11 netdata-be

I have updated the way the MQTT values are formatted. They now directly contain the value as JSON, so they are no longer wrapped in an extra "value" entry. The invalid field value is now translated to null in JSON. Even though null might not strictly mean the same as "invalid", it seems the most straightforward approach.
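A minimal Python sketch of this behavior, under the assumption that an invalid field can be recognized by the "<invalid>" marker string seen earlier in this thread; how the PR itself detects invalid fields is not shown here, and both helper names are hypothetical.

```python
import json

INVALID = "<invalid>"  # assumption: marker string from the earlier diff

def to_payload(value):
    """Encode a field value as a bare JSON payload, mapping the invalid marker to null."""
    return json.dumps(None if value == INVALID else value)

def from_payload(payload, default=None):
    """Decode a payload; a JSON null comes back as `default`."""
    value = json.loads(payload)
    return default if value is None else value

print(to_payload("<invalid>"), to_payload("true"), from_payload("null", default="unknown"))
```

Consumers then need exactly one extra case, null, instead of string-matching a marker value.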

erwin314 avatar Nov 29 '24 10:11 erwin314

I've added MQTT support for connectivity events, with one topic per VIN. I assume each vehicle has at most one connection (connected or disconnected) at a time?

Regarding ConnectionID, it seems to change every time a car (re)connects. This suggests it’s not an ID for a connection to a specific telemetry server, but rather a random ID generated for each new connection. That's why I put it in the value and not in the topic. I'm not sure what happens if a car is connected to multiple telemetry servers (can it even do that?)

Let me know if there’s any clarification or adjustment needed for this approach!
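The design choice can be illustrated with a short Python sketch. The topic suffix "connectivity" and the payload keys "ConnectionId" and "Status" are assumptions made for this example; the thread does not show the actual names used by the PR.

```python
import json

def connectivity_message(base, vin, connection_id, status):
    """Build (topic, payload) for a connectivity event.

    The topic suffix and the payload field names are assumptions for illustration.
    """
    topic = f"{base}/{vin}/connectivity"  # one topic per VIN, no connection ID in it
    payload = json.dumps({"ConnectionId": connection_id, "Status": status})
    return topic, payload

first = connectivity_message("telemetry", "TESTVIN0000000001", "conn-a", "CONNECTED")
second = connectivity_message("telemetry", "TESTVIN0000000001", "conn-b", "CONNECTED")
# A reconnect changes the payload but not the topic, so one subscription per VIN is enough.
assert first[0] == second[0] and first[1] != second[1]
```

If the connection ID were part of the topic instead, every reconnect would create a new topic and subscribers would need a wildcard subscription to follow a single vehicle.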

erwin314 avatar Nov 29 '24 13:11 erwin314

@ThomasAlxDmy To me this looks feature complete now. Can you pull some strings to get this unblocked?

netdata-be avatar Dec 02 '24 06:12 netdata-be

@patrickdemers6 is there something we can do to get this merged? Is there something missing which needs to be added?

netdata-be avatar Dec 12 '24 14:12 netdata-be

@erwin314 for your fork, have you published any built docker images? Does your fork run? Debating putting in the effort to run it so I can just get set up with an MQTT dispatcher for telemetry.

andrew-kennedy avatar Jan 10 '25 04:01 andrew-kennedy

Looks like there are conflicts to merge. If you can fix those, then let's ship. @erwin314

patrickdemers6 avatar Jan 10 '25 04:01 patrickdemers6

This is great work, thanks for all your hard work on it!

Thank you!

Looks like there are conflicts to merge. If you can fix those, then let's ship.

@patrickdemers6 Where do you see the merge conflicts? GitHub says: No conflicts with base branch - Changes can be cleanly merged.

erwin314 avatar Jan 10 '25 07:01 erwin314

Ah, had to switch to squash and merge instead of rebase.

The best way to learn if this implementation works is to get you and all the others that are interested using it.

patrickdemers6 avatar Jan 10 '25 07:01 patrickdemers6