spring-ai icon indicating copy to clipboard operation
spring-ai copied to clipboard

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

Open yukiofyume opened this issue 8 months ago • 16 comments
trafficstars

I hope to quickly learn Spring AI + ollama + deepseek r1 (local learning and deployment) through the official documentation, but when I call the interface /ai/generate based on the official documentation, I get the following error:

Image

Image Image

When I configure the following code, it can work normally.

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}
Image

Environment

ollama version is 0.5.12
Java version: zulu 21
    <dependencies>
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-ollama-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>3.4.2</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-resolver-dns-native-macos</artifactId>
            <version>4.1.116.Final</version>
            <classifier>osx-aarch_64</classifier>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.ai</groupId>
                <artifactId>spring-ai-bom</artifactId>
                <version>1.0.0-M6</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

It might be more appropriate to add RestClient configuration near the specific content of the document, which can help beginners get feedback quickly.

yukiofyume avatar Mar 20 '25 16:03 yukiofyume

same issue,add RestClient.Builder work

heleihelei avatar Mar 21 '25 09:03 heleihelei

Ran into a similar issue in OpenAi's API. Was running an Embedding example from https://docs.spring.io/spring-ai/reference/api/embeddings/openai-embeddings.html , and it was that example code that produced the following error.

org.springframework.web.client.ResourceAccessException: I/O error on POST request for "https://api.openai.com/v1/embeddings": block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3 at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.createResourceAccessException(DefaultRestClient.java:692) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchangeInternal(DefaultRestClient.java:577) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultRequestBodyUriSpec.exchange(DefaultRestClient.java:535) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.RestClient$RequestHeadersSpec.exchange(RestClient.java:677) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.executeAndExtract(DefaultRestClient.java:809) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.toEntityInternal(DefaultRestClient.java:769) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.web.client.DefaultRestClient$DefaultResponseSpec.toEntity(DefaultRestClient.java:765) ~[spring-web-6.2.5.jar:6.2.5] at org.springframework.ai.openai.api.OpenAiApi.embeddings(OpenAiApi.java:364) ~[spring-ai-openai-1.0.0-M6.jar:1.0.0-M6] at org.springframework.ai.openai.OpenAiEmbeddingModel.lambda$call$1(OpenAiEmbeddingModel.java:163) ~[spring-ai-openai-1.0.0-M6.jar:1.0.0-M6]

Vevvev avatar Mar 24 '25 21:03 Vevvev

bump

youcangetme avatar Apr 07 '25 01:04 youcangetme

Thanks for providing how you got this to work.

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}

We will investigate.

@youcangetme @yukiofyume Can you confirm if this is only with the deepseek model as we have not had these type of reports for other ollama hosted model. Note: there are been reports of deepseek not performing correctly when hosted by vllm - see https://github.com/spring-projects/spring-ai/issues/2427

markpollack avatar Apr 17 '25 20:04 markpollack

@markpollack @youcangetme When I only import spring-ai-ollama-spring-boot-starter:

@SpringBootApplication
public class SpringAIApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringAIApplication.class, args);
    }
}
Image

Therefore, I add:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>3.4.2</version>
        </dependency>
Image

When I use qwen2.5:1.5b or gemma3:1b, call the interface /ai/generate, I get the same problem and it looks different #2427 . When I debug the RestClient initialize process and use example:

Image Image Image Image result: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

After adding the following configuration, RestClient initialize as follows:

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}
Image Image Image result: {"generation":"I’m Gemma, a large language model created by the Gemma team at Google DeepMind. I’m openly available, which means you can use my weights. \n\n\nDo you have any specific questions you’d like me to answer?"}

I think it may be because RestClient exposing a fluent, synchronous API over underlying HTTP client, the default is to use ReactorClientHttpRequestFactory initialize ClientHttpRequestFactory, resulting in the above error

yukiofyume avatar Apr 18 '25 15:04 yukiofyume

spring.http.client.factory=simple

should also work

making avatar Apr 18 '25 19:04 making

Thanks for providing how you got this to work.

@Configuration
public class Myconfig {
    @Bean
    public RestClient.Builder builder() {
        return RestClient.builder().requestFactory(new SimpleClientHttpRequestFactory());
    }
}

We will investigate.

@youcangetme @yukiofyume Can you confirm if this is only with the deepseek model as we have not had these type of reports for other ollama hosted model. Note: there are been reports of deepseek not performing correctly when hosted by vllm - see #2427

