
在分析完server端的源码实现后,我们分析下client端的源码实现,首先还是具体实现一个client:
package main
import (
"fmt"
"log"
"net/rpc"
)
// Args 定义与服务器相同的参数结构
type Args struct {
A, B int
}
funcmain() {
// 连接RPC服务器
client, err := rpc.DialHTTP("tcp", "127.0.0.1:1234")
if err != nil {
log.Fatal("连接失败:", err)
}
// 准备调用参数
args := &Args{A: 5, B: 3}
var reply int
// 调用远程方法
err = client.Call("MathService.Add", args, &reply)
if err != nil {
log.Fatal("调用失败:", err)
}
fmt.Printf("计算结果: %d + %d = %d\n", args.A, args.B, reply)
}
首先通过rpc.DialHTTP进行http连接,然后client.Call进行调用,第一个参数是方法名:即"ServiceName.MethodName"格式,然后是参数和返回值。运行结果如下:
计算结果: 5 + 3 = 8首先我们看看DialHttp的具体实现
func DialHTTP(network, address string) (*Client, error) {
return DialHTTPPath(network, address, DefaultRPCPath)
}func DialHTTPPath(network, address, path string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")
// Require successful HTTP response
// before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn), nil
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
conn.Close()
return nil, &net.OpError{
Op: "dial-http",
Net: network + " " + address,
Addr: nil,
Err: err,
}
}const (
// Defaults used by HandleHTTP
DefaultRPCPath = "/_goRPC_"没错就是对介绍server实现的时候提到的http路径_goRPC_,进行连接发起http CONNECT请求,请求成功后初始化一个客户端
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}同时注册gob的编码和解码器,然后启动协程等待server的返回
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)接着我们看看Call方法,首先初始化一个Call结构体,然后填上ServiceMethod参数,然后调用send方法发送请求
func (client *Client) Call(serviceMethod string, args any, reply any) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}具体Call的定义如下
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args any // The argument to the function (*struct).
Reply any // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Receives *Call when Go is complete.
}在call内部使用前面定义的codec方法进行请求编码发送,也就是通过gob格式发送请求
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
client.mutex.Unlock()
call.Error = ErrShutdown
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}至此net/rpc包的源码介绍完毕
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!