
初始化完转发代理后,我们来看看 JSON-RPC 服务提供能力是如何实现的。
// ListenAndServe binds a listener on network/addr and serves incoming
// JSON-RPC connections with server until an error occurs or the server
// has been idle for idleTimeout.
func ListenAndServe(ctx context.Context, network, addr string, server StreamServer, idleTimeout time.Duration) error {
	listener, err := net.Listen(network, addr)
	if err != nil {
		return err
	}
	defer listener.Close()
	// Unix domain sockets leave a socket file behind; remove it on exit.
	if network == "unix" {
		defer os.Remove(addr)
	}
	return Serve(ctx, listener, server, idleTimeout)
}它传入的server提供了ServeStream方法给Serve函数使用
// StreamServer is implemented by consumers of accepted connections:
// ServeStream is invoked once per connection with a Conn wrapping it,
// and should block for as long as the connection is in use.
type StreamServer interface {
	ServeStream(context.Context, Conn) error
}具体的Serve函数定义如下:
// Serve accepts connections from ln and, for each one, starts a goroutine
// that wraps the connection in a header stream and hands it to
// server.ServeStream. It returns when the listener fails, the context is
// cancelled, or no connection has been active for idleTimeout.
func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeout time.Duration) error {
	newConns := make(chan net.Conn) // accepted connections handed to the main loop
	closedConns := make(chan error) // one ServeStream result per finished connection
	activeConns := 0                // in-flight ServeStream goroutines (main loop only)
	var acceptErr error             // first error returned by ln.Accept, if any
	// Accept loop: runs until Accept fails (e.g. when ln is closed below).
	go func() {
		defer close(newConns)
		for {
			var nc net.Conn
			nc, acceptErr = ln.Accept()
			if acceptErr != nil {
				return
			}
			newConns <- nc
		}
	}()
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		// Signal the Accept goroutine to stop immediately
		// and terminate all newly-accepted connections until it returns.
		ln.Close()
		for nc := range newConns {
			nc.Close()
		}
		// Cancel pending ServeStream callbacks and wait for them to finish.
		cancel()
		for activeConns > 0 {
			err := <-closedConns
			if !isClosingError(err) {
				event.Error(ctx, "closed a connection", err)
			}
			activeConns--
		}
	}()
	// Max duration: ~290 years; surely that's long enough.
	const forever = math.MaxInt64
	if idleTimeout <= 0 {
		idleTimeout = forever
	}
	connTimer := time.NewTimer(idleTimeout)
	defer connTimer.Stop()
	for {
		select {
		case netConn, ok := <-newConns:
			if !ok {
				// The accept loop has exited; report why.
				return acceptErr
			}
			// We just became non-idle: stop the idle timer, draining its
			// channel if it already fired.
			if activeConns == 0 && !connTimer.Stop() {
				// connTimer.C may receive a value even after Stop returns.
				// (See https://golang.org/issue/37196.)
				<-connTimer.C
			}
			activeConns++
			stream := NewHeaderStream(netConn)
			// One goroutine per connection; its exit is reported on closedConns.
			go func() {
				conn := NewConn(stream)
				err := server.ServeStream(ctx, conn)
				stream.Close()
				closedConns <- err
			}()
		case err := <-closedConns:
			if !isClosingError(err) {
				event.Error(ctx, "closed a connection", err)
			}
			activeConns--
			if activeConns == 0 {
				// No connections left: restart the idle countdown.
				connTimer.Reset(idleTimeout)
			}
		case <-connTimer.C:
			return ErrIdleTimeout
		case <-ctx.Done():
			return nil
		}
	}
}首先起一个协程不断调用ln.Accept()方法获取连接,然后方法放到channel里面,然后就是在循环里不断解析出HeaderStream,针对每个strea启动一个goroutine,然后调用server.ServeStream方法进行处理。
// NewHeaderStream wraps conn in a Stream implementation whose reader side
// is buffered; the concrete framing lives in headerStream (not shown here).
func NewHeaderStream(conn net.Conn) Stream {
	return &headerStream{
		conn: conn,
		in:   bufio.NewReader(conn),
	}
}

// NewConn builds a JSON-RPC connection on top of s. pending tracks
// outbound calls awaiting their responses, and done is closed when the
// read loop (run) exits.
func NewConn(s Stream) Conn {
	conn := &conn{
		stream:  s,
		pending: make(map[ID]chan *Response),
		done:    make(chan struct{}),
	}
	return conn
}其中conn的定义如下
// conn is the concrete Conn implementation: it multiplexes JSON-RPC
// requests and responses over a single Stream.
type conn struct {
	seq       int64      // must only be accessed using atomic operations
	writeMu   sync.Mutex // protects writes to the stream
	stream    Stream
	pendingMu sync.Mutex            // protects the pending map
	pending   map[ID]chan *Response // in-flight outbound calls, keyed by request ID
	done      chan struct{}         // closed when the read loop (run) returns
	err       atomic.Value          // NOTE(review): presumably the fatal stream error stored by c.fail — confirm
}接着看下上一篇用到的go方法
// Go starts the connection's read loop in a new goroutine; handler is
// invoked for every incoming request.
func (c *conn) Go(ctx context.Context, handler Handler) {
	go c.run(ctx, handler)
}

// run is the read loop: it decodes messages from the stream, dispatches
// requests to handler, and routes responses to their pending callers.
func (c *conn) run(ctx context.Context, handler Handler) {
	defer close(c.done)
	for {
		// get the next message
		msg, n, err := c.stream.Read(ctx)
		if err != nil {
			// The stream failed, we cannot continue.
			c.fail(err)
			return
		}
		switch msg := msg.(type) {
		case Request:
			// Telemetry labels for this request; the last slot is only
			// filled in when the request is a call carrying an ID.
			labels := []label.Label{
				Method.Of(msg.Method()),
				RPCDirection.Of(Inbound),
				{}, // reserved for ID if present
			}
			if call, ok := msg.(*Call); ok {
				labels[len(labels)-1] = RPCID.Of(fmt.Sprintf("%q", call.ID()))
			} else {
				labels = labels[:len(labels)-1]
			}
			reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...)
			event.Metric(reqCtx,
				Started.Of(1),
				ReceivedBytes.Of(n))
			// Hand the request to handler, together with a replier that
			// also ends the telemetry span when the reply is written.
			// Note: handled synchronously, so a slow handler blocks reads.
			if err := handler(reqCtx, c.replier(msg, spanDone), msg); err != nil {
				// delivery failed, not much we can do
				event.Error(reqCtx, "jsonrpc2 message delivery failed", err)
			}
		case *Response:
			// If method is not set, this should be a response, in which case we must
			// have an id to send the response back to the caller.
			c.pendingMu.Lock()
			rchan, ok := c.pending[msg.id]
			c.pendingMu.Unlock()
			if ok {
				rchan <- msg
			}
		}
	}
}其实就是不断的读取消息,然后按照json-rpc协议解析消息,组装消息然后调用handler进行处理,其中handler就是我调用Go放方法时候传入的处理器。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!