dotnet-sdk
dotnet-sdk copied to clipboard
Add .NET client for dynamic pubsub subscriptions
Description
Implements streaming subscription support as a discrete Dapr.Messaging package.
This was based on the work done by @philliphoff here. The requirements call for a mechanism to return an unbounded stream of message from the subscription and a follow-up call to the sidecar for each message to convey an action that should be taken on it (e.g. drop, retry or mark as completed).
While I think his use of a delegate to force the developer to process each message and immediately notate the action that should be taken on the message, I instead wanted to expose a method returning an IAsyncEnumerable as I think there's some interesting opportunities for combining this with some Rx goodness.
~~My approach then also requires that in addition to setting up the initial subscription, the developer call AcknowledgeMessageAsync
with the message ID and the action to take, but also bakes in a policy so that a default configurable action is taken after a timeout window in which the developer fails to otherwise indicate success or failure.~~
~~Because a separate connection will be created with the sidecar for every combination of pubsub component and topic, the actual implementation is an internal class called PublishSubscribeReceiver
that maintains a single streaming connection to the sidecar for each instance in the ConnectionManager
. In order to facilitate some future backpressure support and otherwise decouple the receipt of messages from the sidecar from the subscription, I write inbound messages to a Channel<TopicMessage>
and read from it from the subscription method implementing IAsyncEnumerable<TopicMessage>
. As a side effect of each message read out, it also registers the message identifier with a TaskCompletionSource
and a CancellationTokenSource
bound to the provided cancellation token so that if the developer fails to acknowledge the message within the configured timeout window, it the default message handler action will be taken automatically.~~
Update: After hammering out a concept of the above, I ultimately agreed with @philliphoff that the approach isn't as smooth as his original approach, so I've modified it accordingly and detailed it more in this comment.
Finally, I opted to implement the extensions property on the TopicMessage
with a Dictionary<string, Value>
where the Value
is the Protobuf Value struct (meaning that a developer is going to have to query the type of the struct and retrieve the appropriate property with the matching value). While I considered just strongly converting all values to a string, there are some Values types this can represent that would make that cumbersome (e.g. List or other structs). Ultimately, if the developer is opting to use this, they can figure out how to retrieve the Value they intend to.
Additional notes
This should ideally be merged after #1331 as it takes a dependency on a package introduced (Dapr.Common which introduces a generic client builder class and contains shared exceptions) in that PR. I created a stub for it here, but it'd be nice to avoid the conflict later on.
This implementation also creates a separate Dapr.Protos package that each of the existing libraries can take advantage of once this is merged in that ensures that there's only one place that Protos have to be updated at instead of each library having its own copy.
I think there's an opportunity to improve on how we do logging across all these distinct projects using a static logging type. As the internal GrpcClient isn't DI-injected, it eliminates several of the simpler approaches to just use an ILogger<>
or ILoggerFactory
. I removed the use of the configuration options as a carrier of the logging implementation as it just didn't feel like a great fit there, but I think it's worth exploring a more concrete and centrally-defined approach to more uniform logging going forward for the .NET client. But perhaps in another issue.
Issue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
Please reference the issue this PR will close: #1324
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list:
- [X] Code compiles correctly
- [X] Created/updated tests
- [X] Extended the documentation - https://github.com/dapr/docs/pull/4358