首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Go动态感知资源变更的常规套路,你指定用过!

Go动态感知资源变更的常规套路,你指定用过!

作者头像
有态度的马甲
发布2025-08-01 14:15:39
发布2025-08-01 14:15:39
1980
举报
文章被收录于专栏:精益码农精益码农

最近在倒腾“AI大模型基础设施”, 目标是做一个基于云原生的AI算力平台,目前因公司隐私暂不能公开宏观背景和技术方案, 姑且记录实践中遇到的一些技能点。

前文已经记录了第1步: 使用arena 提交训练任务的实践

今天我们记录聊一聊平台侧另一个核心能力: 动态纳管云原生k8s集群,并监听AI/ML训练任务的状态变更,也就是上图的第4步。


作为面向算法开发者的云原生saas平台,平台在界面上提供了纳管集群的交互入口,平台启动后会去监听pytorch、mpi训练任务的状态变更,并回显到界面(并给开发者发送飞书变更通知)。

本文核心关注:

  • 如何动态纳管k8s集群
  • 如何重建k8s informer监听

这里我提供我的实践, 请看下图:

1. 程序启动,加载初始k8s集群,informer监听训练任务状态,并绑定停止信号(stopCh)、重建信号(rebuildCh)

代码语言:javascript
复制
package main

import "k8s.io/client-go/rest"

type StartInformerFunc func(clusterId string, restConf *rest.Config) (stopCh chan struct{}, err error)
type InformerManager struct {
 clusterConfigs map[string]string
 gvr            map[string]StartInformerFunc
 stopCh         chan struct{}  // informer停止信号
 rebuildCh      chan struct{}  // 发生变更的重建信号
}

var InformerManagerInstance *InformerManager

func NewInFormerManager() *InformerManager {
 InformerManagerInstance = &InformerManager{
  clusterConfigs: map[string]string{
   "id1": "kubeconfig1",
   "id2": "kubeconfig2",
  },
  gvr: map[string]StartInformerFunc{
     "pytorchjob": startPytorchjobInformer,
     "mpijob": startMpijobInformer,
     "job": startRawjobInformer,
  },
  stopCh:    make(chan struct{}),
  rebuildCh: make(chan struct{}, 1),
 }
  return InformerManagerInstance
}

stopCh是informer.Start(……)持续监听要用到的停止信号,rebuildCh是重建informer监听信号。

1.1 开协程定时任务去轮循落盘的待纳管k8s集群记录

- 考虑纳管的k8s集群数据可控,变更时机可控,采用md5校验的方式判断是否发生集群变更

下面的k8s.CheckClusterChanged(mgr.clusterConfigs) 是利用对kube-configs做md5, 前后对比判断集群是否发生变更。

代码语言:javascript
复制
func (mgr *InformerManager) monitorClusterChanged() bool {
 ticker := time.NewTicker(30 * time.Second)
 defer ticker.Stop()
for {
  select {
case <-ticker.C:
   if k8s.CheckClusterChanged(mgr.clusterConfigs) {
    fmt.Println("cluster changed")
    mgr.rebuildCh <- struct{}{}
   } else {
    fmt.Println("cluster no change")
   }
case <-mgr.rebuildCh:
   fmt.Println("rebuild informer")
   mgr.clusterConfigs = k8s.GetInitclusters()
   for k, v := range mgr.clusterConfigs {
    rc, err := k8s.ConvertToRestConfig([]byte(v))
    if err != nil {
     fmt.Println("convert to rest config failed")
     continue
    }
    for gvr, StartInformerFunc := range mgr.gvr {
     ……
    }
   }
  }
 }
}

3. 利用简单的链表指针,重置informer监听

代码语言:javascript
复制
func (mgr *InformerManager) Run() {
for k, v := range mgr.clusterConfigs {
  rc, err := k8s.ConvertToRestConfig([]byte(v))
if err != nil {
   fmt.Println("failed to convert kubeconfig to rest config")
   continue
  }
for gvr, StartInformerFunc := range mgr.gvr {
   go func(k string, rc *rest.Config) {
    newStopch, err := StartInformerFunc(k, rc)
    if err != nil {
     return
    }
    if mgr.stopCh != nil {   // 停止旧informer监听
        close(mgr.stopCh)
    }
    mgr.stopCh = newStopch    // 保存当前重建的监听信号
    fmt.Printf("start %s informer for cluster %s  \n", gvr,k)
   }(k, rc)
  }
  go mgr.monitorClusterChanged()
 }
}

本文记录了使用定时任务感知资源变更,并利用golang信道作为变更重建信号的姿势,可作为golang中动态感知资源变化的常规套路。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-07-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 精益码农 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档