I was having it happen with OpenAI, not Deepseek.

Here's the line that was throwing out the blocking

Image

and the imports I have in my pom file.

Image

I did update the spring-ai.version in the Pom to 1.0.0-M7 over 1.0.0-M6, but ran into the same blocking problem.

Vevvev avatar Apr 18 '25 19:04 Vevvev

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

WindowsXP-XP avatar May 06 '25 11:05 WindowsXP-XP

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

I encounter same issue during a streaming chat with gpt enabled with tools wrapped in AsyncMcpToolCallbackProvider.

I think during chat streaming we are not able to use SyncMcpToolCallbackProvider since it invokes McpSyncClient and then invokes block() as well. public McpSchema.CallToolResult callTool(McpSchema.CallToolRequest callToolRequest) { return this.delegate.callTool(callToolRequest).block(); } And current AsyncMcpToolCallbackProvider seems need to be redesigned.

shaohzhangebay avatar May 08 '25 00:05 shaohzhangebay

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this https://github.com/spring-projects/spring-ai/issues/2341 and tested it is working for me.

shaohzhangebay avatar May 08 '25 04:05 shaohzhangebay

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this #2341 and tested it is working for me.

Thank you. Based on your reference plan, I drafted a provisional solution:

  1. Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback. AsyncMcpToolCallbackProvider:
public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {

    private final List<McpAsyncClient> mcpClients;

    private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter;

    public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) {
        Assert.notNull(mcpClients, "MCP clients must not be null");
        Assert.notNull(toolFilter, "Tool filter must not be null");
        this.mcpClients = mcpClients;
        this.toolFilter = toolFilter;
    }

    public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) {
        this((mcpClient, tool) -> true, mcpClients);
    }

    public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) {
        this(toolFilter, List.of(mcpClients));
    }

    public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) {
        this(List.of(mcpClients));
    }

    @Override
    public ToolCallback[] getToolCallbacks() {

        CountDownLatch latch = new CountDownLatch(1);
        CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
        AtomicReference<Throwable> errorRef = new AtomicReference<>();

        this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
            toolCallbackList.add(toolCallback);
        }, error -> {
            errorRef.set(error);
            latch.countDown();
        }, () -> {
            latch.countDown();
        });

        try {
            latch.await();
            if (errorRef.get() != null) {
                throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
                        : new RuntimeException("Error during tool execution", errorRef.get());
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Tool execution was interrupted", e);
        }

        validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));

        return toolCallbackList.toArray(new ToolCallback[0]);
    }

    private void validateToolCallbacks(ToolCallback[] toolCallbacks) {
        List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks);
        if (!duplicateToolNames.isEmpty()) {
            throw new IllegalStateException(
                    "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames)));
        }
    }

    private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) {
        if (CollectionUtils.isEmpty(mcpClients)) {
            return Flux.empty();
        }

        return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
            // Check if client is initialized and initialize if needed
            Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
                    : mcpClient.initialize().thenReturn(mcpClient);

            return clientMono.flatMap(client -> client.listTools())
                    .flatMapIterable(response -> response.tools())
                    .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
        });
    }

}

AsyncMcpToolCallback:

public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {

    private final McpAsyncClient asyncMcpClient;

    private final McpSchema.Tool tool;

    public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) {
        super(asyncMcpClient, tool);
        this.asyncMcpClient = asyncMcpClient;
        this.tool = tool;
    }

    @Override
    public ToolDefinition getToolDefinition() {
        return ToolDefinition.builder()
                .name(this.tool.name())
                .description(this.tool.description())
                .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema()))
                .build();
    }

    @Override
    public String call(String functionInput) {
        Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput);
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        AtomicReference<Throwable> error = new AtomicReference<>();

        this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
                .map(response -> ModelOptionsUtils.toJsonString(response.content()))
                .subscribe(value -> {
                    result.set(value);
                    latch.countDown();
                }, throwable -> {
                    error.set(throwable);
                    latch.countDown();
                });

        try {
            latch.await();
            if (error.get() != null) {
                if (error.get() instanceof RuntimeException) {
                    throw (RuntimeException) error.get();
                } else {
                    throw new RuntimeException("Error during tool execution", error.get());
                }
            }
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Tool execution was interrupted", e);
        }
    }
}
  1. Inject the custom AsyncMcpToolCallbackProvider into the Spring container.
