mcp-go icon indicating copy to clipboard operation
mcp-go copied to clipboard

bug: SendNotificationToClient not work

Open beckwy opened this issue 5 months ago • 7 comments

Description

I used the SendNotificationClient method in the example to send a notification to the client. The SendNotification method did not report an error, but the client OnNotification method did not receive the notification.

Code Sample

this is server code

server := server.ServerFromContext(ctx)
	logger.ZapLogger.Info("SendNotificationToClient start")
	err := server.SendNotificationToClient(
		ctx,
		"notifications/progress",
		map[string]any{
			"progress":      10,
			"total":         10,
			"progressToken": 0,
		},
	)
	logger.ZapLogger.Info("SendNotificationToClient end", zap.Any("err", err))
	if err != nil {
		logger.ZapLogger.Info("handleDescribeInstancesTool failed to send notification", zap.Any("err", err))
		return nil, err
	}

this is server log print

{"level":"info","ts":"2025-05-26T15:56:23.100+0800","caller":"tools/describeInstances.go:37","msg":"SendNotificationToClient start"}
{"level":"info","ts":"2025-05-26T15:56:23.100+0800","caller":"tools/describeInstances.go:46","msg":"SendNotificationToClient end","err":null}

this is client code

c = client.NewClient(sseTransport)
	// Set up notification handler
	c.OnNotification(func(notification mcp.JSONRPCNotification) {
		log.Debugf("connectMcpServerWithSSE OnNotification Received notification: %s\n, AITraceId: %v", notification.Method, yReq.AITraceId)
	})
	// Initialize the client
	initRequest := mcp.InitializeRequest{}
	initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
	initRequest.Params.ClientInfo = mcp.Implementation{
		Name:    "MCP-Go Simple Client Example",
		Version: "1.0.0",
	}
	initRequest.Params.Capabilities = mcp.ClientCapabilities{}

client not received notification!

beckwy avatar May 26 '25 08:05 beckwy

I try the server code you present( call them in ToolHandlerFunc), and it send notification successfully.

there might be two reasons:

  1. the place you execute server code could not get client from ctx, could you present more detail about that?
  2. perhaps the client program had already closed before successfully receiving the notification

The way I send notification

	mcpServer.AddTool(mcp.NewTool(
		"test-tool",
		mcp.WithDescription("Test tool"),
	), func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		mcpServer := server.ServerFromContext(ctx)
		_ = mcpServer.SendNotificationToClient(
			ctx,
			"notifications/progress",
			map[string]any{
				"progress":      10,
				"total":         10,
				"progressToken": 0,
			},
		)

		return &mcp.CallToolResult{
			Content: []mcp.Content{
				mcp.TextContent{
					Type: "text",
					Text: "Input parameter: " + request.GetArguments()["parameter-1"].(string),
				},
			},
		}, nil
	})

dugenkui03 avatar May 27 '25 09:05 dugenkui03

I printed the return value of the server.ServerFromContext method Image

Image

i try to update sdk verison to v0.30.0,use httpstreamable, but it is not work also。

i see this in log

Image

beckwy avatar May 27 '25 12:05 beckwy

From your code provided in your last comment, I know that you are sending notification in tool function, I think the code is correct.

There is a example example code, you can use to check the difference between your client code and this example. And you can provide more details about your client code.

perhaps the client program had already shut down before successfully receiving the notification

dugenkui03 avatar May 28 '25 05:05 dugenkui03

this is complete client code,in order to verify,i use CallTool add。

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	// defer cancel()
	var c *client.Client
	var err error
	log.Debugf("connectMcpServerWithSSE Initializing HTTP client...")
	// Create HTTP transport
	httpTransport, err := transport.NewStreamableHTTP("http://XXX:8080/mcp")
	if err != nil {
		log.Fatalf("connectMcpServerWithSSE Failed to create HTTP transport: %v", err)
	}

	// Create client with the transport
	c = client.NewClient(httpTransport)
