首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >mac 上学习k8s系列(21)CRD (part III)

mac 上学习k8s系列(21)CRD (part III)

作者头像
golangLeetcode
发布2022-08-02 19:33:15
发布2022-08-02 19:33:15
4580
举报

今天继续从https://github.com/kubernetes/sample-controller入手,分析crd的源码:

从main.go的main函数入手,首先通过config 创建了两个client,一个是k8s自带的client,一个是我们生成的。通过这两个client我们可以分别操作deployment和我们自定义的foo。

代码语言:javascript
复制
    kubeClient, err := kubernetes.NewForConfig(cfg)
    exampleClient, err := clientset.NewForConfig(cfg)

然后是通过这两个cliend得到了两个informerfactory,通过这两个factory我们可以获得所关注的资源的变化:

代码语言:javascript
复制
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

具体实现在分别在:

代码语言:javascript
复制
  vendor/k8s.io/client-go/kubernetes/scheme/register.go
  pkg/generated/clientset/versioned/scheme/register.go

然后通过这两个client和informer来创建controller:informer接受消息通知,controller 根据收到的消息,进行对应的更新

代码语言:javascript
复制
controller := NewController(kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

启动两个informer:

代码语言:javascript
复制
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)

然后我们就进入了事件循环:

代码语言:javascript
复制
if err = controller.Run(2, stopCh); err != nil {

然后我们重点看下controller.go,首先看下构造函数:

代码语言:javascript
复制
func NewController(
  kubeclientset kubernetes.Interface,
  sampleclientset clientset.Interface,
  deploymentInformer appsinformers.DeploymentInformer,
  fooInformer informers.FooInformer) *Controller 
      samplescheme.AddToScheme(scheme.Scheme)
      deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {controller.enqueueFoo(new)},
      workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
      deploymentsLister: deploymentInformer.Lister(),
      deploymentsSynced: deploymentInformer.Informer().HasSynced,
      foosLister:        fooInformer.Lister(),
      foosSynced:        fooInformer.Informer().HasSynced,

首先通过 AddToScheme 把我们自定义生成的schema 注册到runtime的schema里面;然后给informer 注册事件处理函数,AddFunc和UpdateFunc,AddFunc负责把事件加入到队列里,informer收到信息后就enqueue,UpdateFunc不断从队列里消费事件,并做相应的处理,最后是定义了两对 Lister 和HasSynced。

接着我们看下事件大循环函数:

代码语言:javascript
复制
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error 
      defer c.workqueue.ShutDown()
      if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok
        for i := 0; i < threadiness; i++ {
            go wait.Until(c.runWorker, time.Second, stopCh)
        }  

等待deployment 和我们自定义的Foo都synced以后就起两个go routine,执行runWorker方法。

代码语言:javascript
复制
func (c *Controller) runWorker() {
  for c.processNextWorkItem() {
  }
}

它里面是一个死循环,调用了 processNextWorkItem函数:

代码语言:javascript
复制
func (c *Controller) processNextWorkItem() bool
      obj, shutdown := c.workqueue.Get()
      defer c.workqueue.Done(obj)
      c.workqueue.Forget(obj)
      err := c.syncHandler(key); err != nil
      c.workqueue.AddRateLimited(key)
      c.workqueue.Forget(obj)

这个函数的作用,就是不断从workqueue里取对象,如果对象不是我们想要的类型,或者执行过程中出错,就Forget,否则调用syncHandler来做同步处理,处理完成后就Done,这个队列有限速的作用。

代码语言:javascript
复制
func (c *Controller) syncHandler(key string) error
      namespace, name, err := cache.SplitMetaNamespaceKey(key)
      foo, err := c.foosLister.Foos(namespace).Get(name)
      deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
      deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
      !metav1.IsControlledBy(deployment, foo)
      deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
      err = c.updateFooStatus(foo, deployment)
      c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)

syncHandler,首先通过lister分别获区foo对象和deployment对象,如果deployment对象不存在,那么就通过kubeclientset 创建一个。如果不是被Foo 控制的,就更行成Foo控制的。 然后更新foo的状态。

代码语言:javascript
复制
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error
      fooCopy := foo.DeepCopy()
      fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
      _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(context.TODO(), fooCopy, metav1.UpdateOptions{})

这里就是唯一一个用到sampleClient的地方,就是用来更新foo的状态。

代码语言:javascript
复制
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment 
代码语言:javascript
复制
func (c *Controller) handleObject(obj interface{}) 
      foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
      c.enqueueFoo(foo)

在初始化controller的时候,AddFunc就是用 controller.enqueueFoo 函数来初始化的,它的定义如下:不断把事件塞入队列

代码语言:javascript
复制
func (c *Controller) enqueueFoo(obj interface{}) 
      c.workqueue.Add(key)

workqueue是一个RateLimitingInterface

代码语言:javascript
复制
type Controller struct
      workqueue workqueue.RateLimitingInterface

informer的调用栈如下:

vendor/k8s.io/client-go/tools/cache/shared_informer.go

代码语言:javascript
复制
err := wait.PollImmediateUntil(syncedPollPeriod,
  for _, syncFunc := range cacheSyncs {
        if !syncFunc() {

vendor/k8s.io/apimachinery/pkg/util/wait/wait.go

代码语言:javascript
复制
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
      done, err := condition()
      select {
        return PollUntil(interval, condition, stopCh)
代码语言:javascript
复制
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
      return WaitFor(poller(interval, 0), condition, ctx.Done())

这就是sample-controller的源码,通过两个client,分别控制deployment和自定义的foo,通过事件循环,把二者关联起来。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档