首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >golang源码分析:net/rpc(2)

golang源码分析:net/rpc(2)

作者头像
golangLeetcode
发布2026-03-18 18:17:32
发布2026-03-18 18:17:32
340
举报

在分析完server端的源码实现后,我们分析下client端的源码实现,首先还是具体实现一个client:

代码语言:javascript
复制
package main
import (
    "fmt"
    "log"
    "net/rpc"
)
// Args 定义与服务器相同的参数结构
type Args struct {
    A, B int
}
funcmain() {
    // 连接RPC服务器
    client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
    if err != nil {
        log.Fatal("连接失败:", err)
    }
    // 准备调用参数
    args := &Args{A: 5, B: 3}
    var reply int
    // 调用远程方法
    err = client.Call("MathService.Add", args, &reply)
    if err != nil {
        log.Fatal("调用失败:", err)
    }
    fmt.Printf("计算结果: %d + %d = %d\n", args.A, args.B, reply)
}

首先通过rpc.DialHTTP进行http连接,然后client.Call进行调用,第一个参数是方法名:即"ServiceName.MethodName"格式,然后是参数和返回值。运行结果如下:

代码语言:javascript
复制
计算结果: 5 + 3 = 8

首先我们看看DialHttp的具体实现

代码语言:javascript
复制
func DialHTTP(network, address string) (*Client, error) {
    return DialHTTPPath(network, address, DefaultRPCPath)
}
代码语言:javascript
复制
func DialHTTPPath(network, address, path string) (*Client, error) {
    conn, err := net.Dial(network, address)
    if err != nil {
        return nil, err
    }
    io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
    // Require successful HTTP response
    // before switching to RPC protocol.
    resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
    if err == nil && resp.Status == connected {
        return NewClient(conn), nil
    }
    if err == nil {
        err = errors.New("unexpected HTTP response: " + resp.Status)
    }
    conn.Close()
    return nil, &net.OpError{
        Op:   "dial-http",
        Net:  network + " " + address,
        Addr: nil,
        Err:  err,
    }
}
代码语言:javascript
复制
const (
    // Defaults used by HandleHTTP
    DefaultRPCPath   = "/_goRPC_"

没错就是对介绍server实现的时候提到的http路径_goRPC_,进行连接发起http CONNECT请求,请求成功后初始化一个客户端

代码语言:javascript
复制
func NewClient(conn io.ReadWriteCloser) *Client {
    encBuf := bufio.NewWriter(conn)
    client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
    return NewClientWithCodec(client)
}

同时注册gob的编码和解码器,然后启动协程等待server的返回

代码语言:javascript
复制
func NewClientWithCodec(codec ClientCodec) *Client {
    client := &Client{
        codec:   codec,
        pending: make(map[uint64]*Call),
    }
    go client.input()
    return client
}
代码语言:javascript
复制
func (client *Client) input() {
    var err error
    var response Response
    for err == nil {
        response = Response{}
        err = client.codec.ReadResponseHeader(&response)

接着我们看看Call方法,首先初始化一个Call结构体,然后填上ServiceMethod参数,然后调用send方法发送请求

代码语言:javascript
复制
func (client *Client) Call(serviceMethod string, args any, reply any) error {
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}
代码语言:javascript
复制
func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    if done == nil {
        done = make(chan *Call, 10) // buffered.
    } else {
        // If caller passes done != nil, it must arrange that
        // done has enough buffer for the number of simultaneous
        // RPCs that will be using that channel. If the channel
        // is totally unbuffered, it's best not to run at all.
        if cap(done) == 0 {
            log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}

具体Call的定义如下

代码语言:javascript
复制
type Call struct {
    ServiceMethod string     // The name of the service and method to call.
    Args          any        // The argument to the function (*struct).
    Reply         any        // The reply from the function (*struct).
    Error         error      // After completion, the error status.
    Done          chan *Call // Receives *Call when Go is complete.
}

在call内部使用前面定义的codec方法进行请求编码发送,也就是通过gob格式发送请求

代码语言:javascript
复制
func (client *Client) send(call *Call) {
    client.reqMutex.Lock()
    defer client.reqMutex.Unlock()
    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {
        client.mutex.Unlock()
        call.Error = ErrShutdown
        call.done()
        return
    }
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()
    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

至此net/rpc包的源码介绍完毕

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-06-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档