Incomplete transfer of data from context to CloudEvent

Open jrehnen opened this issue 2 years ago • 8 comments

Describe the issue

https://github.com/dapr/python-sdk/blob/e89ac58f531f05ad4b47d46c1ccfcd4682c94573/ext/dapr-ext-grpc/dapr/ext/grpc/_servicier.py#L187-L194

Not all metadata is copied from context to CloudEvent. This prevents you from accessing potentially relevant information that the original message carried.

Expected Behaviour

All data from context should be accessible in CloudEvent.

jrehnen avatar May 20 '22 06:05 jrehnen

Could you provide an example I can use to implement this?

How can I force the processing of an event with metadata that would trigger this code path, and how would you usually verify that the cloud event has the expected data? I'm not familiar with cloud events myself.

berndverst avatar May 31 '22 23:05 berndverst

An example would be CloudEvents generated by Debezium, which follow a structure as described here.

As you can see, the structure has more fields than just type, id, source, data and content_type. These fields, however, are not included in the CloudEvent received by the consumer. Specifically, I want to access the iodebeziumop and iodebeziumschema fields.

I'm not entirely sure myself, but you should be able to reproduce the issue by first publishing an event of the Debezium structure as a CloudEvent and then receiving it on the subscriber side. As far as I know, you should be missing all the originally given fields except for type, id, source, data and content_type.
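
For reproduction, a minimal sketch of such a payload might look like the following. Only the two extension attribute names (iodebeziumop, iodebeziumschema) come from this issue; the function name and every value (id, source, type, data) are made up for illustration and are not real Debezium output.

```python
import json


def build_debezium_like_event() -> dict:
    """Return a CloudEvent envelope carrying two Debezium-style extension attributes."""
    return {
        # Core CloudEvents 1.0 attributes (all values here are hypothetical).
        "specversion": "1.0",
        "id": "example-id-1",
        "source": "/debezium/example",
        "type": "example.debezium.change",
        "datacontenttype": "application/json",
        # Extension attributes named in this issue; the values are made up.
        "iodebeziumop": "c",
        "iodebeziumschema": "public",
        # Event payload (hypothetical).
        "data": {"orderId": "100"},
    }


# Serialize the envelope so it can be published as the message body.
payload = json.dumps(build_debezium_like_event())
print(payload)
```

If the issue reproduces, the subscriber would see type, id, source, data and content_type, but not the two iodebezium* attributes.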

Hope this helps and thank you in advance.

jrehnen avatar Jun 02 '22 11:06 jrehnen

@jrehnen after much investigation I have determined that additional Cloud Event metadata (such as these Debezium fields) is not currently stored by most Dapr PubSub components.

I will be updating all PubSub components to do so for the 1.9 release and will add Python SDK support at that time. While I already have a Python SDK implementation which I believe to be working, I do not actually have a Dapr PubSub component which I know to handle the CloudEvent Metadata correctly.

berndverst avatar Jun 30 '22 20:06 berndverst

ext.grpc._servicier.py in _CallbackServicer:

def OnTopicEvent(self, request: TopicEventRequest, context: grpc.ServicerContext):

The missing metadata should be returned via gRPC metadata. Cloud Events treats non-standard fields as headers and in gRPC that means metadata.

This means this data should be available via one of the following:

context.invocation_metadata()
context.trailing_metadata()
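
A rough sketch of how such metadata could be read inside the servicer, assuming the extension attributes did arrive as gRPC metadata. FakeContext and metadata_to_dict are hypothetical names used only so the snippet runs without a gRPC server; the real object would be the grpc.ServicerContext passed to OnTopicEvent.

```python
from typing import Dict, Iterable, Tuple


def metadata_to_dict(metadata: Iterable[Tuple[str, object]]) -> Dict[str, object]:
    """Collapse gRPC metadata (key/value pairs) into a plain dict; later values win."""
    return {key: value for key, value in metadata}


class FakeContext:
    """Hypothetical stand-in for grpc.ServicerContext, for illustration only."""

    def invocation_metadata(self):
        # gRPC exposes invocation metadata as a sequence of (key, value) pairs.
        return (("iodebeziumop", "c"), ("iodebeziumschema", "public"))


headers = metadata_to_dict(FakeContext().invocation_metadata())
print(headers.get("iodebeziumop"))
```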

However, in my testing these headers are not returned today, because no Dapr PubSub component seems to be saving the additional non-standard headers / metadata, and the PubSub components aren't sending this data to subscribers.

Before the Python SDK changes could have any impact we need to update PubSub components. This will be specific to each PubSub component.

berndverst avatar Jun 30 '22 20:06 berndverst

@jrehnen good news - if you are sending a cloud event to Dapr directly (with data content type "application/cloudevents+json") we are actually storing all custom attributes.

However the proto for the TopicEventRequest has no way for us to pass these additional attributes.

I'll add support for the cloud event extension attributes to the proto, which will then enable you to access them in the Python SDK via the cloudevent extension dictionary. This should not even require an SDK change.

Due to the runtime change required, however, this won't happen until release 1.9.

If you use HTTP as the app protocol instead, you may already be able to receive these Debezium attributes!

Please note once again that all of this will only be supported if the data content type is "application/cloudevents+json".

berndverst avatar Jul 06 '22 04:07 berndverst

Confirmed this is working with HTTP (flask_dapr or dapr-ext-fastapi extensions or any other HTTP method):

import logging

from flask import Flask, request
from flask_dapr import DaprApp

app = Flask(__name__)
dapr = DaprApp(app)


@dapr.subscribe(pubsub='pubsub', topic='mytopic')
def handlethis():
    # request.data holds the raw CloudEvent JSON, including any custom attributes.
    logging.warning(request.data)
    return "test", 200


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run the subscriber app:

dapr run --app-id python-subscriber --app-protocol http --app-port 5000 python3 app.py

Now publish the cloud event, noting the application/cloudevents+json content type. The custom property SOMECUSTOMPROPERTY stands in for the Debezium properties you are interested in.

 dapr publish --publish-app-id python-subscriber --pubsub pubsub --topic mytopic --data '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "SOMECUSTOMPROPERTY": "CUSTOMVALUE", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

The subscriber app then logs the received event, including the custom property:

== APP == WARNING:root:b'{"SOMECUSTOMPROPERTY":"CUSTOMVALUE","data":{"orderId":"100"},"datacontenttype":"application/cloudevents+json","id":"someCloudEventId","pubsubname":"pubsub","source":"testcloudeventspubsub","specversion":"1.0","subject":"Cloud Events Test","time":"2021-08-02T09:00:00Z","topic":"mytopic","traceid":"00-e940d870ac248252ed5123a6f870dbe3-e3596874062b9598-01","traceparent":"00-e940d870ac248252ed5123a6f870dbe3-e3596874062b9598-01","tracestate":"","type":"com.dapr.cloudevent.sent"}'
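
To pull just the custom attributes out of a body like the one logged above, something along these lines could work. This is a sketch under assumptions: custom_attributes and both constant names are hypothetical, the set of Dapr-added fields is taken only from the logged output above, and the sample body is a shortened copy of that output. In the Flask handler you would pass request.data.

```python
import json

# Attributes defined by CloudEvents 1.0 (required and optional), plus the data fields.
CLOUDEVENT_ATTRIBUTES = {
    "specversion", "id", "source", "type",
    "datacontenttype", "dataschema", "subject", "time",
    "data", "data_base64",
}
# Fields that Dapr added in the logged output above (assumed from that output only).
DAPR_ADDED_FIELDS = {"pubsubname", "topic", "traceid", "traceparent", "tracestate"}


def custom_attributes(body: bytes) -> dict:
    """Return the top-level fields that are neither core CloudEvents nor Dapr-added."""
    event = json.loads(body)
    known = CLOUDEVENT_ATTRIBUTES | DAPR_ADDED_FIELDS
    return {key: value for key, value in event.items() if key not in known}


# Shortened copy of the body logged by the subscriber above.
body = (
    b'{"SOMECUSTOMPROPERTY":"CUSTOMVALUE","data":{"orderId":"100"},'
    b'"id":"someCloudEventId","pubsubname":"pubsub","source":"testcloudeventspubsub",'
    b'"specversion":"1.0","topic":"mytopic","type":"com.dapr.cloudevent.sent"}'
)
print(custom_attributes(body))
```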

@jrehnen my recommendation for now is to use HTTP if you need to use PubSub.

berndverst avatar Jul 06 '22 06:07 berndverst

This issue has been automatically marked as stale because it has not had activity in the last 60 days. It will be closed in the next 7 days unless it is tagged (pinned, good first issue, help wanted or triaged/resolved) or other activity occurs. Thank you for your contributions.

dapr-bot avatar Sep 17 '22 22:09 dapr-bot

This will not make the next release as the required work isn't done in the Dapr runtime.

berndverst avatar Sep 19 '22 22:09 berndverst

@jrehnen the linked PR should do what you want now. This will only work with Dapr 1.10 and later.

event.Extensions() returns a dictionary that includes all those custom properties.
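
A minimal sketch of reading one of those properties, assuming only that the event object exposes Extensions() as described above. FakeEvent and debezium_op are hypothetical names so the snippet runs without Dapr; in a real gRPC subscriber the event would be the argument passed to your topic handler.

```python
from typing import Any, Dict, Optional


def debezium_op(event: Any) -> Optional[str]:
    """Return the iodebeziumop extension attribute, or None if it is absent."""
    extensions: Dict[str, Any] = event.Extensions() or {}
    return extensions.get("iodebeziumop")


class FakeEvent:
    """Hypothetical stand-in for the event object handed to a subscriber."""

    def __init__(self, extensions: Optional[Dict[str, Any]]):
        self._extensions = extensions

    def Extensions(self) -> Optional[Dict[str, Any]]:
        return self._extensions


event = FakeEvent({"iodebeziumop": "c", "iodebeziumschema": "public"})
print(debezium_op(event))
```

The `or {}` guard is a defensive choice in case no extensions were set; whether the SDK ever returns None here is not confirmed in this thread.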

berndverst avatar Jan 12 '23 00:01 berndverst