@AutoConfiguration
public class MyAiAutoConfiguration {
    @Bean
    @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List<McpAsyncClient>> mcpClientsProvider) {
        List<McpAsyncClient> mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList();
        return new MyAsyncMcpToolCallbackProvider(mcpClients);
    }
}
  1. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.
    public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens,
                                               Set<String> toolNames) {
        FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks();
        List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks);
        return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens)
                        .toolNames(toolNames).functionCallbacks(toolCallbackList).build()
    }

WindowsXP-XP avatar May 08 '25 09:05 WindowsXP-XP

@WindowsXP-XP

Found this https://github.com/spring-projects/spring-ai/issues/2341 and tested it is working for me.

and then you reply

Thank you. Based on your reference plan, I drafted a provisional solution:

So the #2341 didn't fix the issue for you?

There is another thing going on here wrt to the comment spring.http.client.factory=simple should also work.

unfortunately there seems to be a bug in boot wrt to picking a the httpclient when reactor is on the classpath for nonreactive RestClient.

Whenever there are reactive dependencies in the classpath (which is always the case in spring ai), Spring Boot uses Reactor to implement RestClient instead of standard JDK HttpClient, even if the application is not a reactive application.

setting spring.http.client.factory=jdk also should help.

However, this may not be the issue you are reporting, I'm not clear on what problem the solution you are providing is trying to fix.

markpollack avatar May 09 '25 15:05 markpollack

@WindowsXP-XP

Found this #2341 and tested it is working for me.

and then you reply

Thank you. Based on your reference plan, I drafted a provisional solution:

So the #2341 didn't fix the issue for you?

There is another thing going on here wrt to the comment should also work.spring.http.client.factory=simple

unfortunately there seems to be a bug in boot wrt to picking a the httpclient when reactor is on the classpath for nonreactive RestClient.

Whenever there are reactive dependencies in the classpath (which is always the case in spring ai), Spring Boot uses Reactor to implement RestClient instead of standard JDK HttpClient, even if the application is not a reactive application.

setting also should help.spring.http.client.factory=jdk

However, this may not be the issue you are reporting, I'm not clear on what problem the solution you are providing is trying to fix.

Neither setting spring.http.client.factory=simple nor spring.http.client.factory=jdk resolved my issue.

This is only a temporary solution. I haven't tried the approach in #2341 yet. I noticed he resolved it by modifying the ChatModels of different AI platforms, but I believe implementing unified handling when calling ChatModels would be preferable.

My provisional solution is based on Came up with some (ugly) workaround to overwrite/replace AsyncMcpToolCallbackProvider and wrap AsyncMcpToolCallbacks with a custom ToolCallback implementation that uses a ThreadPoolTaskExecuto in #2341

The Spring AI version referenced in #2341 has not been published to Maven Central, and third-party component support for this unreleased version remains limited. Therefore, I conclude that customizing AsyncMcpToolCallbackProvider and AsyncMcpToolCallbacks within our project currently represents:

  • The least invasive approach

  • The minimal code change requirement

  • The lowest risk of dependency conflicts

This why I choose this temporary solution

WindowsXP-XP avatar May 10 '25 01:05 WindowsXP-XP

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this #2341 and tested it is working for me.

Thank you. Based on your reference plan, I drafted a provisional solution:

  1. Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback. AsyncMcpToolCallbackProvider:

public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {

private final List<McpAsyncClient> mcpClients;

private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter;

public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) {
    Assert.notNull(mcpClients, "MCP clients must not be null");
    Assert.notNull(toolFilter, "Tool filter must not be null");
    this.mcpClients = mcpClients;
    this.toolFilter = toolFilter;
}

public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) {
    this((mcpClient, tool) -> true, mcpClients);
}

public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) {
    this(toolFilter, List.of(mcpClients));
}

public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) {
    this(List.of(mcpClients));
}

@Override
public ToolCallback[] getToolCallbacks() {

    CountDownLatch latch = new CountDownLatch(1);
    CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();

    this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
        toolCallbackList.add(toolCallback);
    }, error -> {
        errorRef.set(error);
        latch.countDown();
    }, () -> {
        latch.countDown();
    });

    try {
        latch.await();
        if (errorRef.get() != null) {
            throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
                    : new RuntimeException("Error during tool execution", errorRef.get());
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Tool execution was interrupted", e);
    }

    validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));

    return toolCallbackList.toArray(new ToolCallback[0]);
}

private void validateToolCallbacks(ToolCallback[] toolCallbacks) {
    List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks);
    if (!duplicateToolNames.isEmpty()) {
        throw new IllegalStateException(
                "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames)));
    }
}

