
Change Data Capture support

Open novoj opened this issue 2 years ago • 4 comments

In order to keep the remote clients in sync, we need to introduce some kind of change data collection (i.e. ability to stream information about changes in evitaDB servers to all interested clients). The same principle will one day be used to synchronize changes between master and replicas.

Everything starts with a ...

Client request to watch changes:

The client will have to explicitly state what operations it wants to monitor and in what scope. The request must be made at the beginning of the monitoring stream, but can be extended at any time. A single client can make multiple requests for the same stream of changes. The request has the following format:

  • scope:
    • area: one of the values:
      • system: allows monitoring of top-level changes such as catalog creation, deletion, renaming, etc.
      • catalog_schema: allows you to monitor changes to the catalog schema, or entity collection creation, deletion, renaming, and all entity schema changes in a given catalog.
      • entity_schema: allows observing changes to the entity schema in a given catalog and entity type
      • data: allows monitoring changes in the data
    • site: identification of the observed site
      • catalog_name: name of the catalog to monitor, wildcard any should be supported
      • entity_type: name of the entity type to monitor, wildcard any should be supported
      • entity_primary_key: primary key of the entity (one or more) to be monitored
      • operation: name of the mutation (one or more) to be observed - there must also be support for "wildcard" operations - like any operation, any operation with attributes, and so on.
    • content: one of the values
      • header: only the information that the change "occurred" is sent along with the new version information (schema/entity version)
      • body: the entire change is sent with all the data involved - only the new values are sent
    • since: (optional - if not sent, it is assumed to be since request creation)
      • version: the version of the schema/entity since when the changes should be sent
      • transaction_id: the last transaction id since when the changes should be sent
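For illustration, the request format above could be modeled as a set of plain Java records. All names and types here are hypothetical placeholders, not evitaDB's actual API:

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the CDC watch request described above.
public class CdcRequest {
    // area: which layer of changes to observe
    public enum Area { SYSTEM, CATALOG_SCHEMA, ENTITY_SCHEMA, DATA }
    // content: send only headers, or the full change body
    public enum Content { HEADER, BODY }

    // site: identification of the observed site ("*" could serve as the wildcard)
    public record Site(String catalogName, String entityType,
                       Set<Integer> entityPrimaryKeys, Set<String> operations) {}

    // since: optional replay point (schema/entity version + last known transaction id)
    public record Since(long version, long transactionId) {}

    // scope: the full request; since is optional, defaulting to "from request creation"
    public record Scope(Area area, Site site, Content content, Optional<Since> since) {}
}
```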

The request is confirmed by the server and assigned a unique ID. The client can cancel the request at any time using the assigned ID. The client can also cancel all its requests. If the connection to the server is interrupted, all requests are immediately dropped and the client must initiate them again when the connection is re-established.

In order for the client to catch up on previous communication, it can use the since part to specify the last version it knows about, and the server will immediately send all changes since then. If the history is too long and the server no longer has the information (the WAL has been purged), an exception will be thrown and the client will have to rebuild the information from scratch.

Server sends changes that match the filter:

The server maintains a list of filters per open stream to the client, and sends all changes (or just the information about their existence) that match the filter defined by a scope/site combination to the particular client. If the since part is provided and the version doesn't match the current version, the server must scan the WAL to find the particular transaction_id and replay all changes since that moment for the particular client. If the oldest transaction_id retained in the WAL is greater than the requested one, an error is sent and the client must rebuild all its data from scratch.
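The replay rule can be sketched against a simple in-memory WAL. This is an illustrative model, not evitaDB's actual WAL implementation: if the requested transaction_id precedes the oldest retained entry, the history is gone and the client must rebuild; otherwise the server replays everything after it.

```java
import java.util.List;

// Illustrative WAL replay check for the rule described above.
public class WalReplay {
    // A WAL entry: the transaction id plus an opaque change payload.
    public record WalEntry(long transactionId, String change) {}

    /**
     * Returns all changes after sinceTxId, or throws if the WAL has been
     * purged past the requested transaction and completeness cannot be
     * guaranteed anymore.
     */
    public static List<WalEntry> changesSince(List<WalEntry> wal, long sinceTxId) {
        // Entries are assumed to be ordered by transaction id, oldest first.
        if (!wal.isEmpty() && wal.get(0).transactionId() > sinceTxId) {
            throw new IllegalStateException(
                "WAL purged past transaction " + sinceTxId + "; rebuild from scratch");
        }
        return wal.stream()
                  .filter(e -> e.transactionId() > sinceTxId)
                  .toList();
    }
}
```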

Maintenance events:

The server will periodically send the last observed transaction_id to all clients, even if no monitored change has occurred. This limits the amount of data that has to be scanned in case the connection is lost and has to be re-established. The client must update its internal last seen transaction_id.
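On the client side, handling this heartbeat amounts to advancing a single counter. A minimal sketch with made-up class and method names:

```java
// Illustrative client-side handling of the periodic maintenance event:
// the client advances its last-seen transaction id so a reconnect can
// resume from a recent point instead of scanning a long WAL range.
public class CdcClientState {
    private long lastSeenTransactionId;

    public void onMaintenanceEvent(long observedTransactionId) {
        // Heartbeats may arrive late after retries; never move backwards.
        lastSeenTransactionId = Math.max(lastSeenTransactionId, observedTransactionId);
    }

    public long lastSeenTransactionId() {
        return lastSeenTransactionId;
    }
}
```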

novoj avatar Jul 17 '23 07:07 novoj

@lukashornych @Khertys we need to analyze the current technical possibilities of the underlying protocols - i.e. gRPC streams, GraphQL WebSockets and REST possibilities. Besides WebSockets, there is also the older SSE (Server-Sent Events) specification. We need to consider all the possibilities.

novoj avatar Jul 17 '23 07:07 novoj

Summary for GQL/REST after some discussion:

  • we would utilize WebSockets as these are well supported in GQL libraries and are fairly easy to implement (Undertow has its own tools for it as well)
  • the area parameter would be split into separate onProductChange subscriptions to fit the schema design language of queries and mutations and to narrow needed parameters (the site) and structure of returned body
  • the client wouldn't use request IDs; these would be used only on the server, if WebSockets allow storing isolated metadata on a WebSocket connection, otherwise we would need to use the filters to identify GQL subscriptions
  • the maintenance event data would probably be sent in the headers, without sending an actual body with data
  • we will need to implement conversions of evitaDB mutations from Java to JSON (currently, we only have converters from JSON to Java)
    • this would be a good opportunity to refactor the current implementation of converters and automate them, as most of them are simple (they usually just look at descriptors and copy their structure, which could be automated), with the exception of more complex converters, which could be registered somewhere and resolved through a lookup process controlling the conversion
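The automation idea could look roughly like this toy sketch, which reflectively mirrors a record's structure into JSON. The mutation class is a made-up placeholder, not one of evitaDB's real mutation classes, and a real converter would need proper escaping, nesting and type handling:

```java
import java.lang.reflect.RecordComponent;
import java.util.StringJoiner;

// Toy sketch of a generic "descriptor-driven" converter: simple mutation
// records are serialized by mirroring their components; complex mutations
// would be registered with hand-written converters instead.
public class GenericMutationConverter {
    // Hypothetical simple mutation, stands in for an evitaDB mutation class.
    public record UpsertAttributeMutation(String attributeName, String value) {}

    public static String toJson(Record mutation) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        // Record components are returned in declaration order.
        for (RecordComponent c : mutation.getClass().getRecordComponents()) {
            try {
                Object value = c.getAccessor().invoke(mutation);
                json.add("\"" + c.getName() + "\":\"" + value + "\"");
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        return json.toString();
    }
}
```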

Things to analyze more:

  • do multiple subscriptions mean multiple physical WebSocket connections, or is there one connection with multiple channels inside?

lukashornych avatar Jul 19 '23 10:07 lukashornych

As we discussed, I've pushed changes to the CDC mechanism. The new way is prepared only for the SYSTEM part of the CDC. When our prototype works, I'll extend it to the data capture part of the API. What has changed:

  • we now use a Java Fluent API
  • there should now be only a single subscriber per client
  • the single subscriber handles multiple "CDC requests", which can be extended or limited at runtime
  • the modus operandi should be:
    • the client subscribes itself with the first request
    • the client accepts onSubscribed and calls request with the number of messages it wants to receive
    • the client may at any time call extendSubscription or limitSubscription to add or remove an existing CDC request by its UUID, which is assigned by the client itself
    • the client receives all the different events in a single stream and can distinguish events by the UUID of the request that "triggers" them
    • the client may cancel the subscription, or the subscription is automatically canceled when the requested count of events has been sent by the server
    • all of this is documented in JavaDoc
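The modus operandi above maps naturally onto the standard java.util.concurrent.Flow (Reactive Streams) API. A minimal subscriber sketch follows; the event type and the collection of received events are illustrative placeholders, not the actual evitaDB classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Flow;

// Sketch of the single-subscriber flow: one subscriber receives events
// from all CDC requests in a single stream, distinguished by request UUID.
public class CdcSubscriberSketch {
    // Placeholder for a CDC event, tagged with the triggering request's UUID.
    public record ChangeEvent(UUID requestId, String payload) {}

    public static class CdcSubscriber implements Flow.Subscriber<ChangeEvent> {
        final List<ChangeEvent> received = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(10);    // client accepts the subscription and requests a batch
        }

        @Override public void onNext(ChangeEvent event) {
            received.add(event);  // dispatch by event.requestId() in a real client
        }

        @Override public void onError(Throwable t) {
            // connection lost: all CDC requests must be re-initiated on reconnect
        }

        @Override public void onComplete() {
            // the requested count of events has been delivered
        }

        public void cancelSubscription() {
            if (subscription != null) subscription.cancel();  // cancel at any time
        }
    }
}
```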

@khertys look for the TODO TPO for key changes on your side.

novoj avatar Aug 11 '23 14:08 novoj

I've managed to implement GraphQL subscriptions. System captures are working fully; catalog data captures are being properly called, but we don't have evitaDB support yet, so no testing so far; and catalog schema captures have some kind of problem where GQL is not being properly called, which I will need to look into.

Otherwise, this still needs to be done by me:

  • [ ] mutation converter tests (Java -> JSON)
  • [ ] aggregator converters lookup optimization
  • [ ] REST implementation - we have only mutation converters, we need the WebSocket subprotocol and endpoints
  • [ ] the gRPC API needs to be refactored a little bit so it is not so messy after the implementation of the common gRPC lib

lukashornych avatar Oct 06 '23 13:10 lukashornych