首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >golang源码分析 :gopls(4)

golang源码分析 :gopls(4)

作者头像
golangLeetcode
发布2026-03-18 18:28:16
发布2026-03-18 18:28:16
690
举报

初始化完转发代理,我们看看json-rpc服务提供能力是如何实现的

代码语言:javascript
复制
func ListenAndServe(ctx context.Context, network, addr string, server StreamServer, idleTimeout time.Duration) error {
    ln, err := net.Listen(network, addr)
    if err != nil {
        return err
    }
    defer ln.Close()
    if network == "unix" {
        defer os.Remove(addr)
    }
    return Serve(ctx, ln, server, idleTimeout)
}

它传入的server提供了ServeStream方法给Serve函数使用

代码语言:javascript
复制
type StreamServer interface {
    ServeStream(context.Context, Conn) error
}

具体的Serve函数定义如下:

代码语言:javascript
复制
func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeout time.Duration) error {
    newConns := make(chan net.Conn)
    closedConns := make(chan error)
    activeConns := 0
    var acceptErr error
    go func() {
        defer close(newConns)
        for {
            var nc net.Conn
            nc, acceptErr = ln.Accept()
            if acceptErr != nil {
                return
            }
            newConns <- nc
        }
    }()
    ctx, cancel := context.WithCancel(ctx)
    defer func() {
        // Signal the Accept goroutine to stop immediately
        // and terminate all newly-accepted connections until it returns.
        ln.Close()
        for nc := range newConns {
            nc.Close()
        }
        // Cancel pending ServeStream callbacks and wait for them to finish.
        cancel()
        for activeConns > 0 {
            err := <-closedConns
            if !isClosingError(err) {
                event.Error(ctx, "closed a connection", err)
            }
            activeConns--
        }
    }()
    // Max duration: ~290 years; surely that's long enough.
    const forever = math.MaxInt64
    if idleTimeout <= 0 {
        idleTimeout = forever
    }
    connTimer := time.NewTimer(idleTimeout)
    defer connTimer.Stop()
    for {
        select {
        case netConn, ok := <-newConns:
            if !ok {
                return acceptErr
            }
            if activeConns == 0 && !connTimer.Stop() {
                // connTimer.C may receive a value even after Stop returns.
                // (See https://golang.org/issue/37196.)
                <-connTimer.C
            }
            activeConns++
            stream := NewHeaderStream(netConn)
            go func() {
                conn := NewConn(stream)
                err := server.ServeStream(ctx, conn)
                stream.Close()
                closedConns <- err
            }()
        case err := <-closedConns:
            if !isClosingError(err) {
                event.Error(ctx, "closed a connection", err)
            }
            activeConns--
            if activeConns == 0 {
                connTimer.Reset(idleTimeout)
            }
        case <-connTimer.C:
            return ErrIdleTimeout
        case <-ctx.Done():
            return nil
        }
    }
}

首先起一个协程不断调用ln.Accept()方法获取连接,然后方法放到channel里面,然后就是在循环里不断解析出HeaderStream,针对每个strea启动一个goroutine,然后调用server.ServeStream方法进行处理。

代码语言:javascript
复制
func NewHeaderStream(conn net.Conn) Stream {
    return &headerStream{
        conn: conn,
        in:   bufio.NewReader(conn),
    }
}
代码语言:javascript
复制
func NewConn(s Stream) Conn {
    conn := &conn{
        stream:  s,
        pending: make(map[ID]chan *Response),
        done:    make(chan struct{}),
    }
    return conn
}

其中conn的定义如下

代码语言:javascript
复制
type conn struct {
    seq       int64      // must only be accessed using atomic operations
    writeMu   sync.Mutex // protects writes to the stream
    stream    Stream
    pendingMu sync.Mutex // protects the pending map
    pending   map[ID]chan *Response
    done chan struct{}
    err  atomic.Value
}

接着看下上一篇用到的go方法

代码语言:javascript
复制
func (c *conn) Go(ctx context.Context, handler Handler) {
    go c.run(ctx, handler)
}
代码语言:javascript
复制
func (c *conn) run(ctx context.Context, handler Handler) {
    defer close(c.done)
    for {
        // get the next message
        msg, n, err := c.stream.Read(ctx)
        if err != nil {
            // The stream failed, we cannot continue.
            c.fail(err)
            return
        }
        switch msg := msg.(type) {
        case Request:
            labels := []label.Label{
                Method.Of(msg.Method()),
                RPCDirection.Of(Inbound),
                {}, // reserved for ID if present
            }
            if call, ok := msg.(*Call); ok {
                labels[len(labels)-1] = RPCID.Of(fmt.Sprintf("%q", call.ID()))
            } else {
                labels = labels[:len(labels)-1]
            }
            reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...)
            event.Metric(reqCtx,
                Started.Of(1),
                ReceivedBytes.Of(n))
            if err := handler(reqCtx, c.replier(msg, spanDone), msg); err != nil {
                // delivery failed, not much we can do
                event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
            }
        case *Response:
            // If method is not set, this should be a response, in which case we must
            // have an id to send the response back to the caller.
            c.pendingMu.Lock()
            rchan, ok := c.pending[msg.id]
            c.pendingMu.Unlock()
            if ok {
                rchan <- msg
            }
        }
    }
}

其实就是不断的读取消息,然后按照json-rpc协议解析消息,组装消息然后调用handler进行处理,其中handler就是我调用Go放方法时候传入的处理器。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档