python-sdk icon indicating copy to clipboard operation
python-sdk copied to clipboard

PubSub topic used is not available in subscribing topic method

Open ngiind opened this issue 2 years ago • 11 comments

When subscribing using wildcards ("+" or "#" relevant for MQTT), the actual topic is not known by the subscribing topic method. This could be useful in many cases, for instance when embedding a unique id in the topic to enforce authorization in MQTT.

Could this be added to the callback somehow?

ngiind avatar Aug 01 '22 11:08 ngiind

Could the source or subject attributes of the cloudevent be used?

ngiind avatar Aug 01 '22 11:08 ngiind

The solution for this would be for this to be provided via the cloud event or GRPC Metadata. That however requires support for this in the Dapr Runtime / sidecar (dapr/dapr repo) first.

berndverst avatar Aug 04 '22 00:08 berndverst

It could be added to the gloud event from the python SDK, without ant updates to the runtime, no? From the request.topic property?

    def OnTopicEvent(self, request, context):
        """Subscribes events from Pubsub."""
        pubsub_topic = request.pubsub_name + DELIMITER + \
            request.topic + DELIMITER + request.path
        no_validation_key = request.pubsub_name + DELIMITER + request.path

        if pubsub_topic not in self._topic_map:
            if no_validation_key in self._topic_map:
                pubsub_topic = no_validation_key
            else:
                context.set_code(grpc.StatusCode.UNIMPLEMENTED)  # type: ignore
                raise NotImplementedError(
                    f'topic {request.topic} is not implemented!')

        event = v1.Event()
        event.SetEventType(request.type)
        event.SetEventID(request.id)
        event.SetSource(request.source)
        event.SetData(request.data)
        event.SetContentType(request.data_content_type)

        # TODO: add metadata from context to CE envelope

        response = self._topic_map[pubsub_topic](event)
        if isinstance(response, TopicEventResponse):
            return appcallback_v1.TopicEventResponse(status=response.status.value)
        return empty_pb2.Empty()

But it is maybe better provided through the runtime. E.g. by providing the topic in the request.source property?

ngiind avatar Aug 04 '22 07:08 ngiind

@ngiind the source attribute might be used for something else if the cloud event didn't originate from Dapr. Perhaps we could add this as a custom cloud event extension attribute.

berndverst avatar Aug 04 '22 07:08 berndverst

@berndverst when is the source not Dapr?

Maybe the topic does not belong in the cloud event, and should instead be a separate parameter for the TopicSubscribeCallable? The MQTT Cloud events binding spec does not seem to include the topic in the event (https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/mqtt-protocol-binding.md).

ngiind avatar Aug 04 '22 11:08 ngiind

I see what you are requesting now. Yes we could just get the topic from the request itself. We could set it as a cloud event extension. How about that?

If I change the method signature for the topic callback I would introduce a breaking change. Unfortunately the topic functions take in an instance of CloudEvent from the official Cloud Event SDK. None of these fields in the Python Cloud Events SDK are designed to hold the topic information. Source could be populated already.

I think setting a cloud event extension (custom property) and using a prefix like dapr_ may make sense.

extensions = dict()
extensions['dapr_topic'] = request.topic

event = v1.Event()
        event.SetEventType(request.type)
        event.SetEventID(request.id)
        event.SetSource(request.source)
        event.SetData(request.data)
        event.SetContentType(request.data_content_type)
        event.SetExtensions(extensions=extensions)

Then in your handler you can either get the complete list via event.Extensions() or you can use event.Get('dapr_topic')

Thoughts @ngiind ?

berndverst avatar Sep 20 '22 00:09 berndverst

@berndverst: I think this seems like an ok solution, considering that we need to be backwards-compatible. This should be an issue in the other SDKs as well, no? Maybe we need to align?

ngiind avatar Sep 22 '22 10:09 ngiind

@ngiind yes we probably need to agree on the name / prefix. We can make this change in the dapr.ext.grpc library once we have decided what to do and do an off-cycle release of that library.

berndverst avatar Oct 04 '22 23:10 berndverst

Sounds like a good plan @berndverst. How do we synchronize with the other SDKs? I'm a bit of a neebie, sorry :).

ngiind avatar Oct 05 '22 06:10 ngiind

It will be best for us to raise a new proposal in the dapr/dapr repo. Though this is also something the "SIG SDK spec" working group should handle. The dapr/dapr issue is probably a good start and it can be routed from there accordingly.

berndverst avatar Oct 05 '22 07:10 berndverst

Ok. Thanks! Shall I raise the issue in dapr/dapr?

ngiind avatar Oct 05 '22 08:10 ngiind

This issue has been automatically marked as stale because it has not had activity in the last 60 days. It will be closed in the next 7 days unless it is tagged (pinned, good first issue, help wanted or triaged/resolved) or other activity occurs. Thank you for your contributions.

dapr-bot avatar Dec 16 '22 23:12 dapr-bot

Nudging this. I can help implement it.

ngiind avatar Dec 19 '22 06:12 ngiind

@ngiind it is now implemented in the attached PR - it requires Dapr 1.10. It could not have been implemented prior to my work updating the protos in the Dapr Runtime.

event.Subject() will be the pubsub topic.

This is a synonym for event.Extensions()['topic']

berndverst avatar Jan 12 '23 01:01 berndverst

Thank you, @berndverst ! <3

ngiind avatar Jan 12 '23 10:01 ngiind

Thank you, @berndverst ! <3

@ngiind small correction:

event.Subject() will be the topic via which the event was delivered, this could be a deadletter topic for example. The originally intended topic is also accessible via event.Extensions()['topic'] if desired.

berndverst avatar Jan 12 '23 19:01 berndverst