feat(drive-abci): preliminary support for event subscriptions in Platform
Issue being fixed or feature implemented
- Provide DAPI clients with a supported path to consume Drive’s new platform event bus so they can react to block commits and finalized state transitions without polling (packages/dapi-grpc/protos/platform/v0/platform.proto:8, packages/rs-drive-abci/src/abci/handler/finalize_block.rs:1, packages/rs-dapi/src/services/platform_service/subscribe_platform_events.rs:1).
- Align node tooling and SDKs with the new streaming capability, including configurable timeouts and developer ergonomics, so operators and integrators can adopt the feature safely (packages/dashmate/templates/platform/gateway/envoy.yaml.dot:1, packages/rs-sdk/src/platform/events.rs:1).
What was done?
- Added a versioned subscribePlatformEvents RPC plus event filter schema, then emitted block and state-transition events from the ABCI finalize flow via a shared in-process event bus to make Drive the authoritative producer (packages/dapi-grpc/protos/platform/v0/platform.proto:8, packages/rs-drive-abci/src/abci/handler/finalize_block.rs:1, packages/rs-dash-event-bus/src/event_bus.rs:1).
- Proxied the stream through rs-dapi with new Prometheus counters and per-route deadlines so each downstream subscription receives its own upstream channel while remaining observable and bounded (packages/rs-dapi/src/services/platform_service/subscribe_platform_events.rs:1, packages/rs-dapi/src/metrics.rs:89, packages/rs-dapi/src/server/grpc.rs:21).
- Taught dashmate to surface the additional timeouts, migrate older configs, and render Envoy settings from the single source of truth so operators don’t need manual edits when enabling platform event subscriptions (packages/dashmate/configs/getConfigFileMigrationsFactory.js:1164, packages/dashmate/templates/platform/gateway/envoy.yaml.dot:1, packages/dashmate/docs/config/dapi.md:39).
- Extended the Rust SDK (and wasm bindings) with a subscription helper, error mapping, sample app, and network test to help client developers consume the new stream and validate the handshake semantics (packages/rs-sdk/src/platform/events.rs:1, packages/rs-sdk/tests/fetch/platform_events.rs:1, packages/wasm-sdk/src/error.rs:169).
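For reviewers who want a mental model of the in-process bus, the publish/subscribe-with-filter flow can be sketched roughly as below. This is a minimal, std-only illustration and not the code in packages/rs-dash-event-bus: the `PlatformEvent`, `Filter`, and `EventBus` shapes, their fields, and the blocking `mpsc` channels are all assumptions made for the example (the real crate is async and uses the protobuf-generated types).

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical event type; the variant names mirror the events named in this PR,
/// but the fields are invented for illustration.
#[derive(Clone, Debug, PartialEq)]
pub enum PlatformEvent {
    BlockCommitted { height: u64 },
    StateTransitionFinalized { tx_hash: [u8; 32] },
}

/// Hypothetical filter, loosely modelled on the idea of PlatformFilterV0.
#[derive(Clone, Debug)]
pub enum Filter {
    All,
    BlockCommitted,
    StateTransition { tx_hash: Option<[u8; 32]> },
}

impl Filter {
    /// Returns true when the event should be delivered to a subscriber using this filter.
    pub fn matches(&self, event: &PlatformEvent) -> bool {
        match (self, event) {
            (Filter::All, _) => true,
            (Filter::BlockCommitted, PlatformEvent::BlockCommitted { .. }) => true,
            (
                Filter::StateTransition { tx_hash: None },
                PlatformEvent::StateTransitionFinalized { .. },
            ) => true,
            (
                Filter::StateTransition { tx_hash: Some(want) },
                PlatformEvent::StateTransitionFinalized { tx_hash },
            ) => want == tx_hash,
            _ => false,
        }
    }
}

/// Minimal bus: each subscriber gets its own channel plus a filter.
#[derive(Default)]
pub struct EventBus {
    subscribers: Mutex<Vec<(Filter, Sender<PlatformEvent>)>>,
}

impl EventBus {
    /// Registers a subscriber and returns the receiving end of its channel.
    pub fn subscribe(&self, filter: Filter) -> Receiver<PlatformEvent> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push((filter, tx));
        rx
    }

    /// Delivers the event to every matching subscriber and returns how many received it.
    /// A matching subscriber whose receiver has been dropped is removed on the spot;
    /// non-matching subscribers are left untouched.
    pub fn publish(&self, event: &PlatformEvent) -> usize {
        let mut delivered = 0;
        self.subscribers.lock().unwrap().retain(|(filter, tx)| {
            if !filter.matches(event) {
                return true;
            }
            match tx.send(event.clone()) {
                Ok(()) => {
                    delivered += 1;
                    true
                }
                Err(_) => false, // receiver dropped: forget this subscriber
            }
        });
        delivered
    }
}
```

In this sketch the finalize flow would call `publish` once per event, and each rs-dapi subscription would hold one `Receiver`. Cleaning up dead subscribers lazily during `publish` keeps the example short; the PR itself handles removal in `SubscriptionHandle::Drop`.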
Note
This is only a preliminary implementation that adds the scaffolding needed to emit events and deliver them to users. The events implemented so far are closer to a proof of concept; further work is needed to emit meaningful events, such as document updates.
How Has This Been Tested?
- Added async network test for the new SDK stream subscription (packages/rs-sdk/tests/fetch/platform_events.rs:1).
- No automated suite was run in this environment; the new test exercises the end-to-end flow when network-testing is enabled.
Breaking Changes
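- Code-wise: none. The gRPC interface is additive and dashmate migrations backfill the new timeout settings.
- Some dashmate config settings changed, mainly timeouts: the legacy waitForStResultTimeout is replaced by structured timeouts (waitForStateTransitionResult, subscribePlatformEvents, coreStreams).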
Checklist:
- [x] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [x] I have added or updated relevant unit/integration/functional/e2e tests
- [x] I have added "!" to the title and described breaking changes in the corresponding section if my code contains any
- [x] I have made corresponding changes to the documentation if needed
For repository code-owners and collaborators only
- [x] I have assigned this pull request to a milestone
Walkthrough
Platform event streaming support added through new Protobuf messages, a gRPC RPC method, event bus infrastructure for publishing events, subscription forwarding in rs-dapi, client library implementations, and associated configuration, tests, and documentation updates.
Changes
| Cohort / File(s) | Summary |
|---|---|
| Protocol Buffer Definitions<br/>packages/dapi-grpc/protos/platform/v0/platform.proto | Added versioned subscription/response messages (PlatformSubscriptionRequest, PlatformSubscriptionResponse), event and filter definitions (PlatformEventV0, PlatformFilterV0), and server-streaming RPC method subscribePlatformEvents to Platform service. |
| rs-dapi Core Implementation<br/>packages/rs-dapi/src/services/platform_service/mod.rs, subscribe_platform_events.rs, wait_for_state_transition_result.rs | Added new subscribe_platform_events_impl method for streaming subscriptions with active session tracking and metrics. Replaced timeout-based loop with gRPC deadline-based approach in wait_for_state_transition_result. |
| rs-dapi Configuration & Server<br/>packages/rs-dapi/src/config/mod.rs, config/tests.rs, server/grpc.rs, src/main.rs, metrics.rs | Added platform_events_timeout and core_stream_timeout config fields. Refactored TimeoutLayer to support per-method timeouts. Removed legacy PlatformEvents metric variants. Made server future binding immutable. |
| rs-dapi Documentation<br/>packages/rs-dapi/doc/DESIGN.md, README.md | Updated design docs to reflect direct streaming pattern without multiplexing. Added new environment variable documentation. |
| rs-dapi Utilities<br/>packages/rs-dapi/src/services/streaming_service/bloom.rs, block_header_stream.rs, mod.rs | Simplified conditional logic in bloom filter and removed unused import in block header stream. |
| Event Bus Core<br/>packages/rs-dash-event-bus/Cargo.toml, src/event_bus.rs, src/lib.rs | Updated description from multiplexer to utilities. Added synchronous removal fallback path in SubscriptionHandle::Drop with metrics updates. |
| rs-drive-abci Event Bus Integration<br/>packages/rs-drive-abci/Cargo.toml, src/abci/app/mod.rs, app/consensus.rs, app/full.rs, handler/finalize_block.rs, query/mod.rs, query/service.rs, server.rs | Added EventBus infrastructure to ABCI app, implemented EventBusApplication trait, added event publishing in finalize_block for BlockCommitted and StateTransitionFinalized events, integrated PlatformFilterAdapter for event filtering. |
| rs-drive-abci Tests<br/>tests/strategy_tests/main.rs | Added event_bus_subscribe_all_and_receive integration test. |
| rs-sdk Subscription Support<br/>packages/rs-sdk/Cargo.toml, src/error.rs, src/platform.rs, src/platform/events.rs, src/sdk.rs, examples/platform_events.rs | Added subscriptions feature, SubscriptionError variant, subscribe_platform_events method to Sdk, new events module, and example demonstrating subscription usage. |
| rs-sdk Tests<br/>tests/fetch/mod.rs, fetch/platform_events.rs, fetch/config.rs | Added feature-gated platform_events test module and test for handshake validation. |
| wasm-sdk Error Handling<br/>packages/wasm-sdk/src/error.rs | Added SubscriptionError mapping to WasmSdkError conversion. |
| Generated Java Client<br/>packages/dapi-grpc/clients/platform/v0/java/org/dash/platform/dapi/v0/PlatformGrpc.java | Generated server-streaming RPC bindings for subscribePlatformEvents with method handler and stub implementations. |
| Generated Python Client<br/>packages/dapi-grpc/clients/platform/v0/python/platform_pb2_grpc.py | Generated unary_stream RPC for subscribePlatformEvents with servicer, stub, and experimental API bindings. |
| Generated Web Client<br/>packages/dapi-grpc/clients/platform/v0/web/platform_pb.d.ts, platform_pb.js, platform_pb_service.d.ts, platform_pb_service.js | Generated TypeScript declarations and JavaScript implementation for subscription and event messages with full serialization support. |
| Generated NodeJS Client<br/>packages/dapi-grpc/clients/platform/v0/nodejs/platform_protoc.js | Generated protobuf JS with constructors, (de)serialization, and oneof handling for subscription, event, and filter messages. |
| Generated Objective-C Client<br/>packages/dapi-grpc/clients/platform/v0/objective-c/Platform.pbobjc.h, Platform.pbobjc.m, Platform.pbrpc.h, Platform.pbrpc.m | Generated Objective-C message descriptors, class definitions, and gRPC service bindings for subscribePlatformEvents. |
| Configuration & Documentation<br/>packages/dashmate/configs/defaults/getBaseConfigFactory.js, getConfigFileMigrationsFactory.js, docker-compose.yml, docs/config/gateway.md, docs/config/dapi.md, docs/services/gateway.md, src/config/configJsonSchema.js | Migrated from waitForStResultTimeout to structured timeouts (waitForStateTransitionResult, subscribePlatformEvents, coreStreams), updated env vars, and migrated configuration schema. |
| gRPC Coverage & Utilities<br/>.github/scripts/check-grpc-coverage.py, packages/dapi-grpc/src/lib.rs, packages/rs-dapi/src/services/streaming_service/bloom.rs | Excluded subscribePlatformEvents from coverage checks and added lint suppression for non-CamelCase types. |
| Test Utilities<br/>packages/wasm-drive-verify/tests/fuzz_tests.rs | Removed unused import and made query binding immutable via interior mutation. |
Sequence Diagram(s)
sequenceDiagram
participant Client
participant rsDapi as rs-dapi<br/>PlatformService
participant EventBus as Event Bus<br/>(in-process)
participant Drive as Drive<br/>(upstream)
participant ABCI as rs-drive-abci<br/>ConsensusApp
Client->>rsDapi: PlatformSubscriptionRequest<br/>(filter, keepalive)
activate rsDapi
rsDapi->>EventBus: subscribe(filter)
activate EventBus
rsDapi->>Drive: subscribe_platform_events(filter)
activate Drive
ABCI->>EventBus: publish BlockCommitted
EventBus-->>rsDapi: event via receiver
rsDapi-->>Client: PlatformSubscriptionResponse<br/>(subscription_id, BlockCommitted)
ABCI->>EventBus: publish StateTransitionFinalized
EventBus-->>rsDapi: event via receiver
rsDapi-->>Client: PlatformSubscriptionResponse<br/>(subscription_id, StateTransitionFinalized)
Drive->>rsDapi: upstream stream event
rsDapi->>EventBus: forward event
EventBus-->>rsDapi: relayed via channel
rsDapi-->>Client: PlatformSubscriptionResponse
note over rsDapi: Keepalive timer
rsDapi-->>Client: PlatformSubscriptionResponse<br/>(subscription_id, Keepalive)
Client->>rsDapi: cancel/stream ends
deactivate rsDapi
deactivate EventBus
deactivate Drive
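The keepalive step in the diagram above can be read as "forward an event if one arrives in time, otherwise send a keepalive, and stop when the upstream closes". A blocking, std-only sketch of that decision follows. It is an illustration only: `Response`, `next_response`, and the use of `String` for events are hypothetical, and the actual rs-dapi code is async rather than built on `std::sync::mpsc`.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical downstream message: either a forwarded event or a keepalive.
#[derive(Debug, PartialEq)]
pub enum Response {
    Event(String),
    Keepalive,
}

/// Decides what to send downstream next.
/// - An event that arrives within `keepalive` is forwarded.
/// - If nothing arrives in that window, a keepalive is emitted instead.
/// - `None` means the upstream channel is closed and the stream should end.
pub fn next_response(rx: &Receiver<String>, keepalive: Duration) -> Option<Response> {
    match rx.recv_timeout(keepalive) {
        Ok(event) => Some(Response::Event(event)),
        Err(RecvTimeoutError::Timeout) => Some(Response::Keepalive),
        Err(RecvTimeoutError::Disconnected) => None,
    }
}
```

Calling `next_response` in a loop until it returns `None` reproduces the event, keepalive, and stream-end paths shown in the diagram.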
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Key areas requiring attention:
- Event bus integration across multiple crates: Changes to rs-drive-abci event publishing logic in finalize_block and new EventBusApplication trait require verification that events are published correctly and metrics are updated properly.
- Timeout refactoring in rs-dapi: The TimeoutLayer now handles per-method timeouts; verify that all four timeout paths (unary, state_transition, platform_events, core_stream) map correctly and no methods are missed.
- Configuration migration in dashmate: Complex migration logic for legacy waitForStResultTimeout and upstreams; verify backward compatibility and that all config paths resolve correctly.
- Client code generation: Verify that all language-specific generated bindings are syntactically correct and follow existing patterns (especially ObjectiveC descriptors and Java gRPC wiring).
- Subscription stream lifecycle: ActiveSessionGuard, metrics updates, channel closure handling, and keepalive logic should be tested for edge cases.
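To make the per-method timeout point above concrete, the mapping from a gRPC request path to one of the four timeout classes might look roughly like the sketch below. It is not the TimeoutLayer from packages/rs-dapi/src/server/grpc.rs: the `Timeouts` struct, the `timeout_for` function, and the rule that any `subscribeTo*` method on a service named `Core` counts as a core stream are assumptions for illustration. Only the method names waitForStateTransitionResult and subscribePlatformEvents come from this PR.

```rust
use std::time::Duration;

/// Hypothetical holder for the four timeout classes mentioned in the review notes.
pub struct Timeouts {
    pub unary: Duration,
    pub state_transition: Duration,
    pub platform_events: Duration,
    pub core_stream: Duration,
}

/// Picks a timeout from a gRPC request path of the form "/package.Service/Method".
/// Anything not explicitly recognised falls back to the unary timeout, so a new
/// streaming method that is not listed here would silently get the short default.
pub fn timeout_for(path: &str, t: &Timeouts) -> Duration {
    // The last path segment is the method name.
    let method = path.rsplit('/').next().unwrap_or("");
    // Assumption: Core streaming methods live on a service whose name ends in ".Core".
    let is_core_service = path.contains(".Core/");
    match method {
        "waitForStateTransitionResult" => t.state_transition,
        "subscribePlatformEvents" => t.platform_events,
        m if is_core_service && m.starts_with("subscribeTo") => t.core_stream,
        _ => t.unary,
    }
}
```

The fallback arm is the part worth checking during review: a table of every route against its expected class is an easy way to confirm that no method is missed.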
🐰 Streaming events through the bus so bright, With filters and timeouts set just right, Events flow free from Drive's stream, A subscription platform dream! 🎉
Pre-merge checks and finishing touches
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The pull request title 'feat(drive-abci): preliminary support for event subscriptions in Platform' accurately describes the primary change in the changeset. The title uses a conventional format with a scope (drive-abci) and clearly indicates the main objective: adding preliminary support for event subscriptions. This aligns well with the extensive changes across multiple packages that introduce event bus infrastructure, streaming APIs, and subscription handling. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
✅ gRPC Query Coverage Report
================================================================================
gRPC Query Coverage Report - NEW QUERIES ONLY
================================================================================
Total queries in proto: 48
Previously known queries: 47
New queries found: 1
================================================================================
New Query Implementation Status:
--------------------------------------------------------------------------------
⊘ subscribePlatformEvents EXCLUDED
================================================================================
Summary:
--------------------------------------------------------------------------------
All new queries are excluded
Total known queries: 48
- Implemented: 44
- Not implemented: 2
- Excluded: 2
Not implemented queries:
- getConsensusParams
- getTokenPreProgrammedDistributions
@coderabbitai review all
@lklimek: I'll perform a comprehensive review of all the changes in this PR.
✅ Actions performed
Full review triggered.
Client streaming and bidirectional streams are not available via gRPC-Web. The JavaScript libraries do not support them and most likely never will, because all of them use Fetch as the underlying transport layer.
Server streaming is supported, though, so it would be great to have a subscription RPC that we could use from the web. I also think you rarely need to change the conditions of an existing subscription; it is much easier to cancel it and create a new one, and passing a list of subscription parameters should work too.
I was recently asked how to subscribe to new documents or transactions on one's own data contracts. People would like to build applications such as chats, or backend integrations such as payment gateways that accept tokens, so I'm looking forward to this!
> Client streaming and bidirectional streams are not available via gRPC-Web. The JavaScript libraries do not support them and most likely never will, because all of them use Fetch as the underlying transport layer.
Thanks for that info! I'll refactor to a unidirectional (server-streaming) RPC that works over gRPC-Web.
✅ DashSDKFFI.xcframework built for this PR.
- Workflow run: https://github.com/dashpay/platform/actions/runs/19038066595
- Artifacts: DashSDKFFI.xcframework (folder), DashSDKFFI.xcframework.zip, xc_checksum.txt
SwiftPM (host the zip at a stable URL, then use):
.binaryTarget(
name: "DashSDKFFI",
url: "https://your.cdn.example/DashSDKFFI.xcframework.zip",
checksum: "c9201581129c144406ed01884ba79abfeac528e52fc6d457be057843a6651760"
)
Xcode manual integration:
- Download 'DashSDKFFI.xcframework' artifact from the run link above.
- Drag it into your app target (Frameworks, Libraries & Embedded Content) and set Embed & Sign.
- If using the Swift wrapper package, point its binaryTarget to the xcframework location or add the package and place the xcframework at the expected path.
@coderabbitai review
✅ Actions performed
Review triggered.
Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.