Do not autocreate internal-command topics for adapters
In some cases, it is not possible for the user that is used to read Kafka topics to auto-create topics on the Kafka cluster. This blocks the usage of commands, which is a pity. Currently I see two options to tackle this:
- Configure the topic used to consume commands per adapter.
- Make the topic name predictable (not depending on the guid of the container) so manual creation of these topics is possible.
@BobClaerhout Would it also be a solution for your use case if the protocol adapters subscribed directly to (northbound) command topics? Then each protocol adapter instance would subscribe to the command topics of the tenants for which devices are connected to it. It would then discard commands for devices that are not connected to it. This concept was originally discussed as concept 1. in https://github.com/eclipse/hono/issues/2273#issuecomment-718418108. We could make it configurable to use this behavior as an alternative to the command router. FMPOV this would reduce complexity significantly. The overhead of reading and discarding commands would be acceptable unless an extremely large number of adapter instances are running and an extremely large number of commands are being sent for a tenant.
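For illustration, the adapter-side consumer loop for this could look roughly like the following sketch (topic naming follows the hono.command.[tenantId] pattern; isDeviceConnectedToThisAdapter and forwardCommandToDevice are made-up placeholders, not actual Hono APIs):

```java
import java.time.Duration;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Rough sketch only: the adapter subscribes to all (northbound) tenant command topics
// and simply drops commands for devices that are not connected to this instance.
void consumeTenantCommandTopics(final KafkaConsumer<String, byte[]> consumer) {
    consumer.subscribe(Pattern.compile("hono\\.command\\..*"));
    while (true) {
        for (final ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofMillis(500))) {
            final String tenantId = record.topic().substring("hono.command.".length());
            final String deviceId = record.key(); // assuming command records are keyed by device id
            if (isDeviceConnectedToThisAdapter(tenantId, deviceId)) { // hypothetical helper
                forwardCommandToDevice(tenantId, deviceId, record);   // hypothetical helper
            }
            // commands for devices connected to other adapter instances are discarded here
        }
    }
}
```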
Yes, I think that's a pretty good solution for those use cases. If you do not have full control of the Kafka cluster, this usually concerns only a limited number of tenants (if not just one). In our case, the number of adapters will also be limited, and so is the number of commands.
So TL;DR: I think that's a nice solution for this problem.
It could also be solved like this:
The Command Router would publish the commands not on the hono.cmd_internal.[adapterInstanceId] topic but instead on the hono.cmd_internal topic and use the adapterInstanceId as key for the records it publishes there.
In the protocol adapters, the consumers wouldn't subscribe to the hono.cmd_internal.[adapterInstanceId] topic anymore but would instead manually assign themselves the partition of the hono.cmd_internal topic that corresponds to their adapterInstanceId key. When processing the received records, each consumer would ignore records that have a key other than its own adapterInstanceId.
This would mean the hono.cmd_internal topic could be created beforehand, preferably with a number of partitions that is equal to or greater than the number of protocol adapter pods multiplied by the number of vert.x verticle instances configured for them (i.e. equal to or greater than the number of different adapterInstanceIds at any given time).
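To illustrate the consumer side, a minimal sketch (not actual Hono code; the partition computation just mirrors the default partitioner's murmur2 scheme, and handleCommand is a placeholder):

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

// Sketch: each command consumer assigns itself the hono.cmd_internal partition that the
// (default) partitioner would pick for its own adapterInstanceId key and ignores records
// that carry a different key.
void consumeCommandInternal(final KafkaConsumer<String, byte[]> consumer, final String adapterInstanceId) {
    final String topic = "hono.cmd_internal";
    final int numPartitions = consumer.partitionsFor(topic).size();
    final byte[] keyBytes = adapterInstanceId.getBytes(StandardCharsets.UTF_8);
    final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    consumer.assign(List.of(new TopicPartition(topic, partition)));

    while (true) {
        for (final ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofMillis(500))) {
            if (adapterInstanceId.equals(record.key())) {
                handleCommand(record); // placeholder for forwarding the command to the device
            }
            // records keyed with another adapterInstanceId that maps to the same partition are ignored
        }
    }
}
```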
The only downside I see here is that there could be some added traffic between Kafka and the protocol adapters, because consumers could also get command records for other adapterInstanceIds (which they then would ignore).
This comes down to the mapping from record key to partition index.
In the worst case, all adapterInstanceId keys would be assigned to the same partition index, in which case all consumers would get all commands. But with a high number of partitions, the distribution should be better and we could even use our own Partitioner implementation (for calculating the partition index from the key) in Command Router and adapters to optimize the distribution.
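Such a custom Partitioner could be set via the standard partitioner.class producer property in the Command Router and reused in the adapters for computing the assigned partition. A minimal sketch, here just replicating the default murmur2-based scheme; a smarter variant could take the set of known adapterInstanceIds into account:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class AdapterInstancePartitioner implements Partitioner {

    @Override
    public int partition(final String topic, final Object key, final byte[] keyBytes,
            final Object value, final byte[] valueBytes, final Cluster cluster) {
        final int numPartitions = cluster.partitionsForTopic(topic).size();
        // same scheme as the default partitioner for keyed records; this is the place
        // where a custom distribution of adapterInstanceId keys could be plugged in
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to clean up
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
```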
A further advantage of this approach would be that there would be no hono.cmd_internal.[adapterInstanceId] topics to delete after the corresponding adapters have shut down (see #2901).
All in all, I think I would prefer this approach over the one with the dynamic topic names we are currently using. @sophokles73 @b-abel @kaniyan WDYT?
I am no expert but this sounds very reasonable. I do not think that we need a very high number of partitions in order for this to work. In particular, I believe that the number of pods could/should serve as an upper boundary for the number of partitions. That would also be consistent with the idea that the number of pods can change dynamically, e.g. when scaling up or down, while the number of partitions should remain constant. Is there a particular reason why you think that the adapterInstanceIds we use will not be distributed evenly across the partitions when using them as record keys?
Is there a particular reason why you think that the adapterInstanceIds we use will not be distributed evenly across the partitions when using them as record keys?
In order to let one adapterInstanceId key be the only one mapped to a specific partition, we would have to manage a mapping table of already used key -> partition mappings and use that in the Partitioner logic.
With the default Partitioner (see here) we'll probably get a decent distribution by means of the chosen hashing algorithm, but still there could of course be quite some partitions to which multiple adapterInstanceIds get mapped.
To reduce that possibility, I would rather use more partitions.
In order to let one adapterInstanceId key be mapped exclusively to one partition, we would have to manage a mapping table of already used key -> partition mappings and use that in the Partitioner logic.
What exactly do you mean with exclusively here? Do you mean that the partition will only contain messages with the same adapterInstanceId or do you mean that all messages with the same key will go into the same partition? I guess the latter is the default behavior anyway, isn't it? I do not see how the former could be achieved, given that we have no control over the number of pods/adapter instances but we have a fixed number of partitions. Or am I missing something?
All in all, I think I would prefer this approach over the one with the dynamic topic names we are currently using.
I agree that this is an improvement over the dynamic creation and deletion of topics, which unfortunately is not well supported by Kafka. But I am not convinced that the idea is better than the proposal above. If the protocol adapters subscribe directly to the northbound command topics, this has the disadvantage that they will retrieve and ignore messages directed to devices that are connected to other adapter instances. If the adapter instances should really become too busy by this, they can always be scaled up. This would only become a problem with an extremely high number of commands, wouldn't it? The claim that this would be a problem in real-world scenarios needs to be IMHO very well justified before investing in an alternative with much higher complexity.
Do you mean that the partition will only contain messages with the same adapterInstanceId
Yes (I've rephrased my comment above). And yes, I also don't see a way to achieve this. I just wanted to mention that this is a disadvantage of this approach. On the other hand the added traffic is probably not really relevant in most cases.
But I am not convinced that the idea is better than the proposal above. If the protocol adapters subscribe directly to the northbound command topics, this has the disadvantage that they will retrieve and ignore messages directed to devices that are connected to other adapter instances. If the adapter instances should really become too busy by this, they can always be scaled up.
But scaling up the adapters would increase the combined traffic between the adapters and Kafka even more. Letting adapters directly receive all commands is something I would only see for use cases with a handful of adapters. For other use cases, I wouldn't consider that a good design.
The claim that this would be a problem in real-world scenarios needs to be IMHO very well justified before investing in an alternative with much higher complexity.
Well, the investment in the Command Router solution has already been made. And the change towards using a fixed hono.cmd_internal topic would be quite small.
On the other hand, letting adapters directly subscribe to the tenant command topics would be a lot bigger change and would also mean changes in behaviour.
To sum it up, I would suggest using this issue as an initiative to use one or more fixed command-internal topic names, without the containerId-specific adapterInstanceId in the topic name. This will allow manual creation of the topic(s) (also solving #2912) and will prevent issues with topic deletion (solving #2901).
The alternative approach, not using the command-internal topic at all (and also not using the Command Router), by means of letting protocol adapters directly receive all commands, should be discussed/addressed separately, IMHO.
Details regarding the fixed command-internal topic(s):
By default, I would suggest letting protocol adapters check on startup whether the hono.cmd_internal topic exists. If it doesn't, the topic will be created with a default of 10 partitions.
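The startup check could look roughly like this (sketch using the Kafka Admin client; the replication factor of 1 is just a placeholder value):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch of the proposed startup check: create hono.cmd_internal with 10 partitions
// if it doesn't exist yet.
void ensureCommandInternalTopic(final Map<String, Object> adminClientConfig) throws Exception {
    try (Admin admin = Admin.create(adminClientConfig)) {
        final Set<String> existingTopics = admin.listTopics().names().get();
        if (!existingTopics.contains("hono.cmd_internal")) {
            admin.createTopics(List.of(new NewTopic("hono.cmd_internal", 10, (short) 1)))
                    .all().get();
        }
    }
}
```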
There may be the case that protocol adapters are scaled up and the number of hono.cmd_internal topic partitions doesn't seem large enough any more to prevent adapters from unnecessarily receiving commands targeted at other adapters. To address that, I would suggest a new protocol adapter configuration property like hono.commandInternal.topicSuffix.
If this is set, the adapter will use a hono.cmd_internal.[topicSuffix] topic and will append this topic suffix to the adapterInstanceId. When the Command Router needs to map an adapterInstanceId to the command-internal topic name, it will try to extract the topic suffix from the adapterInstanceId. If such a suffix is set, the Command Router will then use the hono.cmd_internal.[topicSuffix] topic to publish the commands.
When the partition count of the original command-internal topic isn't large enough anymore, one would just create a new topic, e.g. hono.cmd_internal.2, adapt the protocol adapter configuration to define hono.commandInternal.topicSuffix: 2 and then restart the adapters.
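To make the intended mapping concrete, a small sketch (the separator used to embed the suffix in the adapterInstanceId is chosen arbitrarily here, not a decided convention):

```java
// Sketch of the proposed topic suffix handling.
static final String TOPIC_PREFIX = "hono.cmd_internal";
static final String SUFFIX_SEPARATOR = "#"; // hypothetical separator

// adapter side: append the configured hono.commandInternal.topicSuffix to the adapterInstanceId
static String adapterInstanceIdWithSuffix(final String adapterInstanceId, final String topicSuffix) {
    return topicSuffix == null ? adapterInstanceId : adapterInstanceId + SUFFIX_SEPARATOR + topicSuffix;
}

// Command Router side: derive the command-internal topic name from the adapterInstanceId again
static String commandInternalTopic(final String adapterInstanceId) {
    final int idx = adapterInstanceId.lastIndexOf(SUFFIX_SEPARATOR);
    return idx < 0 ? TOPIC_PREFIX : TOPIC_PREFIX + "." + adapterInstanceId.substring(idx + 1);
}
```

With the hono.commandInternal.topicSuffix: 2 example above, the Command Router would then publish the commands on hono.cmd_internal.2.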
Does this require the adapter instance ID to be stable across (adapter) container restarts? If the adapter instance ID changes from X to Y and there are commands in the topic with key X then the newly started adapter will no longer process the commands with key X, right? What will happen with such commands?
Does this require the adapter instance ID to be stable across (adapter) container restarts?
The adapter instance ID would remain the same (apart from the potentially added suffix). That means it still contains the container ID and is therefore not stable across adapter container restarts (see https://github.com/eclipse/hono/issues/2028#issuecomment-875608809).
When a protocol adapter restarts, the command subscription connections to the devices get disconnected, therefore all the mapping entries with the old adapterInstanceId and the formerly connected devices become obsolete.
If the adapter instance ID changes from X to Y and there are commands in the topic with key X then the newly started adapter will no longer process the commands with key X, right? What will happen with such commands?
Yes, these commands with the old key X wouldn't be handled. Even if the restarted adapter processed these records, it could be that after the adapter restart the command target device has connected to an entirely different adapter, so that the command record would then end up at the wrong adapter.
Thinking about this scenario some more, I think it wouldn't be easy to process these "lost" commands. Maybe the Command Router could do it: when the shutdown of an adapter gets reported via the adapterInstanceLivenessService, the remaining records in the corresponding topics could be fetched by a dedicated consumer in the Command Router and delivery-failure command responses could be sent for them. That would be the theory; in the details I still see some hurdles there. But anyway, such lost commands currently also don't get handled, so the suggested changes regarding the command-internal topic here don't make things worse.
Here is another approach, addressing the concerns regarding unnecessary message deliveries and handling of "lost" messages, requiring the protocol adapters to be part of a K8s StatefulSet instead of a Deployment in order for the command-internal topic names to be fixed.
Details:
The protocol adapters use the same adapterInstanceId format as before, i.e. [podname]_[containerId]_[verticleIndex].
They use command-internal topics with the naming format hono.cmd_internal.[podname], having as many partitions as the protocol adapter has vert.x verticle instances.
The command consumers in a protocol adapter subscribe to the hono.cmd_internal.[podname] topic and each use a fixed partition assignment of the partition corresponding to the consumer verticle index.
If on protocol adapter startup the topic doesn't exist, it will be created; if it doesn't have enough partitions, its partition count will be increased to match the number of verticles.
The Command Router will take the adapterInstanceId and extract pod name and verticle index from it. A command message will then be published on the hono.cmd_internal.[podname] topic with the verticle index as partition number.
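As a sketch of the Command Router side (pod names cannot contain underscores in K8s, so splitting the adapterInstanceId on '_' is safe here; the record keeps the adapterInstanceId as key, as before):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: derive topic and partition from an adapterInstanceId of the form
// [podname]_[containerId]_[verticleIndex] and publish the command there.
static ProducerRecord<String, byte[]> commandInternalRecord(final String adapterInstanceId,
        final byte[] commandPayload) {
    final String[] parts = adapterInstanceId.split("_");
    final String podName = parts[0];
    final int verticleIndex = Integer.parseInt(parts[parts.length - 1]);
    final String topic = "hono.cmd_internal." + podName;
    // the verticle index directly identifies the partition the target consumer has assigned itself
    return new ProducerRecord<>(topic, verticleIndex, adapterInstanceId, commandPayload);
}
```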
This means: Compared to the approach presented in the earlier comment, one Kafka consumer doesn't potentially receive commands directed at another adapter instance, because each Kafka consumer instance will use its own, fixed topic partition.
When using K8s Deployments for the protocol adapters, a used command-internal topic will be e.g. hono.cmd_internal.hono-adapter-mqtt-vertx-69dcc8c68-mxhbv. The above approach will work with this, but, in order to get really fixed topic names, the pod names need to be stable. For that, K8s StatefulSets need to be used, so that the command-internal topic name will be e.g. hono.cmd_internal.hono-adapter-mqtt-vertx-1.
Regarding "lost" messages, i.e. messages in a command-internal topic not processed before the target adapter crashed:
If a StatefulSet is used (or if a Deployment is used and the container was just restarted, without a pod name change), the consumers of the restarted container will receive the not-yet-processed messages and will see that their record keys contain the old container id (adapterInstanceId used as key). So, in case the target device has by chance connected to the same pod again after the container restart, the command record could even be forwarded to the device. Otherwise the command record will be handled by sending a delivery failure command response (if it is a request/response command).
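In code, the consumer-side handling of such leftover records could look roughly like this (isTargetDeviceConnectedHere, forwardCommandToDevice and sendDeliveryFailureResponse are placeholders):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch: records keyed with an old adapterInstanceId (i.e. an old container id) are
// either forwarded, if the target device happens to be connected to this pod again,
// or answered with a delivery-failure command response.
void handleCommandInternalRecord(final ConsumerRecord<String, byte[]> record,
        final String currentAdapterInstanceId) {
    if (currentAdapterInstanceId.equals(record.key()) || isTargetDeviceConnectedHere(record)) {
        forwardCommandToDevice(record);
    } else {
        sendDeliveryFailureResponse(record); // only relevant for request/response commands
    }
}
```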
I think we should go with this approach.
EDIT: At least it looks like the better approach if K8s StatefulSets are used. On the other hand, Kubernetes deployments are still more flexible regarding updates, etc. So it looks like it's preferable to stick with K8s deployments here. Hence, for the sake of having fixed topic names, the approach in my previous comment again looks like the better alternative.