if err := c.Start(ctx); err != nil {
		log.Fatalf("connectMcpServerWithSSE Failed to start client: %v, AITraceId: %v", err, yReq.AITraceId)
	}
	// Set up notification handler
	c.OnNotification(func(notification mcp.JSONRPCNotification) {
		log.Debugf("connectMcpServerWithSSE OnNotification Received")
		log.Debugf("connectMcpServerWithSSE OnNotification Received notification: %s\n, AITraceId: %v", notification.Method, yReq.AITraceId)
	})
	// Initialize the client
	initRequest := mcp.InitializeRequest{}
	initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
	initRequest.Params.ClientInfo = mcp.Implementation{
		Name:    "MCP-Go Simple Client Example",
		Version: "1.0.0",
	}
	initRequest.Params.Capabilities = mcp.ClientCapabilities{}

	serverInfo, err := c.Initialize(ctx, initRequest)
	if err != nil {
		log.Fatalf("connectMcpServerWithSSE Failed to initialize: %v,  AITraceId: %v", err, yReq.AITraceId)
	}
	log.Debugf("connectMcpServerWithSSE Connected to server: %s (version %s)\n,  AITraceId: %v",
		serverInfo.ServerInfo.Name,
		serverInfo.ServerInfo.Version,
		yReq.AITraceId)
	log.Debugf("connectMcpServerWithSSE Server capabilities: %+v\n,  AITraceId: %v", serverInfo.Capabilities, yReq.AITraceId)
	log.Debugf("connectMcpServerWithSSE Client initialized successfully. AITraceId: %v", yReq.AITraceId)

	var callToolReq mcp.CallToolRequest
	args := make(map[string]interface{})
	args["AITraceId"] = yReq.AITraceId
	callToolReq.Params.Name = "add"
	callToolReq.Params.Arguments = args
	toolResult, err1 := c.CallTool(ctx, callToolReq)
	log.Debugf("connectMcpServerWithSSE  toolResult1111: %+v,  AITraceId: %+v, err1: %v", toolResult, yReq.AITraceId, err1)

this server tools function

func handleAddTool(
	ctx context.Context,
	request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
	server := server.ServerFromContext(ctx)
	logger.ZapLogger.Info("handleAddTool start", zap.Any("server", server))
	err := server.SendNotificationToClient(
		ctx,
		"notifications/progress",
		map[string]any{
			"progress":      10,
			"total":         10,
			"progressToken": 0,
		},
	)
	if err != nil {
		logger.ZapLogger.Info("failed to send notification", zap.Error(err))
		return nil, fmt.Errorf("failed to send notification: %w", err)
	}
	logger.ZapLogger.Info("handleAddTool end", zap.Any("server", server))
	return &mcp.CallToolResult{
		Content: []mcp.Content{
			mcp.TextContent{
				Type: "text",
				Text: "a + b",
			},
		},
	}, nil
}

this is log prove client toolResult is accpte value

2025-05-28 14:17:49.427 DEBUG utils/httpclient.go:216 connectMcpServerWithSSE Initializing HTTP client... 2025-05-28 14:17:49.429 DEBUG utils/httpclient.go:262 connectMcpServerWithSSE Connected to server: bigdata_ai_mcp (version 1.0.0) 2025-05-28 14:17:49.429 DEBUG utils/httpclient.go:266 connectMcpServerWithSSE Server capabilities: {Experimental:map[] Logging:0x20c0e20 Prompts:0xc00023632a Resources:0xc00023632c Tools:0xc00023632e} 2025-05-28 14:17:49.429 DEBUG utils/httpclient.go:267 connectMcpServerWithSSE Client initialized successfully. AITraceId: 6a99baba-e52e-40f5-9c9c-d11eb31ab1ab 2025-05-28 14:17:49.430 DEBUG utils/httpclient.go:276 connectMcpServerWithSSE toolResult1111: &{Result:{Meta:map[]} Content:[{Annotated:{Annotations:} Type:text Text:a + b}] IsError:false}, AITraceId: 6a99baba-e52e-40f5-9c9c-d11eb31ab1ab, err1:

beckwy avatar May 28 '25 06:05 beckwy

