java-sdk

Support taking an ObjectSerializer as well on pubsub message handling

Open CodeMonkeyLeet opened this issue 3 years ago • 3 comments

Describe the proposal

Porting a suggestion from user bencod#6730 in the Dapr Discord:

On publishing, the Dapr client accepts message data as an Object and serializes it using an ObjectSerializer, either the Dapr default or a user-provided one.

On message handling, the user receives a CloudEvent whose data, according to the documentation, is a String, JSON, or the binary data of the message.

This requires the user to deserialize the message, which is inconsistent with publishing, where serialization is done by the client. I would expect deserialization to be done by the infrastructure as well, allowing the user to provide an ObjectSerializer and the CloudEvent to be typed. Something like the following:

@Topic(name = "testTopic", pubsubName = "testPubSub", serializer = MyMsgObjectSerializer.class)
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<MyMsg> cloudEvent) {
   ...
}
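
To make the request concrete, here is a minimal sketch of the conversion the infrastructure would have to perform: the envelope metadata is kept as-is and only the data part goes through the user-supplied deserializer. Everything in it is an assumption for illustration: `Envelope` is a trimmed-down stand-in for the SDK's CloudEvent, `withTypedData` is a hypothetical helper, and the `text` field on `MyMsg` is invented. It uses only the Java standard library and is not the SDK's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class TypedCloudEventSketch {

    // Hypothetical, trimmed-down envelope; the SDK's real CloudEvent has more fields.
    static final class Envelope<T> {
        final String id;
        final String type;
        final T data;

        Envelope(String id, String type, T data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
    }

    // Assumed payload type standing in for MyMsg from the proposal.
    static final class MyMsg {
        final String text;

        MyMsg(String text) {
            this.text = text;
        }
    }

    // Copies the envelope metadata unchanged and runs only the data part
    // through the user-supplied deserializer.
    static <T> Envelope<T> withTypedData(Envelope<byte[]> raw, Function<byte[], T> deserializer) {
        return new Envelope<>(raw.id, raw.type, deserializer.apply(raw.data));
    }

    public static void main(String[] args) {
        Envelope<byte[]> raw =
                new Envelope<>("42", "com.example.msg", "hello".getBytes(StandardCharsets.UTF_8));
        // The lambda plays the role of a user-provided serializer (UTF-8 text here).
        Envelope<MyMsg> typed =
                withTypedData(raw, bytes -> new MyMsg(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(typed.id + " " + typed.data.text);
    }
}
```

With something like this in place, the handler would receive the typed envelope and never touch the raw bytes.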

CodeMonkeyLeet, Aug 16 '21 22:08

CloudEvent is a JSON object in Dapr, so this feature would only apply to the data part of the pubsub event.

artursouza, Aug 20 '21 00:08

I think this issue can be solved if we remove CloudEvent from the equation. For example, the subscriber could receive the object directly, similar to how it is done in the .NET SDK. Something like this:

@Topic(name = "testTopic", pubsubName = "testPubSub", serializer = MyMsgObjectSerializer.class)
public Mono<Void> handleMessage(@RequestBody(required = false) MyMsg myMsg) {
   ...
}
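
As a rough illustration of how that could work, the sketch below reads a serializer class from an annotation, instantiates it, deserializes the raw payload into the handler's parameter type, and invokes the handler. All names are hypothetical: `TopicSketch` stands in for `@Topic` with the proposed `serializer` attribute, `PayloadDeserializer` is an invented contract (not the SDK's serializer interface), and `dispatch` is not how the SDK or Spring actually routes messages.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

final class DirectPayloadDispatchSketch {

    // Hypothetical deserializer contract, invented for this sketch.
    interface PayloadDeserializer {
        <T> T deserialize(byte[] data, Class<T> type);
    }

    // Hypothetical annotation carrying the proposed "serializer" attribute.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface TopicSketch {
        String name();
        String pubsubName();
        Class<? extends PayloadDeserializer> serializer();
    }

    // Assumed payload type; the "text" field is invented.
    static final class MyMsg {
        final String text;

        MyMsg(String text) {
            this.text = text;
        }
    }

    // Example deserializer: treats the payload as UTF-8 text.
    static final class MyMsgObjectSerializer implements PayloadDeserializer {
        @Override
        public <T> T deserialize(byte[] data, Class<T> type) {
            return type.cast(new MyMsg(new String(data, StandardCharsets.UTF_8)));
        }
    }

    static final class Subscriber {
        String lastText;

        @TopicSketch(name = "testTopic", pubsubName = "testPubSub",
                serializer = MyMsgObjectSerializer.class)
        public void handleMessage(MyMsg myMsg) {
            lastText = myMsg.text;
        }
    }

    // Finds single-argument handlers for the topic, deserializes the payload
    // into the parameter type, and calls the handler with the typed object.
    static void dispatch(Object subscriber, String topic, byte[] payload) {
        for (Method method : subscriber.getClass().getDeclaredMethods()) {
            TopicSketch meta = method.getAnnotation(TopicSketch.class);
            if (meta == null || !meta.name().equals(topic) || method.getParameterCount() != 1) {
                continue;
            }
            try {
                PayloadDeserializer deserializer =
                        meta.serializer().getDeclaredConstructor().newInstance();
                Object arg = deserializer.deserialize(payload, method.getParameterTypes()[0]);
                method.setAccessible(true);
                method.invoke(subscriber, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not dispatch to " + method.getName(), e);
            }
        }
    }

    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber();
        dispatch(subscriber, "testTopic", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(subscriber.lastText);
    }
}
```

The handler signature stays free of CloudEvent, at the cost of losing access to envelope metadata such as the event ID.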

artursouza, Oct 15 '21 00:10

I have a few open questions on the Dapr Kafka pubsub component:

  1. In pub/sub, in what situations does Dapr retry sending a message? I read in the documentation that delivery is at least once. What is considered a successful delivery, and how can it be tested?
  2. How does Dapr handle autocommit? Can we have the option to set the autocommit time?
  3. If Dapr pub/sub tries to send a message to subscribers and the subscribers fail to process it, what happens?
  4. Is there a unique ID for each message published through pubsub?
  5. Is there a way to "ResetOffset" for Kafka pub/sub?
  6. Is it possible to set Kafka producer/consumer properties when using Kafka as a pub/sub component in Dapr?

Example : kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "host1:9092,host2:9092", "client.id": socket.gethostname(), "acks": "all", "batch.size": "32000", "compression.type": "gzip", })

  1. Can we configure the PubSub publish retry count?
  2. If the sidecar crashes, are all pending messages lost? Where is this described in the Dapr docs?
  3. How does Dapr guarantee that messages are never lost when a sidecar gets a message from the message bus but crashes before it can deliver it to the subscribing service code?

Venkat1188, May 17 '22 11:05