python-sdk
python-sdk copied to clipboard
PubSub topic used is not available in subscribing topic method
When subscribing using wildcards ("+" or "#" relevant for MQTT), the actual topic is not known by the subscribing topic method. This could be useful in many cases, for instance when embedding a unique id in the topic to enforce authorization in MQTT.
Could this be added to the callback somehow?
Could the source or subject attributes of the cloudevent be used?
The solution for this would be for this to be provided via the cloud event or GRPC Metadata. That however requires support for this in the Dapr Runtime / sidecar (dapr/dapr repo) first.
It could be added to the gloud event from the python SDK, without ant updates to the runtime, no? From the request.topic property?
def OnTopicEvent(self, request, context):
"""Subscribes events from Pubsub."""
pubsub_topic = request.pubsub_name + DELIMITER + \
request.topic + DELIMITER + request.path
no_validation_key = request.pubsub_name + DELIMITER + request.path
if pubsub_topic not in self._topic_map:
if no_validation_key in self._topic_map:
pubsub_topic = no_validation_key
else:
context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore
raise NotImplementedError(
f'topic {request.topic} is not implemented!')
event = v1.Event()
event.SetEventType(request.type)
event.SetEventID(request.id)
event.SetSource(request.source)
event.SetData(request.data)
event.SetContentType(request.data_content_type)
# TODO: add metadata from context to CE envelope
response = self._topic_map[pubsub_topic](event)
if isinstance(response, TopicEventResponse):
return appcallback_v1.TopicEventResponse(status=response.status.value)
return empty_pb2.Empty()
But it is maybe better provided through the runtime. E.g. by providing the topic in the request.source property?
@ngiind the source attribute might be used for something else if the cloud event didn't originate from Dapr. Perhaps we could add this as a custom cloud event extension attribute.
@berndverst when is the source not Dapr?
Maybe the topic does not belong in the cloud event, and should instead be a separate parameter for the TopicSubscribeCallable? The MQTT Cloud events binding spec does not seem to include the topic in the event (https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/mqtt-protocol-binding.md).
I see what you are requesting now. Yes we could just get the topic from the request itself. We could set it as a cloud event extension. How about that?
If I change the method signature for the topic callback I would introduce a breaking change. Unfortunately the topic functions take in an instance of CloudEvent from the official Cloud Event SDK. None of these fields in the Python Cloud Events SDK are designed to hold the topic information. Source could be populated already.
I think setting a cloud event extension (custom property) and using a prefix like dapr_
may make sense.
extensions = dict()
extensions['dapr_topic'] = request.topic
event = v1.Event()
event.SetEventType(request.type)
event.SetEventID(request.id)
event.SetSource(request.source)
event.SetData(request.data)
event.SetContentType(request.data_content_type)
event.SetExtensions(extensions=extensions)
Then in your handler you can either get the complete list via
event.Extensions()
or you can use event.Get('dapr_topic')
Thoughts @ngiind ?
@berndverst: I think this seems like an ok solution, considering that we need to be backwards-compatible. This should be an issue in the other SDKs as well, no? Maybe we need to align?
@ngiind yes we probably need to agree on the name / prefix. We can make this change in the dapr.ext.grpc
library once we have decided what to do and do an off-cycle release of that library.
Sounds like a good plan @berndverst. How do we synchronize with the other SDKs? I'm a bit of a neebie, sorry :).
It will be best for us to raise a new proposal in the dapr/dapr repo. Though this is also something the "SIG SDK spec" working group should handle. The dapr/dapr issue is probably a good start and it can be routed from there accordingly.
Ok. Thanks! Shall I raise the issue in dapr/dapr?
This issue has been automatically marked as stale because it has not had activity in the last 60 days. It will be closed in the next 7 days unless it is tagged (pinned, good first issue, help wanted or triaged/resolved) or other activity occurs. Thank you for your contributions.
Nudging this. I can help implement it.
@ngiind it is now implemented in the attached PR - it requires Dapr 1.10. It could not have been implemented prior to my work updating the protos in the Dapr Runtime.
event.Subject()
will be the pubsub topic.
This is a synonym for event.Extensions()['topic']
Thank you, @berndverst ! <3
Thank you, @berndverst ! <3
@ngiind small correction:
event.Subject()
will be the topic via which the event was delivered, this could be a deadletter topic for example.
The originally intended topic is also accessible via event.Extensions()['topic']
if desired.