Thank you for providing so many details. I've submitted a PR for this issue.

  • The problem http: wrote more than the declared Content-Length is indeed a bug and will be fixed.
  • Client can not receive all the notifications: in the httpstreamable server, notification sending occurs in another goroutine, meaning that response may return before the notification is sent. I demonstrate a hook way to ensure client could receive all the notification in the pr.

dugenkui03 avatar May 28 '25 16:05 dugenkui03

Thank you for confirming the problem, looking forward to the follow-up solution

beckwy avatar May 29 '25 03:05 beckwy

@dugenkui03 @ezynda3 oh, Today I tested this bug fix with the latest main code. Disappointingly, the client still did not receive the notification. I looked at the code design in detail and found that the code was that the server did not send the notification successfully.

Image The code is likely to be returned in these two places. There are two main problems now: 1.notification cannot be sent 2.the order of return value and notification cannot be guaranteed.

beckwy avatar May 30 '25 10:05 beckwy

@beckwy #348 is intended to fix this problem http: wrote more than the declared Content-Length/panic.

Also, as described in #348, the current design do not ensure that client can receive all the notifications from the server.

This PR do not ensure that client could receive all the notification from server, and I think this is as expected, because notification could not block the response. mcp-go maybe should provide sync way to send notification( need more discussion), I provide a hook way to ensure that client could receive the all the notification(as demonstrated in the test code).

dugenkui03 avatar Jun 03 '25 10:06 dugenkui03

there is a hook way to ensure that client could receive all the notifications Image

dugenkui03 avatar Jun 03 '25 10:06 dugenkui03

streamable http SendNotificationToClient,确实有问题且依然没有被解决。 streamable http mcp go server和python的streamable http mcp client之间无法通过NotificationProgress交互,会导致client卡死。 但是这个代码库中的client和server之间使用streamable http NotificationProgress 同步进展却是ok的。 感觉是实现的协议方式有问题。

zhuchangwu avatar Jun 10 '25 02:06 zhuchangwu

@dugenkui03 I am having the same issue that the example code for sending notifications does not work when using a StreamableHTTPServer. Running the same code via ServeStdio, the notification mechanism works fine. Also the workaround you mentioned does not work in my case.

I am using the inspector to check it. Using the official server, it works as expected. Also, when using the official Go SDK it works fine.

Do you have another idea for a workaround or maybe an ETA for a fix?

Here is the code I am using:

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/mark3labs/mcp-go/mcp"
	"github.com/mark3labs/mcp-go/server"
)

var (
	// example: -http localhost:8080
	httpAddr = flag.String("http", "", "if set, the server is started on the specified address, else stdio is used")
)

func main() {
	flag.Parse()

	// create MCP server
	mcpServer := server.NewMCPServer("Test", "1.0.0")

	// add long running tool
	mcpServer.AddTool(mcp.NewTool(
		string("longRunningOperation"),
		mcp.WithDescription(
			"Demonstrates a long running operation with progress updates",
		),
		mcp.WithNumber("duration",
			mcp.Description("Duration of the operation in seconds"),
			mcp.DefaultNumber(5),
		),
		mcp.WithNumber("steps",
			mcp.Description("Number of steps in the operation"),
			mcp.DefaultNumber(3),
		),
	), handleLongRunningOperationTool)

	// start either via HTTP or stdio
	if *httpAddr != "" {
		log.Printf("MCP handler listening at %s", *httpAddr)

		httpServer := server.NewStreamableHTTPServer(mcpServer, server.WithEndpointPath("/mcp"))

		if err := httpServer.Start(*httpAddr); err != nil {
			log.Fatalf("Server error: %v", err)
		}
	} else {
		log.Printf("MCP handler listening on stdio")

		if err := server.ServeStdio(mcpServer); err != nil {
			log.Printf("Server failed: %v", err)
		}
	}
}