private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) {
    if (CollectionUtils.isEmpty(mcpClients)) {
        return Flux.empty();
    }

    return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
        // Check if client is initialized and initialize if needed
        Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
                : mcpClient.initialize().thenReturn(mcpClient);

        return clientMono.flatMap(client -> client.listTools())
                .flatMapIterable(response -> response.tools())
                .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
    });
}

} AsyncMcpToolCallback:

public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {

private final McpAsyncClient asyncMcpClient;

private final McpSchema.Tool tool;

public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) {
    super(asyncMcpClient, tool);
    this.asyncMcpClient = asyncMcpClient;
    this.tool = tool;
}

@Override
public ToolDefinition getToolDefinition() {
    return ToolDefinition.builder()
            .name(this.tool.name())
            .description(this.tool.description())
            .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema()))
            .build();
}

@Override
public String call(String functionInput) {
    Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput);
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> result = new AtomicReference<>();
    AtomicReference<Throwable> error = new AtomicReference<>();

    this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
            .map(response -> ModelOptionsUtils.toJsonString(response.content()))
            .subscribe(value -> {
                result.set(value);
                latch.countDown();
            }, throwable -> {
                error.set(throwable);
                latch.countDown();
            });

    try {
        latch.await();
        if (error.get() != null) {
            if (error.get() instanceof RuntimeException) {
                throw (RuntimeException) error.get();
            } else {
                throw new RuntimeException("Error during tool execution", error.get());
            }
        }
        return result.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Tool execution was interrupted", e);
    }
}

} 2. Inject the custom AsyncMcpToolCallbackProvider into the Spring container.

@AutoConfiguration public class MyAiAutoConfiguration { @Bean @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC") public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List<McpAsyncClient>> mcpClientsProvider) { List<McpAsyncClient> mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList(); return new MyAsyncMcpToolCallbackProvider(mcpClients); } } 3. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.

public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens,
                                           Set<String> toolNames) {
    FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks();
    List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks);
    return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens)
                    .toolNames(toolNames).functionCallbacks(toolCallbackList).build()
}

what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.

zy1260957619 avatar Jul 11 '25 02:07 zy1260957619

I tried to configure RestClient as described above, but it fail to resolve this error. I have a question: why use .block() in an Async Function? In reactive programming (e.g., WebFlux), invoking .block() on a reactor-http-nio thread will result in this error. I think this is why the program shows the error 'block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3'. Image

Found this #2341 and tested it is working for me.

Thank you. Based on your reference plan, I drafted a provisional solution:

  1. Create a custom AsyncMcpToolCallbackProvider and AsyncMcpToolCallback. AsyncMcpToolCallbackProvider:

public class MyAsyncMcpToolCallbackProvider extends AsyncMcpToolCallbackProvider {

private final List<McpAsyncClient> mcpClients;

private final BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter;

public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, List<McpAsyncClient> mcpClients) {
    Assert.notNull(mcpClients, "MCP clients must not be null");
    Assert.notNull(toolFilter, "Tool filter must not be null");
    this.mcpClients = mcpClients;
    this.toolFilter = toolFilter;
}

public MyAsyncMcpToolCallbackProvider(List<McpAsyncClient> mcpClients) {
    this((mcpClient, tool) -> true, mcpClients);
}

public MyAsyncMcpToolCallbackProvider(BiPredicate<McpAsyncClient, McpSchema.Tool> toolFilter, McpAsyncClient... mcpClients) {
    this(toolFilter, List.of(mcpClients));
}

public MyAsyncMcpToolCallbackProvider(McpAsyncClient... mcpClients) {
    this(List.of(mcpClients));
}

