ADO.NET Streaming Provider
This PR adds an ADO.NET Streaming provider.
This provider is intended to be a first-class sibling to the other ADO.NET providers such as Clustering, Persistence and Reminders.
Big Note
Only the Microsoft SQL Server queries are defined as of now. This is causing some provider tests to fail, as they expect all RDBMS scripts to be available. My personal experience lies far more in SQL Server than in the other databases, so any help with them is welcome.
Update: MariaDB/MySQL queries now defined.
Update: PostgreSQL queries now defined.
Approach
The idea here is to have "Azure Queue"-like behaviour on top of a database and little else. This provider does not attempt, nor is it capable of, attaining Event Hub-like performance or capability. It does attempt to attain the best possible performance one can have in a database, by using regular database objects by default plus some custom SQL magic.
The easy:
- This design is going for queue-like behaviour, not rewindable-hub-like behaviour. This means message archiving and point-in-time subscriptions need not be supported.
The not so easy:
- Concurrent batch queries against the same table are prone to deadlocks. These cause error noise and severe delays, which can hinder the functionality of the provider to the point of uselessness. To avert this occurrence, resource locks (for rows in this context) must be acquired in the same order across all queries without exception. While this is easy to induce under the correct conditions, and this provider creates those conditions, such conditions are brittle to change. User interference with the queries may make them vulnerable to deadlocks again.
- The alternative to the above is to acquire and handle only one resource at a time. This has an obvious performance cost, but it is not too unlike what a cloud work queue does either. This behaviour can be induced by the user by reducing the batch size to 1, though ideally any user changes to the queries will keep the need for consistent ordering in mind.
- Another alternative is to update the queries to always use table locks. Again, less perf but guaranteed safety.
- A shared database is an expensive resource and must be handled with care to avoid accidental stampedes. Therefore, although the provider fully supports the concept of multiple "queues" (akin to the azure provider, which in turn creates more pulling agents, one per queue), the default partition count is set to 1 for safety. The user is free to raise this number as desired and it will have the desired effect.
Design Overview
- The lower layer maintains three tables: `OrleansStreamMessage`, `OrleansStreamDeadLetter` and `OrleansStreamControl`.
  - `OrleansStreamMessage` holds all transient messages. New messages are added to this table upon "queueing", marked appropriately upon "dequeuing" and deleted upon successful confirmation.
  - `OrleansStreamDeadLetter` is where undelivered messages are evicted to, either through normal expiry or upon delivery failure.
  - `OrleansStreamControl` helps maintain the eviction schedule so it remains independent of cluster size.
- The integration layer is by and large copied from the Azure Queue provider.
- The lower layer also attempts to mimic a standard cloud queue, albeit with the restrictions of an RDBMS.
- As with the other providers, this provider supports data segregation via mapping `ClusterOptions.ServiceId` to a `ServiceId` column. This allows multiple deployments to share the same database without overlapping.
- The underlying tables support multiple named adonet providers sharing the same database via mapping the provider name to a `ProviderId` column.
- In addition, the provider supports the use of multiple "queues" (therefore multiple pulling agents), via the automatic mapping of the underlying Orleans `QueueId` to an appropriate `QueueId` column.
Implementation Details
Tables:
OrleansStreamMessage
CREATE TABLE [OrleansStreamMessage]
(
[ServiceId] NVARCHAR(150) NOT NULL,
[ProviderId] NVARCHAR(150) NOT NULL,
[QueueId] NVARCHAR(150) NOT NULL,
[MessageId] BIGINT NOT NULL,
[Dequeued] INT NOT NULL,
[VisibleOn] DATETIME2(7) NOT NULL,
[ExpiresOn] DATETIME2(7) NOT NULL,
[CreatedOn] DATETIME2(7) NOT NULL,
[ModifiedOn] DATETIME2(7) NOT NULL,
[Payload] VARBINARY(MAX) NULL,
CONSTRAINT [PK_OrleansStreamMessage] PRIMARY KEY CLUSTERED
(
[ServiceId] ASC,
[ProviderId] ASC,
[QueueId] ASC,
[MessageId] ASC
)
);
This table holds all the messages waiting to be delivered.
- The unique identifier of a message is `(ServiceId, ProviderId, QueueId, MessageId)`.
- Due to the RDBMS's lack of a partitioned sequence feature, `MessageId` is implemented as a regular sequence. This technically makes the identifier above a non-candidate super-key (an undesirable anti-pattern) and allows for gaps in `MessageId` within each `(ServiceId, ProviderId, QueueId)` group. However, as the existence of gaps does not impact the implementation, there is no harm to this effect.
- The table is clustered by the unique identifier to:
  - Allow quick identification and deletion of a message upon confirmation.
  - Cheapen deadlock-free batch queries by allowing them to lock rows in a well-known, already-persisted order.
- `Dequeued` contains the number of times the message was dequeued. This also doubles as the message pop receipt for confirmation/deletion. The deletion request only succeeds if the caller passes back the same number it received.
- `VisibleOn` is updated upon dequeuing to the time in the future at which the message will become dequeuable again.
  - When `Dequeued` reaches the `MaxAttempts` option and `VisibleOn` is also reached, the message is considered undequeuable and becomes eligible for eviction to dead letters.
- `ExpiresOn` is set upon initial queueing to the time in the future at which the message will no longer be dequeuable, regardless of the `Dequeued` count. Messages where `ExpiresOn` is reached and `VisibleOn` is also reached, regardless of `Dequeued`, become eligible for eviction to dead letters.
- `CreatedOn` and `ModifiedOn` are for troubleshooting only and have no impact on logic.
- `Payload` holds the binary serialized Orleans message container.
Important: To avoid deadlocks from concurrent batch queries against this table, all such queries are induced to lock data rows in the same order, and that order is the same order of the clustered index above.
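For illustration, a batch query that selects candidate rows would take its locks along the clustered key roughly like this (a minimal sketch only, not the shipped script):

```sql
-- Illustrative sketch: taking update locks in clustered-index order means two
-- concurrent batches can never acquire the same rows in opposite orders.
SELECT TOP (@BatchSize) [MessageId]
FROM [OrleansStreamMessage] WITH (UPDLOCK, HOLDLOCK)
WHERE [ServiceId] = @ServiceId
  AND [ProviderId] = @ProviderId
  AND [QueueId] = @QueueId
  AND [VisibleOn] <= SYSUTCDATETIME()
ORDER BY [ServiceId], [ProviderId], [QueueId], [MessageId];
```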
OrleansStreamDeadLetter
CREATE TABLE [OrleansStreamDeadLetter]
(
[ServiceId] NVARCHAR(150) NOT NULL,
[ProviderId] NVARCHAR(150) NOT NULL,
[QueueId] NVARCHAR(150) NOT NULL,
[MessageId] BIGINT NOT NULL,
[Dequeued] INT NOT NULL,
[VisibleOn] DATETIME2(7) NOT NULL,
[ExpiresOn] DATETIME2(7) NOT NULL,
[CreatedOn] DATETIME2(7) NOT NULL,
[ModifiedOn] DATETIME2(7) NOT NULL,
[DeadOn] DATETIME2(7) NOT NULL,
[RemoveOn] DATETIME2(7) NOT NULL,
[Payload] VARBINARY(MAX) NULL,
CONSTRAINT [PK_OrleansStreamDeadLetter] PRIMARY KEY CLUSTERED
(
[ServiceId] ASC,
[ProviderId] ASC,
[QueueId] ASC,
[MessageId] ASC
)
);
This table holds messages that failed to be successfully delivered, including:
- Maximum number of attempts reached.
- Expiration time reached.
Eviction happens on two occasions:
- Single message eviction from `OrleansStreamMessage` to `OrleansStreamDeadLetter` is attempted by the stream failure component on a case-by-case basis.
- Batch message eviction of any leftovers is opportunistic and performed in a timely fashion by the "dequeueing" query at regular intervals.
Columns are copied from the original row in `OrleansStreamMessage` as-is, with two exceptions:
- `DeadOn` holds the time at which the message was evicted to dead letters.
- `RemoveOn` holds the time in the future at which the message will be deleted from the dead letter table itself.
OrleansStreamControl
CREATE TABLE [OrleansStreamControl]
(
[ServiceId] NVARCHAR(150) NOT NULL,
[ProviderId] NVARCHAR(150) NOT NULL,
[QueueId] NVARCHAR(150) NOT NULL,
[EvictOn] DATETIME2(7) NOT NULL,
CONSTRAINT [PK_OrleansStreamControl] PRIMARY KEY CLUSTERED
(
[ServiceId] ASC,
[ProviderId] ASC,
[QueueId] ASC
)
);
This table is designed to hold synchronization variables at queue level to help ensure any opportunistic scheduled task remains stable, regardless of cluster size. For now, the only such task is message eviction.
EvictOn: The time in the future after which the next opportunistic eviction task will run.
At runtime, the dequeuing queries will check this table, and, when EvictOn is reached, will attempt to win a race to update the old value to the next schedule in the future. The query that wins that race also gets to run the eviction query. Hence, the process is opportunistic, and guaranteed to run eventually, without the need for extra background workers.
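A minimal sketch of that race, assuming the tables above (the real logic is embedded in GetStreamMessages and may differ in detail):

```sql
-- Illustrative sketch: only the caller that manages to push EvictOn forward
-- wins the race and pays the cost of running the eviction queries.
UPDATE [OrleansStreamControl]
SET [EvictOn] = DATEADD(SECOND, @EvictionInterval, SYSUTCDATETIME())
WHERE [ServiceId] = @ServiceId
  AND [ProviderId] = @ProviderId
  AND [QueueId] = @QueueId
  AND [EvictOn] <= SYSUTCDATETIME();

IF @@ROWCOUNT > 0
BEGIN
    -- the winner evicts expired/exhausted messages and prunes old dead letters
    EXECUTE [EvictStreamMessages]
        @ServiceId, @ProviderId, @QueueId, @EvictionBatchSize, @MaxAttempts, @RemovalTimeout;
    EXECUTE [EvictStreamDeadLetters]
        @ServiceId, @ProviderId, @QueueId, @EvictionBatchSize;
END
```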
Queries / Stored Procedures
QueueStreamMessage
CREATE PROCEDURE [QueueStreamMessage]
@ServiceId NVARCHAR(150),
@ProviderId NVARCHAR(150),
@QueueId NVARCHAR(150),
@Payload VARBINARY(MAX),
@ExpiryTimeout INT
This query adds a new message to the OrleansStreamMessage table with the following behaviour:
- `Dequeued` is set to zero.
- `VisibleOn`, `CreatedOn` and `ModifiedOn` are set to "now".
- `ExpiresOn` is set to "now" + `@ExpiryTimeout` (seconds).
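Put together, the body reduces to a single insert statement. A minimal sketch, assuming `MessageId` comes from a regular sequence whose name below is hypothetical:

```sql
-- Illustrative sketch; the sequence name is hypothetical.
DECLARE @Now DATETIME2(7) = SYSUTCDATETIME();

INSERT INTO [OrleansStreamMessage]
    ([ServiceId], [ProviderId], [QueueId], [MessageId],
     [Dequeued], [VisibleOn], [ExpiresOn], [CreatedOn], [ModifiedOn], [Payload])
VALUES
    (@ServiceId, @ProviderId, @QueueId, NEXT VALUE FOR [OrleansStreamMessageSequence],
     0, @Now, DATEADD(SECOND, @ExpiryTimeout, @Now), @Now, @Now, @Payload);
```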
GetStreamMessages
CREATE PROCEDURE [GetStreamMessages]
@ServiceId NVARCHAR(150),
@ProviderId NVARCHAR(150),
@QueueId NVARCHAR(150),
@MaxCount INT,
@MaxAttempts INT,
@VisibilityTimeout INT,
@RemovalTimeout INT,
@EvictionInterval INT,
@EvictionBatchSize INT
This query performs a number of steps:
- Performs opportunistic eviction as necessary.
- Gets a batch of messages from `OrleansStreamMessage` where:
  - `Dequeued` is under `MaxAttempts` (options).
  - `VisibleOn` has been reached.
  - `ExpiresOn` has not been reached.
- Marks the batch:
  - Increases `Dequeued` by 1.
  - Sets `VisibleOn` to now + `@VisibilityTimeout` (seconds).
  - Sets `ModifiedOn` to now.
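A minimal sketch of the mark-and-return step in SQL Server syntax (the shipped script also wires in the opportunistic eviction described above):

```sql
-- Illustrative sketch: select, mark and return the batch in one statement,
-- locking rows in clustered-index order to stay deadlock free.
WITH Batch AS
(
    SELECT TOP (@MaxCount) *
    FROM [OrleansStreamMessage] WITH (UPDLOCK, HOLDLOCK)
    WHERE [ServiceId] = @ServiceId
      AND [ProviderId] = @ProviderId
      AND [QueueId] = @QueueId
      AND [Dequeued] < @MaxAttempts
      AND [VisibleOn] <= SYSUTCDATETIME()
      AND [ExpiresOn] > SYSUTCDATETIME()
    ORDER BY [ServiceId], [ProviderId], [QueueId], [MessageId]
)
UPDATE Batch
SET [Dequeued] = [Dequeued] + 1,
    [VisibleOn] = DATEADD(SECOND, @VisibilityTimeout, SYSUTCDATETIME()),
    [ModifiedOn] = SYSUTCDATETIME()
OUTPUT Inserted.[MessageId], Inserted.[Dequeued], Inserted.[Payload];
```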
ConfirmStreamMessages
CREATE PROCEDURE [ConfirmStreamMessages]
@ServiceId NVARCHAR(150),
@ProviderId NVARCHAR(150),
@QueueId NVARCHAR(150),
@Items NVARCHAR(MAX)
This query deletes specific messages from the OrleansStreamMessage table.
A list of messages and their pop receipts (taken from the Dequeued column at the time of dequeueing) is passed in @Items in the form 1:2|3:4|5:6, where the first number is the message identifier and the second number is the dequeue count.
Row deletion only occurs if both numbers match in the table. Otherwise, the row is assumed to have been dequeued again and therefore left alone.
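One way to express this in T-SQL, assuming SQL Server 2016+ for STRING_SPLIT (the shipped script may parse @Items differently):

```sql
-- Illustrative sketch: split "1:2|3:4|5:6" into (MessageId, Dequeued) pairs
-- and delete only the rows whose pop receipt still matches.
WITH Receipts AS
(
    SELECT
        CAST(LEFT([value], CHARINDEX(':', [value]) - 1) AS BIGINT) AS [MessageId],
        CAST(SUBSTRING([value], CHARINDEX(':', [value]) + 1, LEN([value])) AS INT) AS [Dequeued]
    FROM STRING_SPLIT(@Items, '|')
)
DELETE M
FROM [OrleansStreamMessage] AS M
INNER JOIN Receipts AS R
    ON M.[MessageId] = R.[MessageId]
   AND M.[Dequeued] = R.[Dequeued]
WHERE M.[ServiceId] = @ServiceId
  AND M.[ProviderId] = @ProviderId
  AND M.[QueueId] = @QueueId;
```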
FailStreamMessage
CREATE PROCEDURE [FailStreamMessage]
@ServiceId NVARCHAR(150),
@ProviderId NVARCHAR(150),
@QueueId NVARCHAR(150),
@MessageId BIGINT,
@MaxAttempts INT,
@RemovalTimeout INT
This query applies failure logic to a single message in OrleansStreamMessage. It is called by the stream failure handler component upon message or subscription failure. The logic is:
- If the message dequeue counter has not reached `MaxAttempts` (options) then the message is made visible again.
- Otherwise the message is moved immediately to dead letters.
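A minimal sketch of that branching, assuming SQL Server syntax (illustrative only, not the shipped script):

```sql
-- Illustrative sketch.
DECLARE @Now DATETIME2(7) = SYSUTCDATETIME();

-- Attempts not yet exhausted: make the message visible again right away.
UPDATE [OrleansStreamMessage]
SET [VisibleOn] = @Now, [ModifiedOn] = @Now
WHERE [ServiceId] = @ServiceId AND [ProviderId] = @ProviderId
  AND [QueueId] = @QueueId AND [MessageId] = @MessageId
  AND [Dequeued] < @MaxAttempts;

-- Attempts exhausted: move the message to dead letters immediately.
IF @@ROWCOUNT = 0
BEGIN
    DELETE FROM [OrleansStreamMessage]
    OUTPUT
        Deleted.[ServiceId], Deleted.[ProviderId], Deleted.[QueueId], Deleted.[MessageId],
        Deleted.[Dequeued], Deleted.[VisibleOn], Deleted.[ExpiresOn],
        Deleted.[CreatedOn], Deleted.[ModifiedOn],
        @Now, DATEADD(SECOND, @RemovalTimeout, @Now), Deleted.[Payload]
    INTO [OrleansStreamDeadLetter]
        ([ServiceId], [ProviderId], [QueueId], [MessageId], [Dequeued],
         [VisibleOn], [ExpiresOn], [CreatedOn], [ModifiedOn], [DeadOn], [RemoveOn], [Payload])
    WHERE [ServiceId] = @ServiceId AND [ProviderId] = @ProviderId
      AND [QueueId] = @QueueId AND [MessageId] = @MessageId;
END
```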
EvictStreamMessages
CREATE PROCEDURE [EvictStreamMessages]
@ServiceId NVARCHAR(150),
@ProviderId NVARCHAR(150),
@QueueId NVARCHAR(150),
@BatchSize INT,
@MaxAttempts INT,
@RemovalTimeout INT
This query performs opportunistic eviction of a batch of messages from OrleansStreamMessage to OrleansStreamDeadLetter if the eviction policy applies to them.
This query is called by GetStreamMessages at regular intervals.
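A minimal sketch of the batch move, assuming SQL Server syntax (illustrative only):

```sql
-- Illustrative sketch: evict a batch of exhausted or expired messages,
-- visiting rows in clustered-index order to preserve the deadlock-free property.
DECLARE @Now DATETIME2(7) = SYSUTCDATETIME();

WITH Doomed AS
(
    SELECT TOP (@BatchSize) *
    FROM [OrleansStreamMessage] WITH (UPDLOCK, HOLDLOCK)
    WHERE [ServiceId] = @ServiceId
      AND [ProviderId] = @ProviderId
      AND [QueueId] = @QueueId
      AND [VisibleOn] <= @Now
      AND ([Dequeued] >= @MaxAttempts OR [ExpiresOn] <= @Now)
    ORDER BY [ServiceId], [ProviderId], [QueueId], [MessageId]
)
DELETE FROM Doomed
OUTPUT
    Deleted.[ServiceId], Deleted.[ProviderId], Deleted.[QueueId], Deleted.[MessageId],
    Deleted.[Dequeued], Deleted.[VisibleOn], Deleted.[ExpiresOn],
    Deleted.[CreatedOn], Deleted.[ModifiedOn],
    @Now, DATEADD(SECOND, @RemovalTimeout, @Now), Deleted.[Payload]
INTO [OrleansStreamDeadLetter]
    ([ServiceId], [ProviderId], [QueueId], [MessageId], [Dequeued],
     [VisibleOn], [ExpiresOn], [CreatedOn], [ModifiedOn], [DeadOn], [RemoveOn], [Payload]);
```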
EvictStreamDeadLetters
CREATE PROCEDURE [EvictStreamDeadLetters]
@ServiceId NVARCHAR(150),
@ProviderId NVARCHAR(150),
@QueueId NVARCHAR(150),
@BatchSize INT
This query performs opportunistic removal of dead letters from OrleansStreamDeadLetter.
This query is called by GetStreamMessages at regular intervals.
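A minimal sketch in SQL Server syntax (illustrative only):

```sql
-- Illustrative sketch: prune dead letters whose retention period has elapsed.
DELETE TOP (@BatchSize)
FROM [OrleansStreamDeadLetter]
WHERE [ServiceId] = @ServiceId
  AND [ProviderId] = @ProviderId
  AND [QueueId] = @QueueId
  AND [RemoveOn] <= SYSUTCDATETIME();
```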
Integration Artefacts
Middleware streaming artefacts are by and large copied from the Azure Queue and SQS implementations.
- `AdoNetBatchContainer`: The adonet flavour of `IBatchContainer`, same as the other ones.
- `AdoNetQueueAdapter`: The adonet `IQueueAdapter` implementation, which simply forwards requests to the RDBMS queries.
- `AdoNetQueueAdapterFactory`: The adonet factory of `IQueueAdapter` instances. Due to the lack of an async lifetime, this class has some logic to avoid creating more than one instance of the relational queries object.
- `AdoNetQueueAdapterReceiver`: The adonet implementation of `IQueueAdapterReceiver`, which simply forwards requests to the RDBMS queries.
- `AdoNetStreamFailureHandler`: The adonet implementation of `IStreamFailureHandler`, which forwards failure notifications to the `FailStreamMessage` query.
- `AdoNetStreamQueueMapper`: Maps Orleans `StreamId` and `QueueId` values to the appropriate `QueueId` column in the message table.
Benchmarks (SQL Server)
To be updated as tweaks are made.
QueueStreamMessage
Message "queueing" translates to inserting a new row at the end of a clustered table. This approaches an O(1) operation in principle, however with the expected inefficiencies:
- Latency correlated to payload size.
- Concurrent operations competing for writing into the transaction log.
The "queue id" abstraction appears to make no difference, given 2).
BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3593/23H2/2023Update/SunValley3)
12th Gen Intel Core i5-1240P, 1 CPU, 16 logical and 12 physical cores
.NET SDK 8.0.205
[Host] : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2
Job=InProcess Toolchain=InProcessEmitToolchain InvocationCount=1
IterationCount=3 UnrollFactor=1 WarmupCount=1
| Method | QueueCount | PayloadSize | Concurrency | Mean | Error | StdDev |
|---|---|---|---|---|---|---|
| QueueStreamMessage | 1 | 1000 | 1 | 2.427 ms | 11.8353 ms | 0.6487 ms |
| QueueStreamMessage | 1 | 1000 | 4 | 3.631 ms | 4.0146 ms | 0.2201 ms |
| QueueStreamMessage | 1 | 1000 | 8 | 5.039 ms | 8.1941 ms | 0.4491 ms |
| QueueStreamMessage | 1 | 10000 | 1 | 2.546 ms | 10.7113 ms | 0.5871 ms |
| QueueStreamMessage | 1 | 10000 | 4 | 4.217 ms | 7.0977 ms | 0.3890 ms |
| QueueStreamMessage | 1 | 10000 | 8 | 5.857 ms | 3.1351 ms | 0.1718 ms |
| QueueStreamMessage | 1 | 100000 | 1 | 4.417 ms | 13.0434 ms | 0.7150 ms |
| QueueStreamMessage | 1 | 100000 | 4 | 9.071 ms | 0.6015 ms | 0.0330 ms |
| QueueStreamMessage | 1 | 100000 | 8 | 14.487 ms | 7.7452 ms | 0.4245 ms |
| QueueStreamMessage | 4 | 1000 | 1 | 2.241 ms | 9.5473 ms | 0.5233 ms |
| QueueStreamMessage | 4 | 1000 | 4 | 3.536 ms | 10.6093 ms | 0.5815 ms |
| QueueStreamMessage | 4 | 1000 | 8 | 4.871 ms | 6.6343 ms | 0.3636 ms |
| QueueStreamMessage | 4 | 10000 | 1 | 2.383 ms | 5.7473 ms | 0.3150 ms |
| QueueStreamMessage | 4 | 10000 | 4 | 4.093 ms | 7.9349 ms | 0.4349 ms |
| QueueStreamMessage | 4 | 10000 | 8 | 5.884 ms | 5.1694 ms | 0.2834 ms |
| QueueStreamMessage | 4 | 100000 | 1 | 4.871 ms | 9.6464 ms | 0.5288 ms |
| QueueStreamMessage | 4 | 100000 | 4 | 9.088 ms | 1.0694 ms | 0.0586 ms |
| QueueStreamMessage | 4 | 100000 | 8 | 14.302 ms | 2.6907 ms | 0.1475 ms |
| QueueStreamMessage | 8 | 1000 | 1 | 2.343 ms | 10.0013 ms | 0.5482 ms |
| QueueStreamMessage | 8 | 1000 | 4 | 3.597 ms | 6.4206 ms | 0.3519 ms |
| QueueStreamMessage | 8 | 1000 | 8 | 4.999 ms | 6.3607 ms | 0.3487 ms |
| QueueStreamMessage | 8 | 10000 | 1 | 2.472 ms | 8.3953 ms | 0.4602 ms |
| QueueStreamMessage | 8 | 10000 | 4 | 4.517 ms | 11.1712 ms | 0.6123 ms |
| QueueStreamMessage | 8 | 10000 | 8 | 5.934 ms | 6.7816 ms | 0.3717 ms |
| QueueStreamMessage | 8 | 100000 | 1 | 4.611 ms | 9.1420 ms | 0.5011 ms |
| QueueStreamMessage | 8 | 100000 | 4 | 8.902 ms | 1.9354 ms | 0.1061 ms |
| QueueStreamMessage | 8 | 100000 | 8 | 14.486 ms | 5.8374 ms | 0.3200 ms |
GetStreamMessages
"Dequeing" messages is a multi-step process in a SQL query plan:
- Identify the correct leaf list given the (ServiceId, ProviderId, QueueId) tuple in the clustered table (In a binary tree, this approaches O(log(K)) where K is the number of distinct keys)
- From the linked list identified by 1), scan through the list, skipping non-visible rows, until enough rows are identified, marking rows non-visible as they are found, up to the batch size.
The cost of 2) is comprised of O(S) where S is the number of skipped rows, plus O(B) where B is the batch size. At best case, the cost is only O(B) when no rows are skipped. At worst case O(S) = O(N) where N is the size of leaf list itself (or the table itself if only one queue is used).
In a healthy scenario, where B rows are marked and then deleted in a stable loop, this implementation will approach the best case of O(log(K)) + O(B).
In an unhealthy scenario, where rows are marked but not deleted, this implementation will march towards O(N) where N is the count of rows for the given tuple.
Given the effect above, having multiple "queue id" values may help improve dequeuing performance in a cluster demanding high throughput, regardless of poison pills or consumer stability, by partitioning the leaf lists and reducing the overhead from skipped rows.
Note that QueueCount also means Concurrency in the dequeuing benchmark below.
BenchmarkDotNet v0.13.12, Windows 11 (10.0.22631.3593/23H2/2023Update/SunValley3)
12th Gen Intel Core i5-1240P, 1 CPU, 16 logical and 12 physical cores
.NET SDK 8.0.205
[Host] : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2
Job-FHPUWH : .NET 8.0.5 (8.0.524.21615), X64 RyuJIT AVX2
InvocationCount=1 IterationCount=3 UnrollFactor=1
WarmupCount=1
| Method | QueueCount | PayloadSize | BatchSize | FullnessRatio | Mean | Error | StdDev |
|---|---|---|---|---|---|---|---|
| GetStreamMessages | 1 | 10000 | 1 | 0 | 879.2 μs | 1,463.9 μs | 80.24 μs |
| GetStreamMessages | 1 | 10000 | 1 | 0.5 | 1,903.1 μs | 4,762.8 μs | 261.07 μs |
| GetStreamMessages | 1 | 10000 | 1 | 1 | 2,931.8 μs | 11,783.7 μs | 645.91 μs |
| GetStreamMessages | 1 | 10000 | 16 | 0 | 1,169.5 μs | 5,274.0 μs | 289.09 μs |
| GetStreamMessages | 1 | 10000 | 16 | 0.5 | 7,068.0 μs | 5,357.2 μs | 293.65 μs |
| GetStreamMessages | 1 | 10000 | 16 | 1 | 10,318.7 μs | 1,287.4 μs | 70.57 μs |
| GetStreamMessages | 1 | 10000 | 32 | 0 | 755.6 μs | 407.3 μs | 22.33 μs |
| GetStreamMessages | 1 | 10000 | 32 | 0.5 | 11,937.8 μs | 1,197.6 μs | 65.64 μs |
| GetStreamMessages | 1 | 10000 | 32 | 1 | 18,236.6 μs | 6,146.3 μs | 336.90 μs |
| GetStreamMessages | 4 | 10000 | 1 | 0 | 1,584.6 μs | 7,606.1 μs | 416.91 μs |
| GetStreamMessages | 4 | 10000 | 1 | 0.5 | 3,019.9 μs | 4,734.3 μs | 259.50 μs |
| GetStreamMessages | 4 | 10000 | 1 | 1 | 4,535.6 μs | 6,575.8 μs | 360.44 μs |
| GetStreamMessages | 4 | 10000 | 16 | 0 | 1,299.5 μs | 413.8 μs | 22.68 μs |
| GetStreamMessages | 4 | 10000 | 16 | 0.5 | 10,913.4 μs | 7,305.4 μs | 400.43 μs |
| GetStreamMessages | 4 | 10000 | 16 | 1 | 16,899.9 μs | 1,671.9 μs | 91.64 μs |
| GetStreamMessages | 4 | 10000 | 32 | 0 | 1,665.1 μs | 6,011.5 μs | 329.51 μs |
| GetStreamMessages | 4 | 10000 | 32 | 0.5 | 18,987.9 μs | 1,642.4 μs | 90.03 μs |
| GetStreamMessages | 4 | 10000 | 32 | 1 | 29,302.9 μs | 4,583.8 μs | 251.25 μs |
| GetStreamMessages | 8 | 10000 | 1 | 0 | 2,044.6 μs | 3,070.2 μs | 168.29 μs |
| GetStreamMessages | 8 | 10000 | 1 | 0.5 | 4,284.7 μs | 4,326.1 μs | 237.13 μs |
| GetStreamMessages | 8 | 10000 | 1 | 1 | 6,006.3 μs | 3,535.4 μs | 193.79 μs |
| GetStreamMessages | 8 | 10000 | 16 | 0 | 2,134.3 μs | 2,776.8 μs | 152.20 μs |
| GetStreamMessages | 8 | 10000 | 16 | 0.5 | 16,383.5 μs | 6,648.7 μs | 364.44 μs |
| GetStreamMessages | 8 | 10000 | 16 | 1 | 24,770.0 μs | 15,440.0 μs | 846.32 μs |
| GetStreamMessages | 8 | 10000 | 32 | 0 | 2,135.7 μs | 2,950.9 μs | 161.75 μs |
| GetStreamMessages | 8 | 10000 | 32 | 0.5 | 28,778.3 μs | 11,489.8 μs | 629.79 μs |
| GetStreamMessages | 8 | 10000 | 32 | 1 | 45,012.4 μs | 29,096.1 μs | 1,594.86 μs |
Added the MariaDB/MySQL database artefacts and updated the tests to cover them.
Notes:
- Tested on MariaDB 11.3.2.
- No benchmarks yet, but perceived performance appears abysmal vs Microsoft SQL Server. Due to the archaic SQL language support in MariaDB/MySQL, the implementation relies on long-lived transactions with a mix of temporary tables and SELECT ... ORDER BY ... FOR UPDATE to induce the ordered row locks necessary to avoid deadlocks.
- The older MySQL driver suffers from internal semaphore deadlocking under high concurrency in a sync context (such as xunit's) due to bug https://bugs.mysql.com/bug.php?id=114272 as detected by the chaos test
RelationalOrleansQueries_ChaosTest. This deadlocks the connection pool itself and was making some tests hang forever. This required upgrading the driver in the test project to the latest version v8.4.0.
Community contribution is welcome for performance improvements. We neither need nor want MySQL, so this is out of scope for us and therefore a best effort to complete this feature.
The test UnitTests.StorageTests.AdoNet.MySqlRelationalStoreTests.CancellationToken_MySql_Test appears to be failing due to timeout. This is unrelated to this PR as far as I can tell.
Added the PostgreSQL database artefacts and updated the tests to cover them.
A few tests, which were copied from the azure queue implementation, appear flaky. One of these tests is skipped in the original due to such flakiness. However neither the SQL Server nor the MySQL implementations show flakiness for the same test, only PostgreSQL does. So perhaps something is buggy with those particular scripts. Help is welcome.
Also of note, I added a ClearAllPools() step to the test database setup flow. This was due to PostgreSQL pooled connections not healing themselves fast enough after being force-disconnected upon dropping the database, causing whatever test ran next to fail. Neither SQL Server nor MySQL suffer from this issue. Not sure how to address it otherwise.
As with MySQL, this is a best effort for us.
The test UnitTests.StorageTests.AdoNet.MySqlRelationalStoreTests.CancellationToken_MySql_Test appears to be failing due to timeout. This is unrelated to this PR as far as I can tell.
Upgrading MySql.Data to 8.4.0 appears to have caused this failure. This test passes with the prior version 8.0.31.
So we either get connection pool deadlocks on < 8.4, or no working cancellation tokens on >= 8.4.
Pick your poison I suppose. Which one do you prefer?
Edit: Trial and error shows cancellation tokens are broken since 8.0.33 (April 2023).
We have the same issue with #8931 (I think). I think it would be safer to revert to 8.0.33 for the time being. Thoughts @ReubenBond?
@benjaminpetit agreed, but we can't stop end users from upgrading unless we set an upper bound. We will need to investigate further and fix
Lacking control over driver versions, we could wrap all SQL calls with .WaitAsync(token), as a protective measure, to guarantee responsiveness to the caller. Or even WaitAsync(timeout, token) if command timeouts also become buggy, who knows. We could log this event to let users know that their drivers are faulty.
After further testing, here are some funny findings:
- I observed that running against the latest version of MariaDB (11.3.2) in a docker desktop instance (as opposed to a locally installed instance, also latest) still caused deadlocks even with v8.4 drivers. Tested x3.
- I proceeded to pull MariaDB 10.6, as configured in the repository workflow file. Retested x3. No deadlocks.
- I proceeded to sync the branch to downgrade the MySQL driver back to 8.0.31. Retested x3. No deadlocks... from the driver. However, these showed up on the engine:
The above queries stayed waiting for a schema lock forever in order to drop the database. The commands eventually time out on the client side and therefore newer tests start attempting to drop the database again. What was holding these, who knows, probably each other.
I suspect this happens due to a combination of buggy pooled connection recovery plus dropping the database all the time. So to reduce the likelihood of this happening I made the following code run before all tests:
Tested again x3. No deadlocks, all green.
I also observed in passing that a deadlock will happen quickly once the pool limit is reached but rarely before. Therefore I suspect the concurrency limit I added to the more aggressive tests may be keeping it at bay. And I also suspect the other tests never reach this concurrency to begin with, at least I never observed it. However that limit didn't seem to help the v8.4 drivers against MariaDB 11.3.2 in docker desktop. Neither did it help with the lack of stable connection recovery after a database drop. Only clearing the pool did that.
Anyway, whatever regression there was in the driver, the engine, or the interaction between the two doesn't appear to be fully fixed.
I'm not invested enough in MySQL to go down this rabbit hole any further so I'm happy with leaving the old versions as-is.
As discussed in dev channel, I too finally managed to take a look. Looks good, it was a good discussion too.
As for the MySQL/MariaDB observations, indeed the RelationalStorage implementation has special handling for the official Oracle connector, which just deadlocks on all versions. It might have been sensible to make that handling apply to any connector and be version-based, but it wasn't possible back then. One option would have been to ignore such things and tell developers "tough luck". But the aim was to make this a core piece of software-intensive, dependable systems, so it seemed like a near-zero price to pay to see if such issues could be fixed. SQL Server (the old one) and Postgres have had some intermittent deadlock issues; the Oracle MySQL one seems to be persistent..?
The storage tests are also capped regarding concurrency, but that is due to exhausting the ThreadPool in CI while hammering the DB at high concurrency in tests, more than for other reasons.
I'll cross-reference https://github.com/dotnet/orleans/issues/634 as this long-standing dream is becoming real. :)
Lacking control over driver versions, we could wrap all SQL calls with `.WaitAsync(token)`, as a protective measure, to guarantee responsiveness to the caller. Or even `WaitAsync(timeout, token)` if command timeouts also become buggy, who knows. We could log this event to let users know that their drivers are faulty.
Or we could surface a configuration option to do that for the developer (see my previous comment). Not my call to make, but I provide the rationale for why it was done the way it was done in https://github.com/dotnet/orleans/blob/main/src/AdoNet/Shared/Storage/RelationalStorage.cs#L273.
@veikkoeeva Thanks a lot for reviewing. Indeed I'm not sure what to do with MySQL either. I'll leave it to @ReubenBond to decide. It seems stable enough for now at least, with enough band-aids around it. Yet I do hope no one has the unfortunate idea of running this provider on a MySQL database regardless of the deadlocks. The implementation is quite inefficient due to the limitations of the engine and is just asking for trouble either way.
Not necessarily for this PR, but I was looking at a way to make this provider rewindable. This would improve its ability to support projections with less fuss required in the Orleans code.
From the point of view of SQL artefacts, it looks straightforward:
- Have another table `OrleansStreamArchive` (or `*Delivered`, or `*History`, etc; naming things is hard).
- Update `FailStreamMessage` to optionally move the delivered message to that table instead of deleting it.
- Update `GetStreamMessages` to take an `@AfterMessageId` parameter with the `MessageId` after which to get messages. If the parameter is specified then also include all appropriate messages from the archive table.
One issue here arises from the StreamIds being distributed across the QueueIds to support a fixed set of pulling agents. I understand why this is required yet it gets in the way of efficient rewinding. Even if you know what QueueId to target, you may still potentially touch far more foreign messages than the ones that belong to that particular StreamId you're rewinding.
A solution here is to also store the StreamIds as part of the clustered key. This would not change cardinality, nor affect the queue-based pulling agent, and would allow rewinding a particular StreamId without touching anything else.
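To make that concrete, a hypothetical sketch of such an archive table (all names and column sizes below are illustrative, not part of this PR):

```sql
-- Hypothetical sketch only: an archive table keyed so that a single StreamId
-- can be rewound without scanning unrelated messages in the same queue.
CREATE TABLE [OrleansStreamArchive]
(
    [ServiceId] NVARCHAR(150) NOT NULL,
    [ProviderId] NVARCHAR(150) NOT NULL,
    [QueueId] NVARCHAR(150) NOT NULL,
    [StreamId] NVARCHAR(500) NOT NULL,
    [MessageId] BIGINT NOT NULL,
    [DeliveredOn] DATETIME2(7) NOT NULL,
    [Payload] VARBINARY(MAX) NULL,
    CONSTRAINT [PK_OrleansStreamArchive] PRIMARY KEY CLUSTERED
    (
        [ServiceId] ASC,
        [ProviderId] ASC,
        [QueueId] ASC,
        [StreamId] ASC,
        [MessageId] ASC
    )
);
```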
However I'm having trouble identifying the correct bits in the streaming middleware to plug into. The only rewindable provider I see is EventHubs and its implementation looks tightly coupled to how EventHubs itself works, including having its own cache.
Do you have any suggestions on how to proceed with this without causing too much havoc? @ReubenBond maybe?
As noted on discord, this PR is now ready for formal review.
@benjaminpetit is taking a look
Many thanks!
@benjaminpetit Wow did you just merge this without comment? You're more confident than I am! 😅
Well... that's embarrassing. I merged the wrong PR 😓 But I started to review it a few days ago.
I still think it's a worthy addition for a preview, so let's keep it.
I will open a PR to mark this new package "prerelease" or "preview", if that's ok with you?
That's totally fine. Cheers.