azure-sdk-for-java

Adding safety to Amqp Session operations, SessionCache to cache ReactorSession instances and integrating RequestResponseChannelCache

Open anuchandy opened this issue 1 year ago • 3 comments

This PR has the following changes –

ReactorSessionCache

At the moment, we "create and open" the Qpid AMQP session outside of the Reactor Executor, and we need a design to address this. Additionally, because we cache the sessions, we need to ensure three things:

  1. A session is "created and opened" only when it is loaded into the cache for the first time, and this must use the Reactor Executor.
  2. Once a session is loaded into the cache, subsequent cache lookups must not attempt to open it.
  3. Step 2 (subsequent cache lookups) must not rely on the Reactor Executor. This avoids scheduling unnecessary work on the Reactor Executor, thread hopping, and unnecessary waiting for calling threads racing into the cache.

A new implementation type, "ReactorSessionCache", has been introduced to azure-core-amqp to make the cache functionality more readable and testable, and to encapsulate the three goals above.

The "ReactorConnection" type composes this "ReactorSessionCache".
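The three cache goals above can be illustrated with a minimal sketch. This is not the azure-core-amqp implementation: `SketchSession`, `SketchSessionCache`, and the use of `CompletableFuture` in place of Reactor types are assumptions made only to keep the example self-contained. The `Executor` parameter stands in for the Reactor Executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical stand-in for a session; not the azure-core-amqp type. */
final class SketchSession {
    final String name;

    SketchSession(String name) {
        this.name = name;
    }
}

/** Hypothetical cache: the first lookup opens on the executor, later lookups only read the map. */
final class SketchSessionCache {
    private final ConcurrentHashMap<String, CompletableFuture<SketchSession>> entries =
        new ConcurrentHashMap<>();
    private final Executor reactorExecutor;                       // stand-in for the Reactor Executor
    private final Function<String, SketchSession> createAndOpen;  // the "create and open" step

    SketchSessionCache(Executor reactorExecutor, Function<String, SketchSession> createAndOpen) {
        this.reactorExecutor = reactorExecutor;
        this.createAndOpen = createAndOpen;
    }

    CompletableFuture<SketchSession> getOrLoad(String name) {
        // The mapping function runs at most once per absent key, so "create and open"
        // is scheduled on the executor only on first load (goal 1). When the key is
        // already present, the existing entry is returned and no executor work is
        // scheduled (goals 2 and 3).
        return entries.computeIfAbsent(name,
            key -> CompletableFuture.supplyAsync(() -> createAndOpen.apply(key), reactorExecutor));
    }

    /** Removes an entry, e.g. after the session is disposed; returns true if one was present. */
    boolean evict(String name) {
        return entries.remove(name) != null;
    }
}
```

Callers racing into `getOrLoad` for the same name all receive the same future, so only one of them triggers the open.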

ProtonSession

Currently, ReactorSession opens the internal Qpid session in the ReactorSession::Ctr, which can happen outside the Reactor Executor (as discussed in the previous section). Additionally, RequestResponseChannel directly accesses the internal Qpid session that ReactorSession composes and "creates and opens" a Sender and a Receiver on it. This "create and open" by RequestResponseChannel is also not guaranteed to always happen on the Reactor Executor.

A new implementation type, "ProtonSession", has been introduced to abstract away direct operations on the Qpid session and to address the above safety concerns. ReactorSession will contain an instance of "ProtonSession" and won't expose it. Since the session can be accessed (not modified) from the cache outside of the Reactor Executor while also being concurrently modified (e.g., disposed) from the Reactor Executor, "ProtonSession" stores the internal Qpid session in an atomic reference. Keeping all lower-level operations in "ProtonSession" also makes it independently testable.

Session IO operations are never in the hot path, but during connection shutdown and recovery multiple threads can race for a session, so safety is important here.
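A rough sketch of the "atomic reference" idea, under stated assumptions: `SketchProtonSession`, its nested `Resource`, and the `DISPOSED` sentinel are hypothetical names for illustration and do not reflect the actual ProtonSession internals. The point is only that readers on any thread see a consistent state while open and dispose race.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder that guards an underlying session resource with an AtomicReference. */
final class SketchProtonSession {
    /** Hypothetical stand-in for the underlying Qpid session. */
    static final class Resource {
        private volatile boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Sentinel marking the terminal state, so a disposed session can never be reopened.
    private static final Resource DISPOSED = new Resource();

    private final AtomicReference<Resource> resource = new AtomicReference<>();

    /** Opens at most once; returns true only for the call that actually opened. */
    boolean open() {
        // For brevity the Resource is allocated before the CAS; a losing caller discards it.
        return resource.compareAndSet(null, new Resource());
    }

    /** Safe to call from any thread; only reads the reference. */
    boolean isOpen() {
        Resource current = resource.get();
        return current != null && current != DISPOSED;
    }

    /** Closes the underlying resource at most once; returns true if this call closed it. */
    boolean dispose() {
        Resource previous = resource.getAndSet(DISPOSED);
        if (previous != null && previous != DISPOSED) {
            previous.close();
            return true;
        }
        return false;
    }
}
```

In this sketch a second `open()` or `dispose()` is a no-op, which is the behavior one would want when several threads race during connection shutdown and recovery.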

ProtonSession.ProtonChannel

A nested type of ProtonSession that holds a "sender and receiver link pair" for bi-directional communication. RequestResponseChannel will use this type to facilitate communication for CBS and management.

RequestResponseChannelCache

We already had "RequestResponseChannelCache", which follows the design principles of the V2 "ReactorConnectionCache", but this type was not wired in. These changes integrate this cache into the execution flow.

Temporary wrapper types

As usual, two temporary wrapper types are added to support V1 and V2 side by side; they will be deleted upon V1 removal.

  • ChannelCacheWrapper: delegator for V1 AmqpChannelProcessor or V2 RequestResponseChannelCache.
  • ProtonSessionWrapper: delegator for Qpid session direct operations or V2 ProtonSession.
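The delegator pattern behind these wrappers can be sketched as follows. This is an illustration only: `SketchChannelSource` and `SketchChannelCacheWrapper` are hypothetical, and the real wrappers delegate to SDK types with richer APIs than a single `get()` method.

```java
import java.util.Objects;

/** Hypothetical common view of "something that hands out a channel"; not an SDK interface. */
interface SketchChannelSource {
    String get();
}

/** Delegates to exactly one of the V1 or V2 sources, chosen at construction time. */
final class SketchChannelCacheWrapper {
    private final SketchChannelSource v1Processor; // stand-in for V1 AmqpChannelProcessor
    private final SketchChannelSource v2Cache;     // stand-in for V2 RequestResponseChannelCache

    private SketchChannelCacheWrapper(SketchChannelSource v1Processor, SketchChannelSource v2Cache) {
        this.v1Processor = v1Processor;
        this.v2Cache = v2Cache;
    }

    static SketchChannelCacheWrapper forV1(SketchChannelSource processor) {
        return new SketchChannelCacheWrapper(Objects.requireNonNull(processor), null);
    }

    static SketchChannelCacheWrapper forV2(SketchChannelSource cache) {
        return new SketchChannelCacheWrapper(null, Objects.requireNonNull(cache));
    }

    boolean isV2() {
        return v2Cache != null;
    }

    /** Callers use one code path; the wrapper picks the V1 or V2 delegate. */
    String get() {
        return isV2() ? v2Cache.get() : v1Processor.get();
    }
}
```

Keeping the V1/V2 branch inside the wrapper means call sites stay unchanged, and removing V1 later reduces to deleting the wrapper and calling the V2 type directly.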

opt-in flag

For our internal testing, an opt-in flag, "com.azure.core.amqp.internal.session-channel-cache.v2", has been added. It enables the two cache routes, "ReactorSessionCache" and "RequestResponseChannelCache".
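As a sketch of how such an opt-in flag might gate the V2 route, assuming the configuration is available as a plain key/value map: the PR description does not say how the SDK reads the flag, so `SketchOptInFlag` and the map-based lookup are hypothetical. Only the flag name comes from the description above.

```java
import java.util.Map;

/** Hypothetical helper; the flag name is from the PR description, the lookup is an assumption. */
final class SketchOptInFlag {
    static final String SESSION_CHANNEL_CACHE_V2 =
        "com.azure.core.amqp.internal.session-channel-cache.v2";

    private SketchOptInFlag() {
    }

    /** Returns true only when the flag is present and equals "true" (case-insensitive). */
    static boolean isV2Enabled(Map<String, String> configuration) {
        // Boolean.parseBoolean returns false for null, so an absent flag keeps the V1 route.
        return Boolean.parseBoolean(configuration.get(SESSION_CHANNEL_CACHE_V2));
    }
}
```

With this shape the V2 caches stay off by default, which matches the opt-in intent.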

(Testing and rollout plan TBD.)

anuchandy avatar Mar 07 '24 00:03 anuchandy

API change check

API changes are not detected in this pull request.

azure-sdk avatar Mar 07 '24 00:03 azure-sdk

/azp run java - servicebus - ci

anuchandy avatar Jul 19 '24 03:07 anuchandy

Azure Pipelines successfully started running 1 pipeline(s).

azure-pipelines[bot] avatar Jul 19 '24 03:07 azure-pipelines[bot]