SubscribeToStreamAsync ignores LimitRetriesForOperationTo and does not support cancellation
Describe the bug
Suppose a simple use case where you need to accept an HTTP request, subscribe to an event, and send the HTTP response when the event occurs:
public async Task<ActionResult> ControllerMethod() {
    var tcs = new TaskCompletionSource<ResolvedEvent>();
    using (await _eventStoreConnection.SubscribeToStreamAsync(streamName, true, (_, @event) => {
        // Complete the pending request with the first event that arrives.
        tcs.TrySetResult(@event);
        return Task.CompletedTask;
    }, (_, __, ex) => {
        // Surface a dropped subscription as a failure.
        if (ex != null) tcs.TrySetException(ex);
    })) {
        return Ok(await tcs.Task);
    }
}
Now, if EventStore temporarily goes down, the default 10 reconnection attempts run out and the connection becomes unusable, which is a problem because it is registered as a singleton. The solution is to specify KeepReconnecting when constructing ConnectionSettings, but then another problem arises: SubscribeToStreamAsync ignores LimitRetriesForOperationTo while the target EventStore node is down, so the client request hangs until the connection is restored. Worse, the client might time out, abort the request, and retry. We must handle that case by passing a CancellationToken all the way down from the controller, but SubscribeToStreamAsync does not provide any way to cancel cooperatively!
public async Task<ActionResult> ControllerMethod(CancellationToken cancellationToken) {
    var tcs = new TaskCompletionSource<ResolvedEvent>();
    // Cancelling the request only completes the TaskCompletionSource;
    // it cannot interrupt the pending SubscribeToStreamAsync call.
    using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
    using (await _eventStoreConnection.SubscribeToStreamAsync(streamName, true, (_, @event) => {
        tcs.TrySetResult(@event);
        return Task.CompletedTask;
    }, (_, __, ex) => {
        if (ex != null) tcs.TrySetException(ex);
    }/* No parameter for cancellation token! */)) {
        return Ok(await tcs.Task);
    }
}
Is this intended? If so, how do you recommend implementing a fault-tolerant subscription that times out in case of node failure?
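As a stopgap, the caller can race the subscribe task against a timeout and the request's token. The sketch below uses only the .NET base class library; `TaskTimeoutExtensions` and `WithCancellation` are hypothetical names, not part of EventStore.ClientAPI. It only stops the caller from waiting: the underlying subscribe operation is not cancelled and keeps running inside the connection.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of EventStore.ClientAPI.
public static class TaskTimeoutExtensions
{
    // Awaits `task`, but gives up when `timeout` elapses or `cancellationToken` fires.
    // The wrapped task itself is NOT cancelled; it is merely abandoned.
    public static async Task<T> WithCancellation<T>(
        this Task<T> task, TimeSpan timeout, CancellationToken cancellationToken)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            var delay = Task.Delay(timeout, cts.Token);
            var completed = await Task.WhenAny(task, delay).ConfigureAwait(false);
            if (completed == task)
            {
                cts.Cancel(); // release the timer behind Task.Delay
                return await task.ConfigureAwait(false);
            }

            // The task was abandoned: if it completes later, dispose its result
            // (for example a subscription); if it faults, observe the exception.
            _ = task.ContinueWith(t =>
            {
                if (t.Status == TaskStatus.RanToCompletion) (t.Result as IDisposable)?.Dispose();
                else _ = t.Exception;
            }, TaskScheduler.Default);

            // Caller cancellation surfaces as OperationCanceledException,
            // an elapsed timeout as TimeoutException.
            cancellationToken.ThrowIfCancellationRequested();
            throw new TimeoutException($"Operation did not complete within {timeout}.");
        }
    }
}
```

In the controller above this would read `using (await _eventStoreConnection.SubscribeToStreamAsync(...).WithCancellation(TimeSpan.FromSeconds(5), cancellationToken))`, where the five-second timeout is an arbitrary example value. A proper fix still has to come from the client, since the abandoned operation stays queued until the connection recovers.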
Code to reproduce
var s = ConnectionSettings.Create()
    .KeepReconnecting()
    .Build();
// There is no EventStore on localhost:2305
var conn = EventStoreConnection.Create(s, new Uri("tcp://localhost:2305"));
await conn.ConnectAsync();
// We hang forever here
await conn.SubscribeToStreamAsync("stream", true, (subscription, @event) =>
{
    return Task.CompletedTask;
});
Expected behavior
SubscribeToStreamAsync throws ConnectionClosedException once the operation retry limit is exceeded, and accepts a CancellationToken argument to cancel the operation.
Actual behavior
SubscribeToStreamAsync hangs forever during an EventStore outage and cannot be cancelled from outside.
Config/Logs/Screenshots
This batch of log entries repeats in the log in an endless loop:
[05,17:39:36.627,DEBUG] TcpPackageConnection: connection to [127.0.0.1:2305, L, {5a260518-b1e6-4aea-abe4-931640db370d}] failed. Error: ConnectionRefused.
[05,17:39:36.627,DEBUG] EventStoreConnection 'ES-4c64a066-6c33-4664-87d9-23a5ac61dc67': enqueueing message EventStore.ClientAPI.Internal.TcpConnectionClosedMessage..
[05,17:39:36.627,DEBUG] EventStoreConnection 'ES-4c64a066-6c33-4664-87d9-23a5ac61dc67': TCP connection to [127.0.0.1:2305, L, {5a260518-b1e6-4aea-abe4-931640db370d}] closed..
[05,17:39:36.827,DEBUG] EventStoreConnection 'ES-4c64a066-6c33-4664-87d9-23a5ac61dc67': TimerTick checking reconnection....
[05,17:39:36.827,DEBUG] EventStoreConnection 'ES-4c64a066-6c33-4664-87d9-23a5ac61dc67': DiscoverEndPoint.
[06,17:39:36.827,DEBUG] EventStoreConnection 'ES-4c64a066-6c33-4664-87d9-23a5ac61dc67': enqueueing message EventStore.ClientAPI.Internal.EstablishTcpConnectionMessage..
[05,17:39:36.827,DEBUG] EventStoreConnection 'ES-4c64a066-6c33-4664-87d9-23a5ac61dc67': EstablishTcpConnection to [127.0.0.1:2305].
EventStore details
- EventStore server version: 4.1.0
- Operating system: Ubuntu 18.04
- EventStore client version: 5.0.1