Easier watermark publishing

Open Ulimo opened this issue 1 year ago • 0 comments

Right now each sink has its own way of notifying the user of a new watermark.

This makes it hard to standardize the handling of watermark updates, for instance to build a service where one can check whether an update has been replicated to the destination of the stream.

Example story: the frontend wants to display the status of the replication to the destination:

  • The frontend sends a request to the API.
  • The API updates row 3 as operation id 321 and returns operation id 321 to the frontend.
  • The stream handles operation 321; the sink updates the destination and gets watermark 321 for the source.
  • The frontend can now check whether the operation has been handled in the destination system and inform the end user.

This could be the case, for example, when replicating data into an authorization system such as SpiceDB, Permify, or OpenFGA, and the frontend wants confirmation.
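The status check in the story above could be sketched roughly as follows. This is only an illustration: ReplicationStatusTracker is a hypothetical helper, not part of Flowtide, and it assumes the watermark can be reduced to a single increasing number that is comparable to the operation id.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, not part of Flowtide. Assumes a watermark can be
// reduced to one increasing offset that is comparable to an operation id.
public sealed class ReplicationStatusTracker
{
    // Highest source offset confirmed so far, per sink name.
    private readonly ConcurrentDictionary<string, long> _latest = new();

    // Called whenever a sink reports a new watermark for the source.
    // Keeps the maximum so that late or repeated reports never move it back.
    public void Record(string sinkName, long sourceOffset) =>
        _latest.AddOrUpdate(sinkName, sourceOffset,
            (_, current) => Math.Max(current, sourceOffset));

    // True once the sink's watermark has reached the operation id.
    public bool IsReplicated(string sinkName, long operationId) =>
        _latest.TryGetValue(sinkName, out var offset) && offset >= operationId;
}
```

The API would return operation id 321 to the frontend, which could then poll an endpoint backed by IsReplicated("spicedb", 321) until it returns true.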

Suggestion

Create a new interface IWatermarkPublisher with:

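public interface IWatermarkPublisher
{
  // Sets up the publisher for the given stream.
  Task Initialize(string streamName, ILogger logger);

  // destinationWatermark is null when the sink cannot supply one.
  Task Publish(NamedTable sink, Watermark watermark, Watermark? destinationWatermark);
}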

EgressVertex gets a new protected method PublishWatermark(NamedTable sink, Watermark watermark, Watermark? destinationWatermark = default). It is up to each sink to call this method once data has been written correctly to the destination.

The destination watermark is optional and is only for sinks that can actually supply that information.

Operators such as SimpleGroupedWriteOperator, which keep track of when data should be sent and when it has completed, can call PublishWatermark automatically.
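To make the proposed contract more concrete, here is a minimal sketch of a publisher that keeps the latest watermark per sink in memory. The types ILogger, NamedTable, and Watermark below are simplified stand-ins so that the example compiles on its own; the real Flowtide and Microsoft.Extensions.Logging types look different. InMemoryWatermarkPublisher and TryGetLatest are hypothetical names.

```csharp
#nullable enable
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Stand-ins so the sketch is self-contained; the real types differ.
public interface ILogger { }
public sealed record NamedTable(string Name);
public sealed record Watermark(long Offset);

// The interface as proposed in this issue.
public interface IWatermarkPublisher
{
    Task Initialize(string streamName, ILogger logger);
    Task Publish(NamedTable sink, Watermark watermark, Watermark? destinationWatermark);
}

// Hypothetical publisher that remembers the latest watermark per sink.
public sealed class InMemoryWatermarkPublisher : IWatermarkPublisher
{
    private readonly ConcurrentDictionary<string, Watermark> _latest = new();

    public string? StreamName { get; private set; }

    public Task Initialize(string streamName, ILogger logger)
    {
        StreamName = streamName;
        return Task.CompletedTask;
    }

    public Task Publish(NamedTable sink, Watermark watermark, Watermark? destinationWatermark)
    {
        // The destination watermark is ignored in this sketch.
        _latest[sink.Name] = watermark;
        return Task.CompletedTask;
    }

    // Lets a status service read the last published watermark for a sink.
    public bool TryGetLatest(string sinkName, out Watermark? watermark) =>
        _latest.TryGetValue(sinkName, out watermark);
}
```

A sink, or an operator such as SimpleGroupedWriteOperator, would then call PublishWatermark after a successful write, and the stream would forward that call to every registered publisher.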

It should also be added to the dependency injection project:

builder.Services.AddFlowtideStream("name")
  ...
  .AddWatermarkPublisher(new MyWatermarkPublisher());

Multiple publishers can be added by chaining the same call:

builder.Services.AddFlowtideStream("name")
  ...
  .AddWatermarkPublisher(new MyWatermarkPublisher1())
  .AddWatermarkPublisher(new MyWatermarkPublisher2());

This can then be extended with AddRedisWatermarkPublisher etc. in the future.

It should also be possible to pass PublisherFailureOptions that control how the stream handles publishing failures. Examples:

  • RestartFromCheckpoint - the stream gets a failure and restarts from the last checkpoint.
  • RetrySynchronous - retries until it succeeds and halts the stream in the meantime.
  • RetryAsynchronous - retries in a background thread until it succeeds but allows the stream to continue.
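The three options could behave roughly as in the sketch below. Only the option names come from this issue; PublishFailureHandling, PublishWithPolicy, and the retryDelay parameter are hypothetical, and the sketch assumes that an exception escaping the publish call is what makes the stream restart from its last checkpoint.

```csharp
using System;
using System.Threading.Tasks;

public enum PublisherFailureOptions
{
    RestartFromCheckpoint,
    RetrySynchronous,
    RetryAsynchronous
}

// Hypothetical helper showing one possible meaning of each option.
public static class PublishFailureHandling
{
    public static async Task PublishWithPolicy(
        Func<Task> publish, PublisherFailureOptions option, TimeSpan retryDelay)
    {
        switch (option)
        {
            case PublisherFailureOptions.RestartFromCheckpoint:
                // Let the exception propagate; the stream is assumed to
                // treat it as a failure and restart from the checkpoint.
                await publish();
                break;
            case PublisherFailureOptions.RetrySynchronous:
                // The caller waits here, so the stream is halted meanwhile.
                await RetryUntilSuccess(publish, retryDelay);
                break;
            case PublisherFailureOptions.RetryAsynchronous:
                // Retry in the background and return immediately.
                _ = Task.Run(() => RetryUntilSuccess(publish, retryDelay));
                break;
        }
    }

    private static async Task RetryUntilSuccess(Func<Task> publish, TimeSpan retryDelay)
    {
        while (true)
        {
            try
            {
                await publish();
                return;
            }
            catch (Exception)
            {
                await Task.Delay(retryDelay);
            }
        }
    }
}
```

A real implementation would probably also need cancellation support, a cap or backoff on retries, and a decision on how background retries interact with newer watermarks.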

Ulimo, Nov 12 '24 12:11