graphcast-sdk
Tracking: Message processing pipeline
Try to lay out the ideal message pipeline after a gossip is received.
Message Pipelining in SDK
Handling general Waku messages received:
- Nonce/timestamp (outer layer of GraphcastMessage) validation for all messages
- Async message processing: when the Graphcast Agent receives a Waku signal, it asynchronously sends the contained WakuMessage to the Radio Operator through an MPSC channel (see the channel sketch after this list).
- A separate async runtime is used for processing Waku signals; compared to spawning a new OS thread, an async runtime handles TCP/UDP communication more efficiently.
- Compared to the previous flow of receiving, parsing, validating, and saving to persistence, the Graphcast Agent now keeps gossip handling as quick and lightweight as possible.
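The sketch below is a minimal illustration of that hand-off, assuming a tokio MPSC channel and a dedicated multi-threaded runtime; the `WakuMessage` struct here is a stand-in, not the type exported by the Waku bindings or the SDK.

```rust
use tokio::sync::mpsc;

// Placeholder for the message type produced by the Waku node bindings.
#[derive(Debug)]
struct WakuMessage {
    payload: Vec<u8>,
    content_topic: String,
}

fn main() {
    // Dedicated runtime for Waku signal handling, separate from the radio's own runtime.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("failed to build signal-processing runtime");

    // Bounded channel between the Graphcast Agent (sender) and the Radio Operator (receiver).
    let (sender, mut receiver) = mpsc::channel::<WakuMessage>(1024);

    // SDK side: the signal handler only parses the signal and forwards the WakuMessage.
    runtime.spawn(async move {
        let msg = WakuMessage {
            payload: b"example".to_vec(),
            content_topic: "/graphcast/0/example/proto".to_string(),
        };
        let _ = sender.send(msg).await;
    });

    // Radio Operator side: drain the channel and apply application-specific handling.
    runtime.block_on(async move {
        while let Some(msg) = receiver.recv().await {
            println!("received {} bytes on {}", msg.payload.len(), msg.content_topic);
        }
    });
}
```

A bounded channel also gives natural back-pressure: if the Radio Operator falls behind, the agent's sends wait instead of growing memory without limit.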
Message Pipelining in Radios
The majority of message processing falls on specific radios and their intended applications. While the operations are mostly async, some improvements are worth considering:
- Validation for a specific content topic is done asynchronously but sequentially, so that the order of messages is preserved. This is the main processing bottleneck, so we should explore concurrent validation (see the per-topic validation sketch after this list).
- Handlers after validation can be spawned onto concurrent threads and joined afterwards.
- Subgraph Radio, on the public POI consensus computation: set up concurrent threads so that the computation for one deployment does not need to wait on, or be blocked by, the computation for another deployment.
- Listener Radio, on the storage of generic messages: batch processing can be achieved through a new psql mutation resolver that throttles writes to the DB. The operator can either wait for a period of time or for a number of messages before writing them to the database (see the batched-write sketch after this list). We are not necessarily concerned with the order of messages inside the DB, but it is best to avoid parallel writes.
- Raw/Generic Message Retention: for ephemeral use cases (most of the radios), raw messages are discarded right after being decoded into a typed Graphcast message.
- For listener radio, introduce a configurable retention window for raw or generic messages.
- Aggregated summaries and reports: track aggregations of messages.
- Subgraph Radio: for each deployment (topic), tally the number of messages received, the number of participating indexers (gossip network connectivity), and the attestation results (number of matched versus divergent results). Allow indexers to configure report options - notification methods, report frequencies, and aggregation methods (summation, mean/mode/median, periodic summation, or a moving average).
- Listener Radio: tally fields available at the GraphcastMessage level. For each type of GraphcastMessage, track the number of peer nodes, the number of messages per sender, the number of messages per topic, and the size and frequency of messages (see the metrics sketch after this list).
- Persistence of Aggregations: aggregated data should be persisted in a scalable storage solution for quick retrieval and analytics. Explore Prometheus histograms and summaries, or a summaries table in the database.
- Pruning: introduce a pruning configuration, either by message timestamp or by a maximum number of stored messages, to ensure optimal resource usage.
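To illustrate the concurrent-validation idea above, here is a rough sketch that preserves ordering within a content topic by validating each topic's messages sequentially inside its own task, while different topics proceed concurrently and are joined at the end. The `GraphcastMessage` fields and the `validate`/`handle` bodies are placeholders, not the SDK's actual types or checks.

```rust
use std::collections::HashMap;

// Placeholder message type; the real radios carry richer payloads and signatures.
#[derive(Debug, Clone)]
struct GraphcastMessage {
    identifier: String, // content topic, e.g. a deployment hash
    nonce: u64,
}

// Stand-in for nonce/timestamp/sender validation.
async fn validate(msg: &GraphcastMessage) -> bool {
    msg.nonce > 0
}

// Stand-in for the radio's post-validation handler.
async fn handle(msg: GraphcastMessage) {
    println!("handled message for {}", msg.identifier);
}

#[tokio::main]
async fn main() {
    let batch = vec![
        GraphcastMessage { identifier: "QmDeploymentA".into(), nonce: 1 },
        GraphcastMessage { identifier: "QmDeploymentB".into(), nonce: 2 },
        GraphcastMessage { identifier: "QmDeploymentA".into(), nonce: 3 },
    ];

    // Partition messages by content topic so ordering is preserved within a topic.
    let mut by_topic: HashMap<String, Vec<GraphcastMessage>> = HashMap::new();
    for msg in batch {
        by_topic.entry(msg.identifier.clone()).or_default().push(msg);
    }

    // One task per topic: sequential validation inside, concurrency across topics.
    let tasks: Vec<_> = by_topic
        .into_values()
        .map(|msgs| {
            tokio::spawn(async move {
                for msg in msgs {
                    if validate(&msg).await {
                        handle(msg).await;
                    }
                }
            })
        })
        .collect();

    // Join the handler tasks once every topic has been processed.
    for task in tasks {
        let _ = task.await;
    }
}
```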
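The batched-write and pruning items could look roughly like the following: buffer incoming generic messages and flush either once a batch-size threshold is reached or once a time window elapses, and apply a retention cutoff periodically. `write_batch` and `prune_older_than` are hypothetical stand-ins for the psql mutation resolvers mentioned above.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio::time::interval;

// Placeholder row shape for a stored generic message.
#[derive(Debug, Clone)]
struct StoredMessage {
    sender: String,
    timestamp: u64,
    payload: Vec<u8>,
}

// Stand-in for a psql mutation that writes a whole batch in one transaction.
async fn write_batch(batch: &[StoredMessage]) {
    println!("writing {} messages in one transaction", batch.len());
}

// Stand-in for a psql mutation that deletes rows outside the retention window.
async fn prune_older_than(cutoff: u64) {
    println!("pruning messages with timestamp < {cutoff}");
}

async fn batch_writer(
    mut rx: mpsc::Receiver<StoredMessage>,
    max_batch: usize,
    max_wait: Duration,
    retention_secs: u64,
) {
    let mut buffer: Vec<StoredMessage> = Vec::with_capacity(max_batch);
    let mut ticker = interval(max_wait);
    loop {
        tokio::select! {
            maybe_msg = rx.recv() => match maybe_msg {
                Some(msg) => {
                    buffer.push(msg);
                    // Flush once the batch-size threshold is reached.
                    if buffer.len() >= max_batch {
                        write_batch(&buffer).await;
                        buffer.clear();
                    }
                }
                None => break, // channel closed; flush whatever is left below
            },
            _ = ticker.tick() => {
                // Flush on a timer so messages never wait longer than max_wait.
                if !buffer.is_empty() {
                    write_batch(&buffer).await;
                    buffer.clear();
                }
                // Periodic pruning keeps only messages inside the retention window.
                let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
                prune_older_than(now.saturating_sub(retention_secs)).await;
            }
        }
    }
    if !buffer.is_empty() {
        write_batch(&buffer).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(256);
    let writer = tokio::spawn(batch_writer(rx, 50, Duration::from_secs(30), 24 * 60 * 60));
    tx.send(StoredMessage { sender: "0xabc".into(), timestamp: 1, payload: vec![] })
        .await
        .unwrap();
    drop(tx); // closing the channel lets the writer flush its buffer and exit
    let _ = writer.await;
}
```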
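For the aggregation and reporting items, one possible direction is the `prometheus` crate. The sketch below uses a labelled counter to tally messages per topic and sender; the actual radios may prefer histograms or summaries as noted above, and would use their own metric names and labels.

```rust
use prometheus::{IntCounterVec, Opts, Registry};

fn main() {
    let registry = Registry::new();

    // Messages received, labelled by content topic and sender, so the operator can
    // tally gossip-network participation per deployment. Metric name is illustrative.
    let messages_received = IntCounterVec::new(
        Opts::new("messages_received_total", "Messages received per topic and sender"),
        &["topic", "sender"],
    )
    .expect("valid metric definition");

    registry
        .register(Box::new(messages_received.clone()))
        .expect("metric registered once");

    // On every validated message, the radio would bump the counter for its labels.
    messages_received
        .with_label_values(&["QmDeploymentA", "0xindexer1"])
        .inc();

    // A periodic report (or a /metrics endpoint) can read the gathered families.
    for family in registry.gather() {
        println!("{}: {} series", family.get_name(), family.get_metric().len());
    }
}
```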
Sequence of processing
sequenceDiagram
participant GP as Graphcast Peer
participant SDK as Graphcast agent (SDK)
participant RO as Radio Operator
participant S as Stateful
GP->>SDK: Waku Signal
SDK->>SDK: Parse signal to WakuMessage
SDK->>RO: Send WakuMessage to Radio operator
Note over RO: Sequential Validation
alt Concurrent
RO->>RO: Different tasks for each message type
else Parallel
RO->>RO: Independently process messages of the same type
end
alt Persistence - In-memory
RO->>S: Update to contain recent typed messages
else Persistence - Retention FIFO
RO->>S: Prune old Messages
RO->>S: Store new Messages
end
S ->> S: Update aggregated summary
RO ->> S: Query summary for periodic report
A different perspective on messages
timeline
title Message lifespan
section Generic
Trigger: Either manual or a periodic interval based on Networks
Create : Radio Operator collects necessary info
Send : Graphcast agent wraps message into WakuMessage
: Stores a temporary local copy
Waku signal propagation: Send to connected gossip peers
Receive : Graphcast agent receives signal
: Parse raw message into generic Graphcast message
: Graphcast field validation (nonce, timestamp, first time sender)
: Pass message to Radio Operator
Process: Radio Operator handles application specific logic
section Listener Radio
Storage: Asynchronously store generic Graphcast messages in the database
: Summarize messages received/stored
section Subgraph Radio
Further decoding: Try to decode messages into PublicPOIMessage
: Validate POI message fields (block hash, sender identity)
: Queue in remote_messages
Message comparison : determine comparison point from local copy
: trigger after message collection duration
: Take local and remote messages with matching deployment and block number
: Determine comparison results for NotFound, Diverged, Matched
: Notification
: Summarize results
Nice start! There are some aspects of this work that are missing from the spec, including:
- Some notion of a raw/generic message retention window
- Batched consumption and processing of raw messages to do aggregations (e.g. listener-radio counting messages per sender or topic)
- Persistence of aggregations
- Periodic pruning of raw message data that is outside the retention window
I recognise some of these may be Radio-level concerns, but we should have a spec for message pipelining that factors in these requirements.
Updated the previous comment to include retention, persistence, aggregations, and pruning.
Linking 2 DB issues
- [x] https://github.com/graphops/listener-radio/issues/19
- [ ] https://github.com/graphops/subgraph-radio/issues/60
Radio specific issues
- [ ] https://github.com/graphops/subgraph-radio/issues/83
- [ ] https://github.com/graphops/listener-radio/issues/23