@Override
public ToolCallback[] getToolCallbacks() {

    CountDownLatch latch = new CountDownLatch(1);
    CopyOnWriteArrayList<ToolCallback> toolCallbackList = new CopyOnWriteArrayList<>();
    AtomicReference<Throwable> errorRef = new AtomicReference<>();

    this.myAsyncToolCallbacks(this.mcpClients).subscribe(toolCallback -> {
        toolCallbackList.add(toolCallback);
    }, error -> {
        errorRef.set(error);
        latch.countDown();
    }, () -> {
        latch.countDown();
    });

    try {
        latch.await();
        if (errorRef.get() != null) {
            throw (errorRef.get() instanceof RuntimeException runtimeException) ? runtimeException
                    : new RuntimeException("Error during tool execution", errorRef.get());
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Tool execution was interrupted", e);
    }

    validateToolCallbacks(toolCallbackList.toArray(new ToolCallback[0]));

    return toolCallbackList.toArray(new ToolCallback[0]);
}

private void validateToolCallbacks(ToolCallback[] toolCallbacks) {
    List<String> duplicateToolNames = ToolUtils.getDuplicateToolNames(toolCallbacks);
    if (!duplicateToolNames.isEmpty()) {
        throw new IllegalStateException(
                "Multiple tools with the same name (%s)".formatted(String.join(", ", duplicateToolNames)));
    }
}

private static Flux<ToolCallback> myAsyncToolCallbacks(List<McpAsyncClient> mcpClients) {
    if (CollectionUtils.isEmpty(mcpClients)) {
        return Flux.empty();
    }

    return Flux.fromIterable(mcpClients).flatMap(mcpClient -> {
        // Check if client is initialized and initialize if needed
        Mono<McpAsyncClient> clientMono = mcpClient.isInitialized() ? Mono.just(mcpClient)
                : mcpClient.initialize().thenReturn(mcpClient);

        return clientMono.flatMap(client -> client.listTools())
                .flatMapIterable(response -> response.tools())
                .map(tool -> new MyAsyncMcpToolCallback(mcpClient, tool));
    });
}

} AsyncMcpToolCallback: public class MyAsyncMcpToolCallback extends AsyncMcpToolCallback {

private final McpAsyncClient asyncMcpClient;

private final McpSchema.Tool tool;

public MyAsyncMcpToolCallback(McpAsyncClient asyncMcpClient, McpSchema.Tool tool) {
    super(asyncMcpClient, tool);
    this.asyncMcpClient = asyncMcpClient;
    this.tool = tool;
}

@Override
public ToolDefinition getToolDefinition() {
    return ToolDefinition.builder()
            .name(this.tool.name())
            .description(this.tool.description())
            .inputSchema(ModelOptionsUtils.toJsonString(this.tool.inputSchema()))
            .build();
}

@Override
public String call(String functionInput) {
    Map<String, Object> arguments = ModelOptionsUtils.jsonToMap(functionInput);
    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<String> result = new AtomicReference<>();
    AtomicReference<Throwable> error = new AtomicReference<>();

    this.asyncMcpClient.callTool(new McpSchema.CallToolRequest(this.getToolDefinition().name(), arguments))
            .map(response -> ModelOptionsUtils.toJsonString(response.content()))
            .subscribe(value -> {
                result.set(value);
                latch.countDown();
            }, throwable -> {
                error.set(throwable);
                latch.countDown();
            });

    try {
        latch.await();
        if (error.get() != null) {
            if (error.get() instanceof RuntimeException) {
                throw (RuntimeException) error.get();
            } else {
                throw new RuntimeException("Error during tool execution", error.get());
            }
        }
        return result.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Tool execution was interrupted", e);
    }
}

} 2. Inject the custom AsyncMcpToolCallbackProvider into the Spring container. @autoConfiguration public class MyAiAutoConfiguration { @bean @ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC") public ToolCallbackProvider myMcpAsyncToolCallbacks(ObjectProvider<List> mcpClientsProvider) { List mcpClients = mcpClientsProvider.stream().flatMap(List::stream).toList(); return new MyAsyncMcpToolCallbackProvider(mcpClients); } } 3. Pass the custom AsyncMcpToolCallback as a parameter during chat conversations.

public static ChatOptions buildChatOptions(AiPlatformEnum platform, String model, Double temperature, Integer maxTokens,
                                           Set<String> toolNames) {
    FunctionCallback[] mcpToolCallbacks = SpringUtils.getBean(MyAsyncMcpToolCallbackProvider.class).getToolCallbacks();
    List<FunctionCallback> toolCallbackList = List.of(mcpToolCallbacks);
    return OpenAiChatOptions.builder().model(model).temperature(temperature).maxTokens(maxTokens)
                    .toolNames(toolNames).functionCallbacks(toolCallbackList).build()
}

what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.

old version 1.0.0-M6

WindowsXP-XP avatar Jul 11 '25 02:07 WindowsXP-XP

what is the version of your springai,my McpAsyncClient do not have .isInitialized() method.

old version 1.0.0-M6

Thank you,this solution worked for me

zy1260957619 avatar Jul 14 '25 04:07 zy1260957619