
cursor、cline、claude code等agent,除了支持和大模型交互外,还支持通过mcp 增强处理能力。如何完整实现一个支持这种能力的agent?其实原理很简单,下面我们开始去魅。我们的目标是实现下面这种agent:调用大模型逻辑处理,并支持调用mcp工具进行计算,最后通过三种通信方式进行结果推送。

首先,后端我们开发agent用到的还是langchain-go,但是langchain-go对mcp支持不友好,虽然它支持tools,但是并不适配通用的mcp格式,所以需要一层适配器,将mcp的client 适配为langchain的tool完整整个流程的整合。大模型调用,我们直接使用openai的接口即可。下面就是核心代码:
第一步初始化mcp client
func NewAgent() *Agent {
mcpClient, err := client.NewSSEMCPClient(
"http://127.0.0.1:8080/sse",
)
if err != nil {
log.Fatalf("Failed to create MCP client: %v", err)
}
llm, err := openai.New(
openai.WithBaseURL("https:///api/xxx"),
openai.WithToken("xxxxx"),
openai.WithModel("xxxx"),
)
if err != nil {
log.Fatalf("Create client: %v", err)
}
return &Agent{
mcpClient: mcpClient,
llm: llm,
}
}对于stdio通信格式
func NewAgent() *Agent {
// Create an MCP client using stdio
mcpClient, err := client.NewStdioMCPClient(
"./stdio/server/server", // Path to an MCP server executable
nil, // Additional environment variables if needed
)
if err != nil {
log.Fatalf("Failed to create MCP client: %v", err)
}
llm, err := openai.New(
openai.WithBaseURL("https:///api/xxx"),
openai.WithToken("xxxxx"),
openai.WithModel("xxxx"),
)
if err != nil {
log.Fatalf("Create client: %v", err)
}
return &Agent{
mcpClient: mcpClient,
llm: llm,
}
}第二步,进行接口适配,把mcp工具绑定到agent上
func (a *Agent) Init(ctx context.Context) {
// Start the client
if err := a.mcpClient.Start(ctx); err != nil {
fmt.Printf("Failed to start client: %v", err)
}
// Create the adapter
adapter, err := langchaingo_mcp_adapter.New(a.mcpClient)
if err != nil {
log.Fatalf("Failed to create adapter: %v", err)
}
// Get all tools from MCP server
mcpTools, err := adapter.Tools()
if err != nil {
log.Fatalf("Failed to get tools: %v", err)
}
for _, tool := range mcpTools {
fmt.Println("Tools: ", tool.Name(), tool.Description())
}
// Create a agent with the tools
agent := agents.NewOneShotAgent(
a.llm,
mcpTools,
agents.WithMaxIterations(3),
)
executor := agents.NewExecutor(agent)
a.executor = executor
fmt.Println("Agent executor initialized")
}第三步,处理请求
func (a *Agent) Execute(ctx context.Context, question string) string {
// Use the agent
result, err := chains.Run(
ctx,
a.executor,
question,
)
if err != nil {
log.Fatalf("Agent execution error: %v", err)
}
log.Printf("Agent result: %s", result)
return result
}如果是一个命令行的工具,我们已经大功告成了。但是如果想做一个网页版,或者做一个编辑器插件。我们就需要做结果的展示。常见的有三种方式:
方式一:基于http的长链接
后端
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
fmt.Fprintf(w, "%s", string(res))
flusher.Flush()前端
async function httpSend(loadingId,currentImplementation, message) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'text/event-stream',
'connection': 'keep-alive',
'cache-control': 'no-cache',
},
body: JSON.stringify({
message: message,
implementation: currentImplementation,
}),
});
const data = await response.json();
if (data.error) {
showError(data.error);
} else {
addMessage('agent', "http 方式返回数据:"+data.message);
}
}方式二:经典的sse
后端
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
fmt.Println("Streaming unsupported!")
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
fmt.Println("send response", string(res))
fmt.Fprintf(w, "data: %s\n\n", string(res))
flusher.Flush()前端
eventSource = new EventSource("/api/sse?msg="+message,{ retry: 50000 });
eventSource.onmessage = function(event) {
console.log('SSE message:', event.data);
addMessage('agent', "sse 方式返回数据:"+event.data);
eventSource.close();
};方式三:websocket
后端
func (s *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
// Upgrade HTTP connection to WebSocket
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
s.logger.Printf("WebSocket upgrade failed: %v", err)
return
}
// Get agent ID from query parameter
agentID := r.URL.Query().Get("agentId")
client := NewWSClient(conn, s.wsHub, agentID)
s.wsHub.register <- client
// Start read and write pumps
go client.WritePump()
go client.ReadPump()
s.logger.Printf("New WebSocket connection established (agentId: %s)", agentID)
}func (c *WSClient) WritePump() {
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
// Hub closed the channel
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := c.conn.WriteJSON(message); err != nil {
c.hub.logger.Printf("Error writing message: %v", err)
return
}func (c *WSClient) ReadPump() {
for {
select {
_, message, err := c.conn.ReadMessage()
c.send <- &WSMessage{}前端
ws.send(JSON.stringify({
type:'message',
content: message,
agentId: currentImplementation,
})); ws = new WebSocket(wsUrl);
ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
handleWebSocketMessage(message);
} catch (error) {
console.error('Error parsing WebSocket message:', error);
}
};总的来说,agent开发,由于流程中嵌入了大模型这个节点,导致流程看起来比较黑盒,但是基本原理非常简单。目前langchain工具箱已经非常成熟了,常用的功能基本都有封装。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!