func handleLongRunningOperationTool(
	ctx context.Context,
	request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
	arguments := request.GetArguments()
	progressToken := request.Params.Meta.ProgressToken
	duration, _ := arguments["duration"].(float64)
	steps, _ := arguments["steps"].(float64)
	stepDuration := duration / steps
	server := server.ServerFromContext(ctx)

	for i := 1; i < int(steps)+1; i++ {
		time.Sleep(time.Duration(stepDuration * float64(time.Second)))
		if progressToken != nil {
			log.Printf("Sending notification %v", progressToken)

			err := server.SendNotificationToClient(
				ctx,
				"notifications/progress",
				map[string]any{
					"progress":      i,
					"total":         int(steps),
					"progressToken": progressToken,
					"message":       fmt.Sprintf("Server progress %v%%", int(float64(i)*100/steps)),
				},
			)
			if err != nil {
				return nil, fmt.Errorf("failed to send notification: %w", err)
			}
		} else {
			log.Printf("No progress token")
		}
	}

	return mcp.NewToolResultText(
		fmt.Sprintf(
			"Long running operation completed. Duration: %f seconds, Steps: %d.",
			duration,
			int(steps),
		),
	), nil
}

With stdio:

Image

With HTTP:

Image

ln-12 avatar Jul 02 '25 11:07 ln-12

@dugenkui03 I am having the same issue that the example code for sending notifications does not work when using a StreamableHTTPServer. Running the same code via ServeStdio, the notification mechanism works fine. Also the workaround you mentioned does not work in my case.

I am using the inspector to check it. Using the official server, it works as expected. Also, when using the official Go SDK it works fine.

Do you have another idea for a workaround or maybe an ETA for a fix?

Here is the code I am using:

package main

import ( "context" "flag" "fmt" "log" "time"

"github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" )

var ( // example: -http localhost:8080 httpAddr = flag.String("http", "", "if set, the server is started on the specified address, else stdio is used") )

func main() { flag.Parse()

// create MCP server mcpServer := server.NewMCPServer("Test", "1.0.0")

// add long running tool mcpServer.AddTool(mcp.NewTool( string("longRunningOperation"), mcp.WithDescription( "Demonstrates a long running operation with progress updates", ), mcp.WithNumber("duration", mcp.Description("Duration of the operation in seconds"), mcp.DefaultNumber(5), ), mcp.WithNumber("steps", mcp.Description("Number of steps in the operation"), mcp.DefaultNumber(3), ), ), handleLongRunningOperationTool)

// start either via HTTP or stdio if *httpAddr != "" { log.Printf("MCP handler listening at %s", *httpAddr)

  httpServer := server.NewStreamableHTTPServer(mcpServer, server.WithEndpointPath("/mcp"))

  if err := httpServer.Start(*httpAddr); err != nil {
  	log.Fatalf("Server error: %v", err)
  }

} else { log.Printf("MCP handler listening on stdio")

  if err := server.ServeStdio(mcpServer); err != nil {
  	log.Printf("Server failed: %v", err)
  }

} }

func handleLongRunningOperationTool( ctx context.Context, request mcp.CallToolRequest, ) (*mcp.CallToolResult, error) { arguments := request.GetArguments() progressToken := request.Params.Meta.ProgressToken duration, _ := arguments["duration"].(float64) steps, _ := arguments["steps"].(float64) stepDuration := duration / steps server := server.ServerFromContext(ctx)

for i := 1; i < int(steps)+1; i++ { time.Sleep(time.Duration(stepDuration * float64(time.Second))) if progressToken != nil { log.Printf("Sending notification %v", progressToken)

  	err := server.SendNotificationToClient(
  		ctx,
  		"notifications/progress",
  		map[string]any{
  			"progress":      i,
  			"total":         int(steps),
  			"progressToken": progressToken,
  			"message":       fmt.Sprintf("Server progress %v%%", int(float64(i)*100/steps)),
  		},
  	)
  	if err != nil {
  		return nil, fmt.Errorf("failed to send notification: %w", err)
  	}
  } else {
  	log.Printf("No progress token")
  }

}

return mcp.NewToolResultText( fmt.Sprintf( "Long running operation completed. Duration: %f seconds, Steps: %d.", duration, int(steps), ), ), nil } With stdio:

Image With HTTP: Image

https://github.com/mark3labs/mcp-go/pull/473 This worked for me, you can try it.

sunerpy avatar Jul 09 '25 06:07 sunerpy

#473 is merged

dugenkui03 avatar Jul 13 '25 14:07 dugenkui03