ml-commons
[META] Migrate feature/multi_tenancy branch to main
I will be starting the process of migrating the code developed on the feature/multi_tenancy branch to main. I'm creating this issue to establish context for reviewers and pre-answer some frequently asked questions.
Important context to understand:
- There are two main purposes of this branch/feature:
  - Abstract out the CRUD/S operations on system indices into a generic metadata storage option that can use system indices, a remote OpenSearch cluster, DynamoDB, or (in the future) other storage options.
  - Add functionality to pass along a Tenant ID, which will be used in the remote storage options to allow multiple tenants to use the same (stateless) plugin instance.
- There will be new packages created in the `opensearch-ml-common` module: `org.opensearch.sdk`, containing a new metadata interface and associated request/response objects, and `org.opensearch.sdk.client`, containing a default client implementation based on NodeClient, which implements existing behavior.
  - The location of this code in ML-Commons is temporary; it will eventually be migrated somewhere else (possibly a module on the core project, possibly a separate plugin, possibly the Java SDK project).
  - The additional clients are presently in the `opensearch-ml-plugin` module, in the `org.opensearch.ml.sdkclient` package. Like the above interfaces, this is not their final location.
- Because of the way the `createComponents()` method on the `Plugin` interface works (binding those instances to their runtime class, not their interface), the client will be a concrete class which delegates method calls to an internal delegate instance. This is not the ideal object-oriented structure, but seemed the best approach to simplify injection of the client into classes which are instantiated inside `createComponents()` and would otherwise not have access to an interface-bound instance created in a separate module.
- The vast majority of changes across the codebase are copy-and-paste using the following general format (Get is shown, similar for Index, Update, Delete, and Search):
Old code (some variations on wrappers or anonymous subclasses, etc.):
GetRequest getRequest = new GetRequest(<system index>).id(<doc id>).fetchSourceContext(fetchSourceContext);
client.get(getRequest, ActionListener.runBefore(ActionListener.wrap(r -> {
    // response code goes here
}, e -> {
    // exception code goes here
}), context::restore));
New code (some variations):
GetDataObjectRequest getDataObjectRequest = GetDataObjectRequest
    .builder()
    .index(<system index>)
    .id(<doc id>)
    .tenantId(tenantId)
    .fetchSourceContext(fetchSourceContext)
    .build();
sdkClient
    .getDataObjectAsync(getDataObjectRequest, client.threadPool().executor(GENERAL_THREAD_POOL))
    .whenComplete((r, throwable) -> {
        context.restore();
        if (throwable == null) {
            try {
                GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
                //
                // exact copy of response code from above
                //
            } catch (Exception e) {
                listener.onFailure(e);
            }
        } else {
            Exception e = SdkClientUtils.unwrapAndConvertToException(throwable);
            //
            // exact copy of exception code from above
            //
        }
    });
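The `else` branch above unwraps the throwable before running the original exception code. One likely reason is that a `whenComplete` callback can receive a `CompletionException` wrapper rather than the original failure when the error was thrown inside an async stage. The following is a minimal sketch of that idea only; `UnwrapSketch` and its `unwrap` method are hypothetical stand-ins, and the real `SdkClientUtils.unwrapAndConvertToException` may behave differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration; not the actual SdkClientUtils implementation.
class UnwrapSketch {

    // Peel off the wrappers added by the CompletableFuture machinery and
    // return something an Exception-based listener can accept.
    static Exception unwrap(Throwable throwable) {
        Throwable cause = throwable;
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
            && cause.getCause() != null) {
            cause = cause.getCause();
        }
        // Anything that is not an Exception (for example an Error) gets wrapped.
        return cause instanceof Exception ? (Exception) cause : new RuntimeException(cause);
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("index not found");
        });
        failed.whenComplete((r, throwable) -> {
            // Without unwrapping, throwable here is a CompletionException.
            Exception e = unwrap(throwable);
            System.out.println(e.getClass().getSimpleName() + ": " + e.getMessage());
        }).exceptionally(t -> null).join();
        // prints "IllegalStateException: index not found"
    }
}
```

With the wrapper removed, the copied exception code can keep using `instanceof` checks against the original exception types.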
- Many unit tests needed to be adjusted to adapt to the asynchronous nature of the new client interface. Test cases are intended to preserve exact current behavior, but some small changes are needed to adjust to the client implementation using a `Future` rather than an `ActionListener`, and to the multi-threaded execution.
Current test:
public void testGet_Success() throws IOException {
    GetRequest getRequest = <instantiate test request>;
    GetResponse getResponse = <instantiate test response>;
    doAnswer(invocation -> {
        ActionListener<GetResponse> listener = invocation.getArgument(1);
        listener.onResponse(getResponse);
        return null;
    }).when(client).get(any(), any());
    getTransportAction.doExecute(null, getRequest, actionListener);
    verify(actionListener).onResponse(any(GetResponse.class));
}
Updated test to account for multithreading and client args:
public void testGet_Success() throws IOException, InterruptedException {
    GetRequest getRequest = <instantiate test request>;
    GetResponse getResponse = <instantiate test response>;
    // Equivalent of doAnswer().when()
    PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
    future.onResponse(getResponse);
    // Note the change to the single-arg client.get, which returns a future
    when(client.get(any(GetRequest.class))).thenReturn(future);
    // The original execution, while nominally async, in reality ran on a single thread
    // through the doAnswer().when() mock. The new execution is on another thread and can
    // cause a race condition, so we await the completion of the action listener with a latch.
    CountDownLatch latch = new CountDownLatch(1);
    LatchedActionListener<GetResponse> latchedActionListener = new LatchedActionListener<>(actionListener, latch);
    // Same call, but using the latched listener
    getTransportAction.doExecute(null, getRequest, latchedActionListener);
    // Usually completes in a few milliseconds
    latch.await(500, TimeUnit.MILLISECONDS);
    // Remainder of test code as it previously was
    verify(actionListener).onResponse(any(GetResponse.class));
}
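To make the context points above more concrete, here is a minimal sketch of the two ideas together: a storage abstraction that takes a tenant ID, and a concrete client class that forwards every call to an internal delegate. All names here (`DelegatingClientSketch`, `MetadataStoreDelegate`, `InMemoryDelegate`, `DelegatingClient`) are hypothetical and chosen for illustration; the actual interfaces and request/response objects on the branch are richer and differ from this.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch; not the SdkClient code from the feature branch.
class DelegatingClientSketch {

    // Storage abstraction: each backend (system index, remote cluster, DynamoDB, ...)
    // would provide its own implementation of an interface like this.
    interface MetadataStoreDelegate {
        CompletionStage<Void> put(String tenantId, String index, String id, String source, Executor executor);
        CompletionStage<Optional<String>> get(String tenantId, String index, String id, Executor executor);
    }

    // Toy backend that isolates tenants by prefixing every key with the tenant ID.
    static class InMemoryDelegate implements MetadataStoreDelegate {
        private final Map<String, String> docs = new ConcurrentHashMap<>();

        private static String key(String tenantId, String index, String id) {
            return tenantId + "/" + index + "/" + id;
        }

        @Override
        public CompletionStage<Void> put(String tenantId, String index, String id, String source, Executor executor) {
            return CompletableFuture.runAsync(() -> docs.put(key(tenantId, index, id), source), executor);
        }

        @Override
        public CompletionStage<Optional<String>> get(String tenantId, String index, String id, Executor executor) {
            return CompletableFuture.supplyAsync(() -> Optional.ofNullable(docs.get(key(tenantId, index, id))), executor);
        }
    }

    // Concrete class that callers inject. It holds no storage logic of its own
    // and only forwards to whichever delegate it was constructed with.
    static class DelegatingClient {
        private final MetadataStoreDelegate delegate;

        DelegatingClient(MetadataStoreDelegate delegate) {
            this.delegate = delegate;
        }

        CompletionStage<Void> putAsync(String tenantId, String index, String id, String source, Executor executor) {
            return delegate.put(tenantId, index, id, source, executor);
        }

        CompletionStage<Optional<String>> getAsync(String tenantId, String index, String id, Executor executor) {
            return delegate.get(tenantId, index, id, executor);
        }
    }

    public static void main(String[] args) {
        DelegatingClient client = new DelegatingClient(new InMemoryDelegate());
        Executor sameThread = Runnable::run; // keeps the demo deterministic
        client.putAsync("tenant-a", "models", "1", "{\"name\":\"a\"}", sameThread).toCompletableFuture().join();
        // The owning tenant can read the document back.
        System.out.println(client.getAsync("tenant-a", "models", "1", sameThread).toCompletableFuture().join().isPresent()); // prints "true"
        // A different tenant asking for the same index and id sees nothing.
        System.out.println(client.getAsync("tenant-b", "models", "1", sameThread).toCompletableFuture().join().isPresent()); // prints "false"
    }
}
```

Because the injected type is the concrete `DelegatingClient`, binding by runtime class still resolves to a single class regardless of which backend is configured; swapping backends only changes the delegate passed to the constructor.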
Expect PRs and Tasks to accomplish the following:
- [ ] Migrate SdkClient interfaces, request/response objects, and default implementation
- [ ] Migrate remote client implementations (note: this will switch the Apache HttpClient imports, which will then be switched back for the 2.x backport)
- [ ] Migrate DynamoDB implementation
- [ ] Migrate Connector CRUD/S actions and associated classes
- [ ] Migrate Model CRUD/S actions and associated classes
- [ ] Migrate Agent CRUD/S actions and associated classes
- [ ] Migrate remaining classes
- [ ] Sanity test implementation on a multi-node cluster, make any needed bug fixes, and update the existing CI integ tests to cover whatever they previously missed
- [ ] Sanity test implementation on a remote cluster and make any needed bug fixes. Create new integ tests for the client implementation
- [ ] Sanity test DynamoDB implementation and make any needed bug fixes. Create new integ tests for the DDB implementation
Add any questions/comments!