spring-ai
spring-ai copied to clipboard
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
I hope to quickly learn Spring AI + ollama + deepseek r1 (local learning and deployment) through the official documentation, but when I call the interface /ai/generate based on the official documentation, I get the following error:
When I configure the following code, it can work normally.
@Configuration
public class Myconfig {
@Bean
public RestClient.Builder builder() {
return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
}
}
Environment
ollama version is 0.5.12
Java version: zulu 21
<dependencies>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-ollama-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.4.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns-native-macos</artifactId>
<version>4.1.116.Final</version>
<classifier>osx-aarch_64</classifier>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-bom</artifactId>
<version>1.0.0-M6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
It might be more appropriate to add RestClient configuration near the specific content of the document, which can help beginners get feedback quickly.
same issue,add RestClient.Builder work
Ran into a similar issue in OpenAi's API. Was running an Embedding example from https://docs.spring.io/spring-ai/reference/api/embeddings/openai-embeddings.html , and it was that example code that produced the following error.
org.springframework.web.client.ResourceAccessException: I/O error on POST request for "https://api.openai.com/v1/embeddings": block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3 at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.createResourceAccessException(DefaultRestClient.java:692) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchangeInternal(DefaultRestClient.java:577) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchange(DefaultRestClient.java:535) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.RestClient$RequestHeadersSpec.exchange(RestClient.java:677) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.executeAndExtract(DefaultRestClient.java:809) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.toEntityInternal(DefaultRestClient.java:769) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.toEntity(DefaultRestClient.java:765) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.ai.openai.api.OpenAiApi.embeddings(OpenAiApi.java:364) ~[spring-ai-openai-1.0.0-M6.jar:1.0.0-M6] at org.springframework.ai.openai.OpenAiEmbeddingModel.lambda$call$1(OpenAiEmbeddingModel.java:163) ~[spring-ai-openai-1.0.0-M6.jar:1.0.0-M6]
bump
Thanks for providing how you got this to work.
@Configuration
public class Myconfig {
@Bean
public RestClient.Builder builder() {
return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
}
}
We will investigate.
@youcangetme @yukiofyume Can you confirm if this is only with the deepseek model as we have not had these type of reports for other ollama hosted model. Note: there are been reports of deepseek not performing correctly when hosted by vllm - see https://github.com/spring-projects/spring-ai/issues/2427
@markpollack @youcangetme
When I only import spring-ai-ollama-spring-boot-starter:
@SpringBootApplication
public class SpringAIApplication {
public static void main(String[] args) {
SpringApplication.run(SpringAIApplication.class, args);
}
}
Therefore, I add:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.4.2</version>
</dependency>
When I use qwen2.5:1.5b or gemma3:1b, call the interface /ai/generate, I get the same problem and it looks different #2427 . When I debug the RestClient initialize process and use example:
After adding the following configuration, RestClient initialize as follows:
@Configuration
public class Myconfig {
@Bean
public RestClient.Builder builder() {
return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
}
}
I think it may be because RestClient exposing a fluent, synchronous API over underlying HTTP client, the default is to use ReactorClientHttpRequestFactory initialize ClientHttpRequestFactory, resulting in the above error
spring.http.client.factory=simple
should also work
Thanks for providing how you got this to work.
@Configuration public class Myconfig { @Bean public RestClient.Builder builder() { return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory()); } }We will investigate.
@youcangetme @yukiofyume Can you confirm if this is only with the deepseek model as we have not had these type of reports for other ollama hosted model. Note: there are been reports of deepseek not performing correctly when hosted by vllm - see #2427
I was having it happen with OpenAI, not Deepseek.
Here's the line that was throwing out the blocking
and the imports I have in my pom file.
I did update the spring-ai.version in the Pom to 1.0.0-M7 over 1.0.0-M6, but ran into the same blocking problem.
I tried to configure RestClient as described above, but it fail to resolve this error.
I have a question: why use .block() in an Async Function?
In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error.
I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'.
I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use
.block()in an Async Function? In reactive programming (e.g., WebFlux), invoking.block()on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'.
I encounter same issue during a streaming chat with gpt enabled with tools wrapped in AsyncMcpToolCallbackProvider.
I think during chat streaming we are not able to use SyncMcpToolCallbackProvider since it invokes McpSyncClient and then invokes block() as well.
public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) { return this.delegate.callTool(callToolRequest).block(); }
And current AsyncMcpToolCallbackProvider seems need to be redesigned.
I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use
.block()in an Async Function? In reactive programming (e.g., WebFlux), invoking.block()on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'.
Found this https://github.com/spring-projects/spring-ai/issues/2341 and tested it is working for me.
I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use
.block()in an Async Function? In reactive programming (e.g., WebFlux), invoking.block()on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'.Found this #2341 and tested it is working for me.
Thank you. Based on your reference plan, I drafted a provisional solution:
- Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback.
AsyncMcpToolCallbackProvider:
public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {
private final List<McpAsyncClient> mcpClients;
private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter;
public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) {
Assert.notNull(mcpClients, "MCP clients must not be null");
Assert.notNull(toolFilter, "Tool filter must not be null");
this.mcpClients = mcpClients;
this.toolFilter = toolFilter;
}
public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) {
this((mcpClient, tool) -> true, mcpClients);
}
public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) {
this(toolFilter, List.of(mcpClients));
}
public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) {
this(List.of(mcpClients));
}
@Override
public ToolCallback[] getToolCallbacks() {
CountDownLatch latch = new CountDownLatch(1);
CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();
this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
toolCallbackList.add(toolCallback);
}, error -> {
errorRef.set(error);
latch.countDown();
}, () -> {
latch.countDown();
});
try {
latch.await();
if (errorRef.get() != null) {
throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
: new RuntimeException("Error during tool execution", errorRef.get());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Tool execution was interrupted", e);
}
validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));
return toolCallbackList.toArray(new ToolCallback[0]);
}
private void validateToolCallbacks(ToolCallback[] toolCallbacks) {
List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks);
if (!duplicateToolNames.isEmpty()) {
throw new IllegalStateException(
"Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames)));
}
}
private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) {
if (CollectionUtils.isEmpty(mcpClients)) {
return Flux.empty();
}
return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
// Check if client is initialized and initialize if needed
Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
: mcpClient.initialize().thenReturn(mcpClient);
return clientMono.flatMap(client -> client.listTools())
.flatMapIterable(response -> response.tools())
.map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
});
}
}
AsyncMcpToolCallback:
public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {
private final McpAsyncClient asyncMcpClient;
private final McpSchema.Tool tool;
public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) {
super(asyncMcpClient, tool);
this.asyncMcpClient = asyncMcpClient;
this.tool = tool;
}
@Override
public ToolDefinition getToolDefinition() {
return ToolDefinition.builder()
.name(this.tool.name())
.description(this.tool.description())
.inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema()))
.build();
}
@Override
public String call(String functionInput) {
Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> result = new AtomicReference<>();
AtomicReference<Throwable> error = new AtomicReference<>();
this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
.map(response -> ModelOptionsUtils.toJsonString(response.content()))
.subscribe(value -> {
result.set(value);
latch.countDown();
}, throwable -> {
error.set(throwable);
latch.countDown();
});
try {
latch.await();
if (error.get() != null) {
if (error.get() instanceof RuntimeException) {
throw (RuntimeException) error.get();
} else {
throw new RuntimeException("Error during tool execution", error.get());
}
}
return result.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Tool execution was interrupted", e);
}
}
}
- Inject the custom AsyncMcpToolCallbackProvider into the Spring container.
@AutoConfiguration
public class MyAiAutoConfiguration {
@Bean
@ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List<McpAsyncClient>> mcpClientsProvider) {
List<McpAsyncClient> mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList();
return new MyAsyncMcpToolCallbackProvider(mcpClients);
}
}
- Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.
public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens,
Set<String> toolNames) {
FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks();
List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks);
return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens)
.toolNames(toolNames).functionCallbacks(toolCallbackList).build()
}
@WindowsXP-XP
Found this https://github.com/spring-projects/spring-ai/issues/2341 and tested it is working for me.
and then you reply
Thank you. Based on your reference plan, I drafted a provisional solution:
So the #2341 didn't fix the issue for you?
There is another thing going on here wrt to the comment spring.http.client.factory=simple should also work.
unfortunately there seems to be a bug in boot wrt to picking a the httpclient when reactor is on the classpath for nonreactive RestClient.
Whenever there are reactive dependencies in the classpath (which is always the case in spring ai), Spring Boot uses Reactor to implement RestClient instead of standard JDK HttpClient, even if the application is not a reactive application.
setting spring.http.client.factory=jdk also should help.
However, this may not be the issue you are reporting, I'm not clear on what problem the solution you are providing is trying to fix.
Found this #2341 and tested it is working for me.
and then you reply
Thank you. Based on your reference plan, I drafted a provisional solution:
So the #2341 didn't fix the issue for you?
There is another thing going on here wrt to the comment should also work.
spring.http.client.factory=simpleunfortunately there seems to be a bug in boot wrt to picking a the httpclient when reactor is on the classpath for nonreactive RestClient.
Whenever there are reactive dependencies in the classpath (which is always the case in spring ai), Spring Boot uses Reactor to implement RestClient instead of standard JDK HttpClient, even if the application is not a reactive application.
setting also should help.
spring.http.client.factory=jdkHowever, this may not be the issue you are reporting, I'm not clear on what problem the solution you are providing is trying to fix.
Neither setting spring.http.client.factory=simple nor spring.http.client.factory=jdk resolved my issue.
This is only a temporary solution. I haven't tried the approach in #2341 yet. I noticed he resolved it by modifying the ChatModels of different AI platforms, but I believe implementing unified handling when calling ChatModels would be preferable.
My provisional solution is based on Came up with some (ugly) workaround to overwrite/replace AsyncMcpToolCallbackProvider and wrap AsyncMcpToolCallbacks with a custom ToolCallback implementation that uses a ThreadPoolTaskExecuto in #2341
The Spring AI version referenced in #2341 has not been published to Maven Central, and third-party component support for this unreleased version remains limited. Therefore, I conclude that customizing AsyncMcpToolCallbackProvider and AsyncMcpToolCallbacks within our project currently represents:
-
The least invasive approach
-
The minimal code change requirement
-
The lowest risk of dependency conflicts
This why I choose this temporary solution
I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use
.block()in an Async Function? In reactive programming (e.g., WebFlux), invoking.block()on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'.Found this #2341 and tested it is working for me.
Thank you. Based on your reference plan, I drafted a provisional solution:
- Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback.
AsyncMcpToolCallbackProvider:public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {
private final List<McpAsyncClient> mcpClients; private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter; public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) { Assert.notNull(mcpClients, "MCP clients must not be null"); Assert.notNull(toolFilter, "Tool filter must not be null"); this.mcpClients = mcpClients; this.toolFilter = toolFilter; } public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) { this((mcpClient, tool) -> true, mcpClients); } public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) { this(toolFilter, List.of(mcpClients)); } public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) { this(List.of(mcpClients)); } @Override public ToolCallback[] getToolCallbacks() { CountDownLatch latch = new CountDownLatch(1); CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>(); AtomicReference<Throwable> errorRef = new AtomicReference<>(); this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> { toolCallbackList.add(toolCallback); }, error -> { errorRef.set(error); latch.countDown(); }, () -> { latch.countDown(); }); try { latch.await(); if (errorRef.get() != null) { throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException : new RuntimeException("Error during tool execution", errorRef.get()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Tool execution was interrupted", e); } validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0])); return toolCallbackList.toArray(new ToolCallback[0]); } private void validateToolCallbacks(ToolCallback[] toolCallbacks) { List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks); if (!duplicateToolNames.isEmpty()) { throw new IllegalStateException( "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames))); } } private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) { if (CollectionUtils.isEmpty(mcpClients)) { return Flux.empty(); } return Flux.fromIterable(mcpClients).flatMap(mcpClient -> { // Check if client is initialized and initialize if needed Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient) : mcpClient.initialize().thenReturn(mcpClient); return clientMono.flatMap(client -> client.listTools()) .flatMapIterable(response -> response.tools()) .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool)); }); }}
AsyncMcpToolCallback:public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {
private final McpAsyncClient asyncMcpClient; private final McpSchema.Tool tool; public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) { super(asyncMcpClient, tool); this.asyncMcpClient = asyncMcpClient; this.tool = tool; } @Override public ToolDefinition getToolDefinition() { return ToolDefinition.builder() .name(this.tool.name()) .description(this.tool.description()) .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema())) .build(); } @Override public String call(String functionInput) { Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput); CountDownLatch latch = new CountDownLatch(1); AtomicReference<String> result = new AtomicReference<>(); AtomicReference<Throwable> error = new AtomicReference<>(); this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments)) .map(response -> ModelOptionsUtils.toJsonString(response.content())) .subscribe(value -> { result.set(value); latch.countDown(); }, throwable -> { error.set(throwable); latch.countDown(); }); try { latch.await(); if (error.get() != null) { if (error.get() instanceof RuntimeException) { throw (RuntimeException) error.get(); } else { throw new RuntimeException("Error during tool execution", error.get()); } } return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Tool execution was interrupted", e); } }} 2. Inject the custom AsyncMcpToolCallbackProvider into the Spring container.
@AutoConfiguration public class MyAiAutoConfiguration { @Bean @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC") public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List<McpAsyncClient>> mcpClientsProvider) { List<McpAsyncClient> mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList(); return new MyAsyncMcpToolCallbackProvider(mcpClients); } } 3. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.
public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens, Set<String> toolNames) { FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks(); List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks); return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens) .toolNames(toolNames).functionCallbacks(toolCallbackList).build() }
what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.
I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use
.block()in an Async Function? In reactive programming (e.g., WebFlux), invoking.block()on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'.Found this #2341 and tested it is working for me.
Thank you. Based on your reference plan, I drafted a provisional solution:
- Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback.
AsyncMcpToolCallbackProvider:public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {
private final List<McpAsyncClient> mcpClients; private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter; public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) { Assert.notNull(mcpClients, "MCP clients must not be null"); Assert.notNull(toolFilter, "Tool filter must not be null"); this.mcpClients = mcpClients; this.toolFilter = toolFilter; } public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) { this((mcpClient, tool) -> true, mcpClients); } public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) { this(toolFilter, List.of(mcpClients)); } public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) { this(List.of(mcpClients)); } @Override public ToolCallback[] getToolCallbacks() { CountDownLatch latch = new CountDownLatch(1); CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>(); AtomicReference<Throwable> errorRef = new AtomicReference<>(); this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> { toolCallbackList.add(toolCallback); }, error -> { errorRef.set(error); latch.countDown(); }, () -> { latch.countDown(); }); try { latch.await(); if (errorRef.get() != null) { throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException : new RuntimeException("Error during tool execution", errorRef.get()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Tool execution was interrupted", e); } validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0])); return toolCallbackList.toArray(new ToolCallback[0]); } private void validateToolCallbacks(ToolCallback[] toolCallbacks) { List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks); if (!duplicateToolNames.isEmpty()) { throw new IllegalStateException( "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames))); } } private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) { if (CollectionUtils.isEmpty(mcpClients)) { return Flux.empty(); } return Flux.fromIterable(mcpClients).flatMap(mcpClient -> { // Check if client is initialized and initialize if needed Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient) : mcpClient.initialize().thenReturn(mcpClient); return clientMono.flatMap(client -> client.listTools()) .flatMapIterable(response -> response.tools()) .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool)); }); }}
AsyncMcpToolCallback: public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {private final McpAsyncClient asyncMcpClient; private final McpSchema.Tool tool; public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) { super(asyncMcpClient, tool); this.asyncMcpClient = asyncMcpClient; this.tool = tool; } @Override public ToolDefinition getToolDefinition() { return ToolDefinition.builder() .name(this.tool.name()) .description(this.tool.description()) .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema())) .build(); } @Override public String call(String functionInput) { Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput); CountDownLatch latch = new CountDownLatch(1); AtomicReference<String> result = new AtomicReference<>(); AtomicReference<Throwable> error = new AtomicReference<>(); this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments)) .map(response -> ModelOptionsUtils.toJsonString(response.content())) .subscribe(value -> { result.set(value); latch.countDown(); }, throwable -> { error.set(throwable); latch.countDown(); }); try { latch.await(); if (error.get() != null) { if (error.get() instanceof RuntimeException) { throw (RuntimeException) error.get(); } else { throw new RuntimeException("Error during tool execution", error.get()); } } return result.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Tool execution was interrupted", e); } }} 2. Inject the custom AsyncMcpToolCallbackProvider into the Spring container. @autoConfiguration public class MyAiAutoConfiguration { @bean @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC") public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List> mcpClientsProvider) { List mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList(); return new MyAsyncMcpToolCallbackProvider(mcpClients); } } 3. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.
public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens, Set<String> toolNames) { FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks(); List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks); return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens) .toolNames(toolNames).functionCallbacks(toolCallbackList).build() }what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.
old version 1.0.0-M6
what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.
old version 1.0.0-M6
Thank you,this solution worked for me