feat(event-streaming): new streams (orders, swaps, eth_fee_estimation)

Open mariocynicys opened this issue 1 year ago • 3 comments

This PR aims to support event streaming in some of MM2's API endpoints.

So far, the work in this PR is only a refactor of the event streaming code, following multiple rounds of discussion about how it should end up looking.

The flow goes as follows:

  • A client initially opens an SSE connection with MM2 to receive events on. For Wasm, which can't use SSE, a worker delivers the events to the client instead (we treat this as SSE too).
  • If a client wants to receive some events (balance change, network topology change, new gas estimations, etc.), it needs to subscribe to these events (enable the corresponding streamers) through the API.
  • A streamer is a background task responsible for firing some events. Initially, MM2 has no streamers running. When a client enables a streamer (= subscribes to a particular event), it starts receiving events from that streamer until it unsubscribes.
  • If no client is listening to an active streamer, the streamer shuts down. It is started again if a client re-subscribes.
  • (Somewhat irrelevant for now) Multiple clients subscribing to the same event on MM2 doesn't mean a new streamer for every client. Only one streamer is booted up, for the first subscribing client, and later subscribers share that same streamer.
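
The lifecycle above amounts to reference counting per streamer: the first subscriber spawns it, later subscribers share it, and the last one to leave shuts it down. Below is a minimal Python sketch of that bookkeeping. `StreamerRegistry` and its methods are hypothetical names for illustration only. They are not MM2's Rust API, and no real background task is spawned; the sketch only logs the spawn and shutdown decisions.

```python
from collections import defaultdict


class StreamerRegistry:
    """Toy model of the subscription lifecycle (hypothetical names, not MM2's API)."""

    def __init__(self):
        # streamer_id -> set of client ids currently listening to it
        self.listeners = defaultdict(set)
        # streamer ids whose background task is considered running
        self.running = set()
        # record of lifecycle decisions, for inspection
        self.log = []

    def subscribe(self, client_id, streamer_id):
        if streamer_id not in self.running:
            # First subscriber: boot the streamer in the background.
            self.running.add(streamer_id)
            self.log.append(("spawn", streamer_id))
        # Later subscribers just join the already-running streamer.
        self.listeners[streamer_id].add(client_id)

    def unsubscribe(self, client_id, streamer_id):
        clients = self.listeners.get(streamer_id)
        if clients is None:
            return
        clients.discard(client_id)
        if not clients:
            # Nobody is listening anymore: shut the streamer down.
            del self.listeners[streamer_id]
            self.running.discard(streamer_id)
            self.log.append(("shutdown", streamer_id))


reg = StreamerRegistry()
reg.subscribe(1, "NETWORK")
reg.subscribe(2, "NETWORK")    # shares the running streamer, no second spawn
reg.unsubscribe(1, "NETWORK")
reg.unsubscribe(2, "NETWORK")  # last listener leaves, so the streamer shuts down
reg.subscribe(3, "NETWORK")    # re-subscribing boots a fresh streamer
print(reg.log)  # [('spawn', 'NETWORK'), ('shutdown', 'NETWORK'), ('spawn', 'NETWORK')]
```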

The role of the StreamingManager:

  • Manage different clients (each client is identified by an id, cid, supplied when it first connects through SSE).
  • Add a new (cid, sid) pair (i.e. the client with id cid wants to listen to the streamer with id sid). If a streamer with that sid is already running, it is told that one more client will be listening; otherwise, it is spawned in the background.
  • Remove a (cid, sid) pair (i.e. the client with id cid no longer wants to receive events from the streamer with id sid). If no clients are left listening, the streamer dies.
  • Manage different streamers.
  • Manage the connections between the streamers (running in the background) and MM2. Streamers either work periodically (do some job, possibly fire some events, then sleep for a while and repeat) or need to receive notifications from MM2. For example, UTXO balance event streaming relies on electrum subscriptions: the electrum server notifies MM2 when an address balance has changed, and MM2 in turn forwards this notification to the streamer responsible for firing balance events. To send data to a streamer, StreamingManager::send(streamer_id, arbitrary_data) is used.
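
For the notification-driven case, the idea behind StreamingManager::send(streamer_id, arbitrary_data) can be modeled as routing data into a per-streamer inbox that the streamer's background task awaits. The asyncio sketch below is a hypothetical Python model, not the Rust implementation. The `register` method, the `None` stop sentinel, the "BALANCE" event shape and the address placeholder are all assumptions made for illustration.

```python
import asyncio


class StreamingManager:
    """Hypothetical model: routes external notifications to a streamer's inbox."""

    def __init__(self):
        self.inboxes = {}  # streamer_id -> asyncio.Queue of incoming data

    def register(self, streamer_id):
        queue = asyncio.Queue()
        self.inboxes[streamer_id] = queue
        return queue

    def send(self, streamer_id, data):
        # Mirrors the idea of StreamingManager::send(streamer_id, arbitrary_data).
        # Returns False when no such streamer is running.
        queue = self.inboxes.get(streamer_id)
        if queue is None:
            return False
        queue.put_nowait(data)
        return True


async def balance_streamer(inbox, emit):
    """Notification-driven streamer: waits for data, then fires an event."""
    while True:
        notification = await inbox.get()
        if notification is None:  # stop sentinel used only in this sketch
            break
        emit({"event": "BALANCE", "data": notification})


async def demo():
    manager = StreamingManager()
    events = []
    inbox = manager.register("BALANCE:BTC")
    task = asyncio.create_task(balance_streamer(inbox, events.append))
    # e.g. an electrum subscription reports an address balance change
    manager.send("BALANCE:BTC", {"address": "addr1", "balance": "0.5"})
    manager.send("BALANCE:BTC", None)
    await task
    # Sending to a streamer that was never started is rejected.
    return events, manager.send("BALANCE:ETH", {})


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A periodic streamer would instead loop on `await asyncio.sleep(interval)` and fire events on its own schedule, without needing an inbox.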

Breaking Changes:

  • No more event_stream_configuration in MM2 json config. Requested streams are initialized and configured dynamically through the API.
  • Thus event_stream_configuration.access_control_allow_origin config was moved to access_control_allow_origin (one scope out) https://github.com/KomodoPlatform/komodo-defi-framework/pull/2172/commits/b9d1218c9e8b4392188149b7434f85947f69b4d9.
  • And event_stream_configuration.worker_path was changed to event_stream_worker_path (one scope out) https://github.com/KomodoPlatform/komodo-defi-framework/pull/2172/commits/ee32fd1290fc66ffa0032c966887b182a47bb84f.
  • No more filters are required in the initial SSE setup with MM2 (e.g. mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype -> mm2.com/event-stream?id=1), but a client id is now required (a random u64 identifier for the client, e.g. mm2.com/event-stream?id=799384531).
  • How are the wanted events filtered then (the client gets no events by default)? The client needs to activate specific event streamers through RPC (e.g. stream::enable::balance).
  • Contrary to the note in the previous point (the client gets no events by default), some special events are now delivered to every connected client even if it never requested/filtered them (this happens via broadcast_all). Examples of such events are DATA_NEEDED:datatype, and HEARTBEAT should behave the same way later.
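
The two config moves above can be applied mechanically to an existing MM2 JSON config. Here is a small Python sketch of that migration. It assumes the old file nested both keys directly under "event_stream_configuration". Because streams are now configured through the API, any other keys in that object are simply dropped here, which is a simplification to check against your own config.

```python
import copy


def migrate_config(old):
    """Move event streaming keys one scope out, per the breaking changes above.

    Assumes the old layout nested both keys under "event_stream_configuration".
    """
    new = copy.deepcopy(old)
    stream_cfg = new.pop("event_stream_configuration", None) or {}
    if "access_control_allow_origin" in stream_cfg:
        new["access_control_allow_origin"] = stream_cfg["access_control_allow_origin"]
    if "worker_path" in stream_cfg:
        new["event_stream_worker_path"] = stream_cfg["worker_path"]
    # Any remaining keys are dropped: streams are now enabled and
    # configured dynamically through the API instead.
    return new
```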

(So for the Keplr integration, only ?id=<RANDOM_U64> needs to be added to the SSE init request, and ?filter= should be removed; everything else should work. The <RANDOM_U64> should be stored by the client, since it's required for any event streamer activation/deactivation. For Keplr alone this isn't necessary yet, because DATA_NEEDED:datatype is a special event that's broadcast to all clients and doesn't require any activation/subscription beforehand.)
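
On the client side, the change boils down to generating a random u64 once, keeping it, and passing it as `id` in the SSE init URL. A Python sketch follows. The helper names are hypothetical, and the base URL is whatever address your MM2 RPC listens on, which is assumed rather than prescribed here.

```python
import secrets
from urllib.parse import urlencode


def new_client_id():
    """Random u64 identifying this SSE client; keep it for later (de)activation RPCs."""
    return secrets.randbits(64)


def event_stream_url(base_url, client_id):
    """Build the SSE init URL: only ?id=<RANDOM_U64>, no ?filter= anymore."""
    return f"{base_url.rstrip('/')}/event-stream?{urlencode({'id': client_id})}"


client_id = new_client_id()
print(event_stream_url("http://127.0.0.1:7783", client_id))
```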

mariocynicys, Jul 22 '24

@mariocynicys can you please document any breaking changes here: https://github.com/KomodoPlatform/komodo-defi-framework/pull/2172#issue-2422905962? This is particularly important for GUIs that rely on the streaming interface, like the Keplr integration. cc @naezith @onur-ozkan: some things will need to change for the Keplr integration to use the dev branch after this PR is merged.

shamardy, Aug 27 '24

@mariocynicys can you please document any breaking changes here #2172 (comment) , this is particularly important for GUIs that rely on streaming interface like Keplr integration c.c. @naezith @onur-ozkan some things will need to be changed for Keplr integration to use dev branch after this PR is merged.

Can we merge the dev branch into this one? Builds from this branch are currently missing some changes on the Tendermint side, which prevents the Moon-Fi GUI team from testing certain items.

onur-ozkan, Sep 10 '24

Great work! I particularly like that the gas fee estimator is now part of the common event-handling framework.

dimxy, Sep 27 '24

Thus event_stream_configuration.access_control_allow_origin config was moved to access_control_allow_origin (one scope out) https://github.com/KomodoPlatform/komodo-defi-framework/commit/b9d1218c9e8b4392188149b7434f85947f69b4d9.

We already have rpccors at the root level of the config file, so this seems like a redundant field now.

onur-ozkan, Oct 01 '24

No more filters are required in the initial SSE setup with MM2. (e.g. mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype -> mm2.com/event-stream?id=1), but a client id is now required (a random u64 identifier for the client) (e.g. mm2.com/event-stream?id=799384531)

Why are we using randomly generated numerical IDs? The previous ones (like COIN_BALANCE, NETWORK, etc.) were much better in terms of both developer and user experience. I assume that, in order to listen to some events from MM2, people need to find out these IDs from some RPCs?

How to filter the wanted events then (client gets no events by default)? The client needs to activate specific event streamers through RPC (e.g. stream::enable::balance).

We can keep events turned off unless at least one client is listening to them. So when mm2 gets a client on mm2/event-stream?filter=NETWORK and it's the first such client, the NETWORK event, which was off, gets turned on because mm2 received a client.

Contrary to the note in the previous point (client gets no events by default), some special events are now delivered to the client even if they never requested/filtered them (this happens using broadcast_all). example of these events are DATA_NEEDED:datatype, and HEARTBEAT should be similar later as well.

I suggest not doing this. It adds unnecessary overhead for people who will never need/use these events, especially those who want to run mm2 as a daemon.

onur-ozkan, Oct 01 '24

No more filters are required in the initial SSE setup with MM2. (e.g. mm2.com/event-stream?filter=NETWORK,BALANCE,DATA_NEEDED:datatype -> mm2.com/event-stream?id=1), but a client id is now required (a random u64 identifier for the client) (e.g. mm2.com/event-stream?id=799384531)

Why are we using randomly generated numerical IDs as the previous ones (like COIN_BALANCE, NETWORK, etc) were much better in terms of development and user experiences ? I assume in order to listen some events from MM2, people need to find out these IDs from some RPCs?

These are IDs for the connected clients, not identifiers for the streamers. Streamers are identified by streamer IDs, which are strings (e.g. NETWORK, BALANCE:BTC, FEE_ESTIMATION:ETH, etc.). The client IDs are required in the initial SSE connection to identify the client, which can then use its ID to identify itself when calling the streamer enablers/disablers through RPC.
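
To make the distinction concrete, client IDs are plain u64 numbers, while streamer IDs are strings that appear to follow a `KIND` or `KIND:PARAM` shape (inferred from the examples above, not from a spec). A Python sketch with hypothetical names:

```python
from typing import NamedTuple, Optional


class StreamerId(NamedTuple):
    kind: str             # e.g. "BALANCE", "NETWORK", "FEE_ESTIMATION"
    param: Optional[str]  # e.g. "BTC", or None when there is no suffix


def parse_streamer_id(raw):
    """Split e.g. "BALANCE:BTC" into kind and parameter (format assumed)."""
    kind, sep, param = raw.partition(":")
    return StreamerId(kind, param if sep else None)


# A client id (random u64) maps to the set of streamers that client enabled.
subscriptions = {
    799384531: {parse_streamer_id("BALANCE:BTC"), parse_streamer_id("NETWORK")},
}
```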

We can keep events turned off unless there is at least one client listening to them. So, when mm2 gets a client on mm2/event-stream?filter=NETWORK, if it's the first client, it means the NETWORK event was off but will now be turned on because mm2 received a client.

Yup, this is the case here.

Contrary to the note in the previous point (client gets no events by default), some special events are now delivered to the client even if they never requested/filtered them (this happens using broadcast_all). example of these events are DATA_NEEDED:datatype, and HEARTBEAT should be similar later as well.

I suggest not doing this. It adds unnecessary overhead for people who will never need/use these events, especially those who want to run mm2 as a daemon.

DATA_NEEDED won't really add much overhead: it's only used for necessary things and isn't periodic, so you might get around 5 messages a day. For the heartbeat, overhead might be a concern, but again, the heartbeat message is very small.

I didn't get the daemon point here, but for clarity: there is no overhead if no client is connected through SSE (i.e. no client made the initial SSE connection).

mariocynicys, Oct 01 '24

data needed won't really be a big overhead and will only be used for necessary stuff and it isn't periodic as well so u might just get 5 msgs a day or something. For the heartbeat that might be the case, but again, the heartbeat msg is very small.

IMO, if the user isn't interested, there is no point in sending it, even if the overhead is tiny.

i didn't get the daemon point here, but for clarity, we have no overhead if no client is connected through SSE (no client made the initial SSE connection).

If that's the case, sure. But doesn't "some special events are now delivered to the client even if they never requested/filtered them (this happens using broadcast_all)." mean the opposite?

onur-ozkan, Oct 01 '24

These are ids for the connected clients and not an identification for the streamer. streamers are identified using streamer ids which are strings (e.g. NETWORK, BALANCE:BTC, FEE_ESTIMATION:ETH, etc...). These client ids are required in the initial SSE connection to identify the client. The client can then use that id to identify itself when querying the streamer enablers/disablers through RPC.

I guess it sounds more complicated than it should be. I will check the implementation to understand it better before making a suggestion on this.

onur-ozkan, Oct 01 '24

If that's the case sure, but isn't "some special events are now delivered to the client even if they never requested/filtered them (this happens using broadcast_all)." mean opposite of that case?

The client still needs to make an initial SSE connection to receive events from mm2. If the client never does that, mm2 has no backward connection to the client to deliver any events. What this point means is that once a client has made the initial SSE connection (whether or not it subscribed to a bunch of streamers), it will receive these mandatory events.
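
The delivery rule described here can be sketched as a dispatch function. Events of a broadcast_all kind go to every client that has an open SSE connection, while other events go only to subscribed clients. With no connected clients, nothing is delivered at all. The Python model below is hypothetical. Matching DATA_NEEDED by the prefix before ":" is an assumption, and the in-memory `inbox` list stands in for the real SSE channel.

```python
from dataclasses import dataclass, field

# Kinds delivered to every connected client regardless of subscriptions.
# DATA_NEEDED is named in the thread; matching on the prefix is an assumption.
BROADCAST_ALL_KINDS = {"DATA_NEEDED"}


@dataclass
class Client:
    client_id: int
    subscriptions: set = field(default_factory=set)  # enabled streamer ids
    inbox: list = field(default_factory=list)        # stands in for the SSE channel


def dispatch(connected, streamer_id, payload):
    """Deliver an event to the SSE-connected clients that should receive it."""
    kind = streamer_id.split(":", 1)[0]
    for client in connected.values():
        if kind in BROADCAST_ALL_KINDS or streamer_id in client.subscriptions:
            client.inbox.append((streamer_id, payload))
```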

mariocynicys, Oct 01 '24

I tried this PR and it appears to work okay. This is great work. I am about to approve it so we can move forward and let the GUI team play with it. There are a few notes I'd like to see resolved before that (mandatory HEARTBEATs).

I'd like to leave a note, though, on how IMO event streaming could be simplified:

  • There are several dedicated RPCs, one per event type, to enable events. These could be merged into a single RPC or even eliminated.
  • The GUI needs to generate a unique client-id to receive events. This is not GUI-friendly and the id needs to be synced between several GUI processes. The client-id could be created by KDF or even eliminated.
  • The task manager RPCs have become too heavily integrated with event streaming: the GUI needs to send the client-id in each task init RPC, and the task manager handles it. I think the client-id could be eliminated from the task API.

I think these problems come from the use of the one-directional Server-Sent Events (SSE) protocol. SSE is a web protocol, but we also use it in the native configuration. We already have an example of event delivery in the eth_subscribe call. It works over WebSockets, which, being bidirectional, can carry both control messages and data over the same connection (no need for dedicated RPCs or client-ids). I think we could do something similar to simplify the RPCs and the internal architecture: the GUI opens a WS connection and sends a JSON message to subscribe to events (using known names like 'TASK', 'BALANCE', etc.). We no longer need a client-id: the API already knows which connection expects which events. At any time, the client (GUI) can subscribe to new events or unsubscribe from others. We need a streaming manager serving as an event bus, which:

  • receives events from all event sources in code (task manager, polling loops)
  • maintains client subscriptions: when a client subscribes to some events, the streaming manager keeps those events in a list for this client, and filters and sends only the subscribed events to the client (over the same channel that is used now).
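
A transport-agnostic Python sketch of the proposed event bus follows. Each bidirectional connection carries its own subscription set, so no client-id is needed. The JSON control message format (`{"method": "subscribe", "events": [...]}`) is hypothetical and only loosely inspired by eth_subscribe. The `outgoing` list stands in for writes to the WebSocket.

```python
import json


class Connection:
    """One bidirectional connection (e.g. a WebSocket); no client-id needed."""

    def __init__(self):
        self.subscriptions = set()
        self.outgoing = []  # messages that would be written back to the socket


class EventBus:
    def __init__(self):
        self.connections = []

    def connect(self):
        conn = Connection()
        self.connections.append(conn)
        return conn

    def handle_control(self, conn, raw_message):
        # Hypothetical control format, e.g.
        # {"method": "subscribe", "events": ["TASK", "BALANCE"]}
        msg = json.loads(raw_message)
        events = set(msg.get("events", []))
        if msg.get("method") == "subscribe":
            conn.subscriptions |= events
        elif msg.get("method") == "unsubscribe":
            conn.subscriptions -= events

    def publish(self, event_name, data):
        # Called by event sources (task manager, polling loops, ...);
        # only connections subscribed to this event receive it.
        for conn in self.connections:
            if event_name in conn.subscriptions:
                conn.outgoing.append(json.dumps({"event": event_name, "data": data}))
```

The subscription state lives on the connection object itself, which is what removes the need for a separate client-id and dedicated enable/disable RPCs in this design.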

dimxy, Nov 22 '24

@mariocynicys could you fix the conflicts, please?

laruh, Nov 26 '24