spring-ai
spring-ai copied to clipboard
使用spring-ai-starter-mcp-client sse方式连接mcp server,重启mcp server后,以前的client不会自动重连。
1.0.0-M7 版本: client 采用sse 方式连接上 mcp server,可以正常运行;这个时候重启 mcp server, client 端就不会重连 mcp server,导致一直报错 reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 20000ms in 'source(MonoCreate)' (and no fallback has been configured) at reactor.core.Exceptions.propagate(Exceptions.java:410) at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:102)
怎么设置重连mcp server?
应该只能重新创建一个mcpclient对象 或者重新实现一个client可以重连的
I also encountered this problem. Every time I restarted mcp-server, I needed to restart the client to recover. If I only restarted mcp-server, a 404 error would appear. I didn't see any reconnection configuration in the official documentation.
org.springframework.web.reactive.function.client.WebClientResponseException$NotFound: 404 Not Found from POST http://localhost:9080/mcp/message at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:324) ~[spring-webflux-6.1.14.jar:6.1.14] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ 404 NOT_FOUND from POST http://localhost:9080/mcp/message [DefaultWebClient]
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
大佬,请问有示例代码么?我尝试这个方案还是有问题,重新实例化时,mcp server会报错: session is null
@zhouwenjun-hub 去看看autoconfigure包是怎么写的你就会了。
stare
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了.... 怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
麻烦您能不能贴一下关键代码,我也遇到这个问题,我重新调用mcpSyncClient.initialize(); 方法还是无法重连。
这都不用等服务器重启,我连着高德的MCP,等个几分钟就断开了,然后他就不连了....怎么办???
目前可以不用autoconfigure,自己在代码中实现 mcpClient 的初始化,用mcpClient.ping() 来判断是否正常,不正常重新实例化一个新的mcpClient。
麻烦您能不能贴一下关键代码,我也遇到这个问题,我重新调用mcpSyncClient.initialize();方法还是无法重连。
您好,请问一下,您解决了吗
@Configuration @EnableConfigurationProperties public class McpConfig { private static final Logger LOGGER = LoggerFactory.getLogger(McpConfig.class); private McpSyncClient mcpClient; private List<ToolCallback> toolCallbacks;
/**
* MCP客户端的URL地址,用于建立与MCP服务器的连接。
*/
@Value("${spring.ai.mcp.client.sse.connections.server1.url}")
private String mcpServerUrl;
/**
* 创建或重建MCP客户端。该方法会关闭旧客户端实例(如果存在),并初始化新的MCP客户端及关联的工具回调。
* <p>
* 同步方法确保客户端创建过程的线程安全。
*/
private synchronized void createMcpClient() {
// 关闭旧的MCP客户端(如果存在)
if (mcpClient != null) {
try {
mcpClient.close();
} catch (final Exception e) {
LOGGER.error("关闭旧客户端失败", e);
}
}
// 创建并配置新的MCP客户端传输层
final WebClient.Builder builder = WebClient.builder().baseUrl(mcpServerUrl);
final WebFluxSseClientTransport transport = new WebFluxSseClientTransport(builder);
final McpSyncClient newClient = McpClient.sync(transport).build();
// 初始化新客户端并记录结果
final McpSchema.InitializeResult init = newClient.initialize();
LOGGER.info("<---------------->MCP Initialized: {}", init);
// 更新MCP客户端引用和工具回调
mcpClient = newClient;
toolCallbacks = List.of(new SyncMcpToolCallbackProvider(mcpClient).getToolCallbacks());
}
/**
* 获取工具回调列表。该方法会确保MCP客户端处于有效连接状态,若连接异常则自动重建客户端。
* @return 已配置的工具回调列表
*/
public List<ToolCallback> getToolCallback() {
if (mcpClient == null) {
// 如果MCP客户端未初始化,则创建并直接返回回调列表
createMcpClient();
return toolCallbacks;
}
try {
// 检查与服务端的连接状态
mcpClient.ping();
} catch (final Exception e) {
// 捕获连接异常,记录错误日志并重建客户端
LOGGER.error("<---------------->MCP ping error: {}", e.getLocalizedMessage());
createMcpClient();
}
return toolCallbacks;
}
/**
* 在Bean初始化时创建MCP客户端。
*/
@PostConstruct
public void init() {
createMcpClient(); // 在Bean初始化时强制执行
}
}
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient 的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
private static final Logger LOGGER = LoggerFactory.getLogger(McpConfig.class); private McpSyncClient mcpClient; private List toolCallbacks;
666啊兄弟,收下我的膝盖
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired
private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
谢谢大佬
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
用InMemoryChatMemory的话,重新初始化chatClient会丢失现有的用户会话状态跟记录。可以使用alibaba的RedisChatMemory实现,正好我的VectorStore也是用的Redis。https://github.com/alibaba/spring-ai-alibaba/blob/main/community/memories/spring-ai-alibaba-redis-memory/src/main/java/com/alibaba/cloud/ai/memory/redis/RedisChatMemory.java
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
可以附一下代码吗兄弟?不,义父0.0
配置文件关闭client自动配置: ai: mcp: client: enabled: false
@Autowired private OpenAiChatModel chatModel;
在上面的那位兄弟的基础上添加了这个方法,定时任务应该不用贴了吧
用InMemoryChatMemory的话,重新初始化chatClient会丢失现有的用户会话状态跟记录。可以使用alibaba的RedisChatMemory实现,正好我的VectorStore也是用的Redis。https://github.com/alibaba/spring-ai-alibaba/blob/main/community/memories/spring-ai-alibaba-redis-memory/src/main/java/com/alibaba/cloud/ai/memory/redis/RedisChatMemory.java
666666,我才发现
贴一下代码,<spring-ai.version>1.0.0-M7</spring-ai.version>,不需要使用advisor的可以去掉。
@Slf4j
@Service
@EnableScheduling
@EnableConfigurationProperties({McpSseClientProperties.class})
public class CustomerSupportAssistant {
private final McpSseClientProperties sseProperties;
public CustomerSupportAssistant(McpSseClientProperties sseProperties) {
this.sseProperties = sseProperties;
}
@Resource
ChatModel chatModel;
@Resource
VectorStore vectorStore;
@Resource
ChatMemory chatMemory;
@Resource
ConversationAdvisor conversationAdvisor;
private ChatClient chatClient;
//more than one mcp-client
private static final List<McpSyncClient> mcpSyncClients = new CopyOnWriteArrayList<>();
@Scheduled(cron = "0 * * * * ?")
public void ping() {
if (mcpSyncClients.isEmpty()) {
initChatClient();
} else {
Iterator<McpSyncClient> iterator = mcpSyncClients.iterator();
while (iterator.hasNext()) {
McpSyncClient mcpSyncClient = iterator.next();
try {
mcpSyncClient.ping();
log.info("ping mcp client success:{}", mcpSyncClient.getClientInfo().name());
} catch (Exception e) {
log.error("ping mcp client error:{}", mcpSyncClient.getClientInfo().name());
//close & remove old mcp-client
mcpSyncClient.close();
iterator.remove();
//re-init
initChatClient();
}
}
}
}
@PostConstruct
private void initChatClient() {
this.chatClient = ChatClient.builder(chatModel)
.defaultSystem(new ClassPathResource(SYSTEM_PROMPT_TEMPLATE_PATH))
.defaultAdvisors(
new MessageChatMemoryAdvisor(chatMemory),
new QuestionAnswerAdvisor(vectorStore, SearchRequest.builder().topK(3).build()),
new ReReadingAdvisor(),
new LoggingAdvisor(),
conversationAdvisor
)
.defaultTools(
initToolCallbacks()
)
.build();
}
private synchronized List<ToolCallback> initToolCallbacks() {
List<ToolCallback> toolCallbacks = new ArrayList<>();
this.sseProperties.getConnections().forEach((key, value) -> {
final WebClient.Builder builder = WebClient.builder().baseUrl(value.url());
final WebFluxSseClientTransport transport = new WebFluxSseClientTransport(builder);
final McpSyncClient mcpSyncClient = McpClient.sync(transport)
.clientInfo(new McpSchema.Implementation(
"mcp-client-".concat(value.url()),
"1.0.0"))
.build();
try {
McpSchema.InitializeResult initialize = mcpSyncClient.initialize();
log.info("initialize mcp client: {} mcp server :{}", mcpSyncClient.getClientInfo().name(), initialize.serverInfo().name());
mcpSyncClients.add(mcpSyncClient);
toolCallbacks.addAll(List.of(new SyncMcpToolCallbackProvider(mcpSyncClient).getToolCallbacks()));
} catch (Exception e) {
log.error("Failed to initialize mcp client for server: {}", value.url(), e);
}
});
return toolCallbacks;
}
//chat
public Flux<String> chat(String content) {
return this.chatClient.prompt()
.advisors()
.user(userMessageContent)
.toolContext()
.stream()
.content();
}
}
这个方法可行,在这个代码基础上,我把自动配置chatClient改成自己配置,并且设定一个定时任务,定时ping mcp server,断开就尝试重新初始化chatClient,主要是这个chatClient 的tools每次都是增量添加,导致我断开一次后再添加会报错 tool 重复,所以只能每次重新初始化chatClient
ChatClient.Builder builder = SpringUtil.getBean(ChatClient.Builder.class); ChatClient.Builder clone = builder.clone(); chatClient = clone .defaultTools(mcpConfig.getToolCallback().toArray(new ToolCallbackProvider[mcpConfig.getToolCallback().size()])) .build();
可以解决重复tools
https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http MCP 官方已经有方案了,使用Streamable HTTP替代现在的SSE +HTTP传输模式,可以解决断线重连和恢复,SDK好像还在开发中,估计Spring AI在下个release会跟进这个。
参考各位大佬的使用,改写如下: 对于 同步调用的来说,可以尝试定义一个 McpConfig,具体代码如下:
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties.SseParameters;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.zj.study.ai.utils.gson.GsonUtils;
import cn.hutool.core.map.MapUtil;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
/**
* @author zhoujun134
* Created on 2025-05-07
*/
@EnableScheduling
@Configuration
@EnableConfigurationProperties
@Slf4j
public class McpConfig {
/**
* MCP客户端的配置属性,用于存储MCP客户端的相关配置信息。
* key : 服务名称-版本号-服务器地址
*/
private final Map<String, McpSyncClient> mcpSyncClientMap = new HashMap<>();
private final Map<String, List<ToolCallback>> toolCallbacksMap = new HashMap<>();
@Autowired
private McpSseClientProperties mcpSseClientProperties;
/**
* 创建或重建MCP客户端。该方法会关闭旧客户端实例(如果存在),并初始化新的MCP客户端及关联的工具回调。
* <p>
* 同步方法确保客户端创建过程的线程安全。
*/
private synchronized void pingAndCheckMcpClient() {
if (MapUtil.isNotEmpty(this.mcpSyncClientMap)) {
final List<String> disableServers = new ArrayList<>();
this.mcpSyncClientMap.forEach((serverUrl, client) -> {
try {
// 检查与服务端的连接状态
client.ping();
} catch (final Exception e) {
// 捕获连接异常,记录错误日志并重建客户端
log.error("<---------------->MCP ping error: {}", e.getLocalizedMessage());
boolean createResult = this.createClientByServerUrl(serverUrl);
if (!createResult) {
disableServers.add(serverUrl);
}
}
});
if (CollectionUtils.isNotEmpty(disableServers)) {
disableServers.forEach(serverUrl -> {
if (MapUtil.isNotEmpty(this.mcpSyncClientMap)) {
this.mcpSyncClientMap.remove(serverUrl);
}
if (MapUtil.isNotEmpty(this.toolCallbacksMap)) {
this.toolCallbacksMap.remove(serverUrl);
}
});
log.info("<---------------->MCP server 状态检测完成, 当前存活{}个MCP客户端, urls={},"
+ " 已失效 {} 个 MCP 客户端,失效 urls={}",
this.mcpSyncClientMap.size(), GsonUtils.toJSONString(this.mcpSyncClientMap.keySet()),
disableServers.size(), GsonUtils.toJSONString(disableServers));
}
// 找出 mcpSyncClientMap 中不存在,配置文件中存在的连接信息,并再次重建 MCP 客户端
Map<String, SseParameters> connections = this.mcpSseClientProperties.getConnections();
if (MapUtil.isEmpty(connections)) {
log.warn("<---------------->MCP 没有配置连接信息,请检查配置文件");
return;
}
connections.forEach((key, sseParameter) -> {
String serverUrl = sseParameter.url();
if (this.mcpSyncClientMap.containsKey(serverUrl) || disableServers.contains(serverUrl)) {
return;
}
// 不在 mcpSyncClientMap 和 disableServers 中,则重建 MCP 客户端
this.createClientByServerUrl(serverUrl);
});
} else {
this.initMcpClient();
}
}
private boolean createClientByServerUrl(String serverUrl) {
if (StringUtils.isBlank(serverUrl)) {
log.warn("serverUrl is blank, please check the configuration file.");
return false;
}
try {
final HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(serverUrl).build();
final McpSyncClient newClient = McpClient.sync(transport).build();
// 初始化新客户端并记录结果
final McpSchema.InitializeResult init = newClient.initialize();
log.info("<----------------> sync MCP Initialized 完成: {}", init.serverInfo());
final List<ToolCallback> curToolCallbacks =
List.of(new SyncMcpToolCallbackProvider(newClient).getToolCallbacks());
this.toolCallbacksMap.put(serverUrl, curToolCallbacks);
this.mcpSyncClientMap.put(serverUrl, newClient);
} catch (Exception ex) {
log.error("<----------------> createClientByServerUrl 链接 MCP 客户端失败: {}, serverUrl={}",
ex.getLocalizedMessage(), serverUrl);
return false;
}
return true;
}
private synchronized void initMcpClient() {
Map<String, SseParameters> connectionsMap = mcpSseClientProperties.getConnections();
if (MapUtil.isEmpty(connectionsMap)) {
log.warn("initMcpClient <---------------->MCP 没有配置连接信息,请检查配置文件");
return;
}
connectionsMap.forEach((key, sseParameter) -> {
String serverUrl = sseParameter.url();
this.createClientByServerUrl(serverUrl);
});
}
/**
* 获取工具回调列表。该方法会确保MCP客户端处于有效连接状态,若连接异常则自动重建客户端。
*
* @return 已配置的工具回调列表
*/
public List<ToolCallback> getToolCallback() {
if (MapUtil.isNotEmpty(this.toolCallbacksMap)) {
return this.toolCallbacksMap.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
}
return Collections.emptyList();
}
/**
* 每秒检查一次MCP客户端的状态,并重建异常的客户端。
*/
@Scheduled(cron = "* * * * * ?")
public void checkMcpClient() {
this.pingAndCheckMcpClient();
}
/**
* 在Bean初始化时创建MCP客户端。
*/
@PostConstruct
public void init() {
log.info("<---------------->MCP Initializing...");
// 在Bean初始化时强制执行
initMcpClient();
log.info("<---------------->MCP Initialized 完成, 共初始化{}个MCP客户端, urls={}",
this.mcpSyncClientMap.size(), GsonUtils.toJSONString(this.mcpSyncClientMap.keySet()));
}
}
然后在使用的地方通过以下的方式使用:
@Autowired
private McpConfig mcpConfig;
ChatClient chatClient = ChatClient.builder(zhiPuAiChatModel)
.defaultTools(mcpConfig.getToolCallback())
.build();
@hymmyh @zhoujun134 @luojinggit 各位大佬,使用上面的方案遇到:
i.m.spec.McpClientSession : Unexpected response for unkown id cd772c95-561
怎么解决呢?多个client同事启动会出现这个问题,谢谢
@zhoujun134 非常感谢,我参考了你的代码,然后重新实现了一下,因为我这边是基于Consul的微服务,所以我这边添加了基于注册中心来动态注册Mcp Server 下面代码供大家参考一下,有问题欢迎指出
package com.yuan.chat.conf;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.client.McpClient;
import io.modelcontextprotocol.client.McpSyncClient;
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;
import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;
import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;
import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;
import org.springframework.ai.mcp.customizer.McpSyncClientCustomizer;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.consul.discovery.ConsulDiscoveryClient;
import org.springframework.cloud.consul.discovery.ConsulServiceInstance;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import java.net.http.HttpClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class McpClientConfig {
private final ConsulDiscoveryClient consulDiscoveryClient;
private final ObjectProvider<ObjectMapper> objectMapperProvide;
private final McpClientCommonProperties commonProperties;
// 黑名单,用来剔除一些不需要注册的服务
private final Set<String> blackList = Set.of("consul");
private final ConcurrentMap<String, List<NamedClientMcpTransport>> sseTransportMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, List<McpSyncClient>> mcpSyncClientMap = new ConcurrentHashMap<>();
private final Set<McpSyncClient> mcpSyncClients = ConcurrentHashMap.newKeySet();
private McpSyncClientConfigurer mcpSyncClientConfigurer;
@Getter
@Setter
private boolean clientValid;
// ======== 注册自动配置器 ========
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = McpClientCommonProperties.CONFIG_PREFIX, name = "type", havingValue = "SYNC", matchIfMissing = true)
McpSyncClientConfigurer mcpSyncClientConfigurer(ObjectProvider<McpSyncClientCustomizer> customizerProvider) {
this.mcpSyncClientConfigurer = new McpSyncClientConfigurer(customizerProvider.orderedStream().toList());
return mcpSyncClientConfigurer;
}
// ======== 从注册中心拉取服务并创建传输连接 ========
public void getMcpServer(ObjectProvider<ObjectMapper> objectMapperProvider) {
ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);
List<String> services = consulDiscoveryClient.getServices();
// 清空旧数据,防止重复添加
sseTransportMap.clear();
for (String service : services) {
if (blackList.contains(service)) continue;
for (ServiceInstance instance : consulDiscoveryClient.getInstances(service)) {
String baseUrl = instance.getUri().toString();
List<String> tags = ((ConsulServiceInstance) instance).getTags();
if (tags.isEmpty()) continue;
String sseEndpoint = tags.get(0).split("=")[1] + "/sse";
var transport = HttpClientSseClientTransport.builder(baseUrl)
.sseEndpoint(sseEndpoint)
.clientBuilder(HttpClient.newBuilder())
.objectMapper(objectMapper)
.build();
String connectedName = connectedClientName(commonProperties.getName(), instance.getInstanceId());
sseTransportMap.computeIfAbsent(connectedName, k -> new ArrayList<>())
.add(new NamedClientMcpTransport(instance.getInstanceId(), transport));
}
}
log.info("已加载 MCP 服务配置: {}", sseTransportMap.keySet());
}
// ======== 创建所有 MCP 客户端 ========
private List<McpSyncClient> mcpClients() {
this.getMcpServer(objectMapperProvide);
if (sseTransportMap.isEmpty()) {
log.warn("未发现 MCP 服务");
return List.of();
}
for (String key : sseTransportMap.keySet()) {
this.connectClient(key);
}
// 重新创建 ChatClient
this.clientValid = true;
log.info("已创建 MCP 客户端: {}", mcpSyncClients);
return new ArrayList<>(mcpSyncClients);
}
// ======== 每秒检查 MCP 客户端健康状态 ========
@Scheduled(cron = "* * * * * ?")
public void checkMcpClient() {
this.pingAndCheckMcpClient();
}
private synchronized void pingAndCheckMcpClient() {
log.debug("检查 MCP 客户端状态...");
if (sseTransportMap.isEmpty() || mcpSyncClientMap.isEmpty()) {
this.mcpClients();
return;
}
List<String> rebuildKeys = new ArrayList<>();
for (var entry : mcpSyncClientMap.entrySet()) {
List<McpSyncClient> clients = entry.getValue();
List<McpSyncClient> offlineClients = new ArrayList<>();
for (McpSyncClient client : clients) {
try {
client.ping();
} catch (Exception e) {
log.warn("客户端异常,准备重连: {}", entry.getKey());
offlineClients.add(client);
}
}
clients.removeAll(offlineClients);
offlineClients.forEach(mcpSyncClients::remove);
if (clients.isEmpty()) {
mcpSyncClientMap.remove(entry.getKey());
rebuildKeys.add(entry.getKey());
}
}
boolean rebuilt = false;
for (String name : rebuildKeys) {
if (this.connectClient(name)) {
rebuilt = true;
}
}
if (rebuilt) {
// 重新创建 ChatClient
this.clientValid = true;
}
}
// ======== 创建指定连接名称的 MCP 客户端 ========
private boolean connectClient(String key) {
List<NamedClientMcpTransport> transports = sseTransportMap.get(key);
if (transports == null) return false;
boolean connected = false;
for (NamedClientMcpTransport namedTransport : transports) {
String clientName = connectedClientName(commonProperties.getName(), namedTransport.name());
McpSchema.Implementation clientInfo = new McpSchema.Implementation(clientName, commonProperties.getVersion());
McpClient.SyncSpec spec = McpClient.sync(namedTransport.transport())
.clientInfo(clientInfo)
.requestTimeout(commonProperties.getRequestTimeout());
spec = mcpSyncClientConfigurer.configure(namedTransport.name(), spec);
McpSyncClient client = spec.build();
try {
if (commonProperties.isInitialized()) {
client.initialize();
}
if (mcpSyncClients.add(client)) {
mcpSyncClientMap.computeIfAbsent(clientName, k -> new ArrayList<>()).add(client);
log.info("连接 MCP 成功: {}", clientName);
connected = true;
}
} catch (Exception e) {
log.warn("连接 MCP 失败: {}", clientName);
}
}
return connected;
}
private String connectedClientName(String clientName, String serverConnectionName) {
return clientName + " - " + serverConnectionName;
}
// ======== MCP 客户端提供的工具回调集合 ========
public List<ToolCallback> getToolCallback() {
List<ToolCallback> allCallbacks = new ArrayList<>();
for (McpSyncClient client : mcpSyncClients) {
allCallbacks.addAll(List.of(new SyncMcpToolCallbackProvider(client).getToolCallbacks()));
}
return allCallbacks;
}
}
McpClientConfig 主要负责从注册中心获取服务注册成Mcp Server 以及断线重连
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.ollama.OllamaChatModel;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ChatClientConfig {
private ChatClient chatClient;
private final OllamaChatModel ollamaChatModel;
private final McpClientConfig mcpClientConfig;
// ======== ChatClient 构建与刷新 ========
private synchronized void createChatClient() {
// 仅在Mcp Server 重连之后需要重新创建ChatClient
// 重新创建之后标记为 false,防止重复重构,
mcpClientConfig.setClientValid(false);
log.debug("重新创建 Chat Client");
List<ToolCallback> toolCallbacks = mcpClientConfig.getToolCallback();
toolCallbacks.forEach(cb -> log.info("加载 ToolCallback: {}", cb));
this.chatClient = ChatClient.builder(ollamaChatModel)
.defaultToolCallbacks(toolCallbacks)
.build();
}
public synchronized ChatClient getChatClient() {
if (this.chatClient == null || mcpClientConfig.isClientValid()) {
this.createChatClient();
}
return this.chatClient;
}
public synchronized void refreshChatClient() {
this.createChatClient();
}
}
ChatClientConfig 主要负责提供ChatClient实例,方便其他地方调用
@Slf4j
@RestController
@RequestMapping("/api/chat")
@RequiredArgsConstructor
public class ChatController {
private final ChatClientConfig chatClientConfig;
/**
* 处理聊天请求,使用AI和MCP工具进行响应
*
* @return 包含AI回复的响应
*/
@GetMapping
public Flux<String> chat(@RequestParam String message) {
return chatClientConfig.getChatClient().prompt(message)
.advisors(new SimpleLoggerAdvisor())
.stream()
.content();
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream(@RequestParam String message) {
Prompt prompt = new Prompt(new UserMessage(message));
return chatClientConfig.getChatClient().prompt(prompt)
.advisors(new SimpleLoggerAdvisor())
.stream()
.content();
}
}
ChatController 则是使用方
@zhoujun134 同步版本客户端好像性能不好。看文档描述,这块应该非webflux应用也能用spring-ai-starter-mcp-client-webflux.的依赖
For production deployment, we recommend using the WebFlux-based SSE connection with the spring-ai-starter-mcp-client-webflux.
@zhoujun134 大佬,这样写会更好吗
@Slf4j
@Configuration
@EnableScheduling
@RequiredArgsConstructor
@EnableConfigurationProperties
public class McpConfig {
private final Map<String, McpClientWrapper> mcpClientWrapperMap = new ConcurrentHashMap<>();
private final DelayQueue<McpReconnectTask> reconnectTaskQueue = new DelayQueue<>();
private final McpSseClientProperties mcpSseClientProperties;
private final ExecutorService reconnectExecutor = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "mcp-reconnect-thread");
t.setDaemon(true);
return t;
});
@PostConstruct
public void init() {
initMcpClients();
log.info("MCP client initialization complete. Client count: {}, URLs: {}",
mcpClientWrapperMap.size(), String.join(", ", mcpClientWrapperMap.keySet()));
reconnectExecutor.submit(this::processReconnectQueue);
}
@PreDestroy
public void shutdown() {
reconnectExecutor.shutdownNow();
log.info("MCP reconnect thread shut down.");
}
private void processReconnectQueue() {
while (true) {
try {
McpReconnectTask task = reconnectTaskQueue.take();
String serverUrl = task.getServerUrl();
if (!mcpClientWrapperMap.containsKey(serverUrl) && !connectOrReconnect(serverUrl, true)) {
log.error("Reconnect failed. Re-queueing: {}", serverUrl);
reconnectTaskQueue.offer(new McpReconnectTask(
serverUrl,
task.getDelay(TimeUnit.SECONDS) + 5,
TimeUnit.SECONDS
));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Reconnect thread interrupted", e);
}
}
}
@Scheduled(fixedDelay = 5000, initialDelay = 2000)
public void checkMcpClients() {
mcpClientWrapperMap.forEach((serverUrl, wrapperClient) -> {
try {
wrapperClient.getClient().ping();
} catch (Exception e) {
log.error("Ping failed for {}", serverUrl, e);
mcpClientWrapperMap.remove(serverUrl);
reconnectTaskQueue.offer(new McpReconnectTask(serverUrl, 5, TimeUnit.SECONDS));
}
});
}
private boolean connectOrReconnect(String serverUrl, boolean isReconnect) {
try {
McpSyncClient client = McpClient.sync(
HttpClientSseClientTransport.builder(serverUrl).build()
).build();
McpSchema.InitializeResult init = client.initialize();
List<ToolCallback> callbacks = Arrays.asList(
new SyncMcpToolCallbackProvider(client).getToolCallbacks()
);
mcpClientWrapperMap.put(serverUrl, new McpClientWrapper(client, callbacks));
log.info(" {} mcp server: {}",
isReconnect ? "Reconnected" : "Connected",
init.serverInfo());
return true;
} catch (Exception ex) {
log.error("{} mcp server fail: {}, url={}",
isReconnect ? "Reconnect" : "Connect",
ex.getLocalizedMessage(), serverUrl);
return false;
}
}
private void initMcpClients() {
Map<String, SseParameters> connections = mcpSseClientProperties.getConnections();
if (MapUtil.isEmpty(connections)) {
log.warn("No MCP connection config found.");
return;
}
connections.forEach((key, params) -> {
boolean success = connectOrReconnect(params.url(), false);
if (!success) {
reconnectTaskQueue.offer(new McpReconnectTask(params.url(), 5, TimeUnit.SECONDS));
}
});
}
}
public class McpReconnectTask implements Delayed {
private final String serverUrl;
private final long delay;
public McpReconnectTask(String serverUrl, long delay, TimeUnit unit) {
this.serverUrl = serverUrl;
this.delay = System.nanoTime() + unit.toNanos(delay);
}
public String getServerUrl() {
return serverUrl;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delay - System.nanoTime(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.delay, ((McpReconnectTask) o).delay);
}
}
@Getter
@AllArgsConstructor
public class McpClientWrapper {
private final McpSyncClient client;
private final List<ToolCallback> toolCallbacks;
}
Field f = WebFluxSseClientTransport.class.getDeclaredField("inboundRetryHandler");
f.setAccessible(true);
BiConsumer<Retry.RetrySignal, SynchronousSink<Object>> handler =
(retrySignal, sink) -> {
// 报错刷新 重连child
if (retrySignal.failure() instanceof Exception) {
loadMcpTools(); // 继续重连逻辑
} else {
sink.error(retrySignal.failure()); // 其它异常照旧抛
}
};
f.set(transport, handler);
可以重写inboundRetryHandler,在报错的时候处理重连逻辑
好像只能是 webflux 框架下使用,在webmvc框架下没有 Retry 相关配置
@lyxfn CopyOnWriteArrayList 不能使用迭代器移除吧
我发现404的原因是因为server重启后,client仍然拿着旧的sessionId来访问,此时新的server是没有保存老sessionId的;看代码是存储在了内存中,这可能是spring-ai的问题,在多副本的情况下,重连是有问题的:比如从podA /sse 拿到了新的sessionId,然后访问了podB /mcp/message 还是会有404的问题