
访问本地服务的MCP server的实现都是基于 STDIO 的方式进行通信,这种方式是靠本地进程间的标准的输入输出协议实现通信的。但是通常我们现有的微服务都是web端的应用,STDIO 的方式在这种场景下并不适用,因此,MCP协议提供了另一种通信方式,即SSE (Server-Sent Events) 传输方式。 MCP的 SSE 传输是一种基于 HTTP 的通信机制,主要用于实现服务器到客户端的流式传输。
下面我们实现一个echo的MCP SSE server
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
type MCPServer struct {
server *server.MCPServer
}
func NewMCPServer() *MCPServer {
mcpServer := server.NewMCPServer(
"example-server",
"1.0.0",
server.WithResourceCapabilities(true, true),
server.WithPromptCapabilities(true),
server.WithToolCapabilities(true),
)
// Add echo tool
mcpServer.AddTool(mcp.NewTool("echo",
mcp.WithDescription("Echo back the input"),
mcp.WithString("message",
mcp.Required(),
mcp.Description("Message to echo back"),
),
), echoHandler)
return &MCPServer{
server: mcpServer,
}
}
func main() {
s := NewMCPServer()
sseServer := s.ServeSSE("localhost:8080")
log.Printf("SSE server listening on :8080")
if err := sseServer.Start(":8080"); err != nil {
log.Fatalf("Server error: %v", err)
}
}
func echoHandler(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
msg, ok := req.Params.Arguments["message"].(string)
if !ok {
return nil, fmt.Errorf("invalid message parameter")
}
return mcp.NewToolResultText(fmt.Sprintf("Echo: %s", msg)), nil
}
func (s *MCPServer) ServeSSE(addr string) *server.SSEServer {
return server.NewSSEServer(s.server,
server.WithBaseURL(fmt.Sprintf("http://%s", addr)),
)
}我们解析message参数,然后返回。和STDIO 协议MCP servergolang实现mcp server的区别是使用了函数NewSSEServer而不是NewStdioServer,具体到函数内部,实现了两个http uri
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
s := &SSEServer{
server: server,
sseEndpoint: "/sse",
messageEndpoint: "/message",
useFullURLForMessageEndpoint: true,
}启动服务后我们访问下这两个url
curl 'http://localhost:8080/sse'
event: endpoint
data: http://localhost:8080/message?sessionId=91c61b81-a84d-465b-89e4-f9ebd244c958可以看到访问/sse的时候,返回了message路径的完整链接,并带了sessionId参数,每个session都不一样
curl -X POST 'http://localhost:8080/message?sessionId=dd92c1c3-62b3-47d8-bf79-5be071839779'
{"jsonrpc":"2.0","id":null,"error":{"code":-32700,"message":"Parse error"}}
curl -X POST 'http://localhost:8080/message?sessionId=265887bb-7193-490c-a25c-6644aa6ec8cd' -H 'Content-Type: application/json' -d '{"jsonrpc":"2.0","id":null,"method":"tools/call","params":{"name":"echo","arguments":{"message":"Hello SSE!"}}}'请求后就是我们熟悉的mcp json-rpc协议,然后我们简单实现一个mcp client来使用它
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/mark3labs/mcp-go/client"
"github.com/mark3labs/mcp-go/mcp"
)
func main() {
ctx := context.Background()
client, err := client.NewSSEMCPClient("http://localhost:8080/sse")
if err != nil {
log.Fatalf("Failed to create SSE MCP client: %v", err)
}
err = client.Start(ctx)
if err != nil {
log.Fatalf("Failed to start SSE MCP client: %v", err)
}
// Initialize
initRequest := mcp.InitializeRequest{}
initRequest.Params.ProtocolVersion = mcp.LATEST_PROTOCOL_VERSION
initRequest.Params.ClientInfo = mcp.Implementation{
Name: "test-client",
Version: "1.0.0",
}
_, err = client.Initialize(ctx, initRequest)
if err != nil {
log.Fatalf("Failed to Initialize SSE MCP client: %v", err)
}
request := mcp.CallToolRequest{
Request: mcp.Request{
Method: "tools/call",
},
}
arguments := map[string]interface{}{
"message": "Hello SSE!",
}
request.Params.Name = "echo"
request.Params.Arguments = arguments
d, _ := json.Marshal(request)
fmt.Println(string(d))
// Test echo tool
result, err := client.CallTool(context.Background(), request)
if err != nil {
return
}
textContent := result.Content[0].(mcp.TextContent)
fmt.Println(textContent.Text)
time.Sleep(100 * time.Second)
}返回如下
{"method":"tools/call","params":{"name":"echo","arguments":{"message":"Hello SSE!"}}}
Echo: Hello SSE!研究下源码我们发现client。Start其实是向url http://localhost:8080/sse 发起 GET请求
func (c *SSEMCPClient) Start(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL.String(), nil)然后解析服务端返回指向/message的POST 链接
func (c *SSEMCPClient) readSSE(reader io.ReadCloser) {
if event != "" && data != "" {
c.handleSSEEvent(event, data)
}然后具体处理
func (c *SSEMCPClient) handleSSEEvent(event, data string) {
switch event {
case "endpoint":
endpoint, err := c.baseURL.Parse(data)
c.endpoint = endpoint
case "message":
for _, handler := range c.notifications {
handler(notification)
}最后看下CallTool调用
func (c *SSEMCPClient) CallTool(
ctx context.Context,
request mcp.CallToolRequest,
) (*mcp.CallToolResult, error) {
response, err := c.sendRequest(ctx, "tools/call", request.Params)func (c *SSEMCPClient) sendRequest(
ctx context.Context,
method string,
params interface{},
) (*json.RawMessage, error) {
req, err := http.NewRequestWithContext(
ctx,
"POST",
c.endpoint.String(),
bytes.NewReader(requestBytes),
)就是根据上面解析的message路径,发送请求。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!