
今天继续从https://github.com/kubernetes/sample-controller入手,分析crd的源码:
从main.go的main函数入手,首先通过config 创建了两个client,一个是k8s自带的client,一个是我们生成的。通过这两个client我们可以分别操作deployment和我们自定义的foo。
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)然后是通过这两个cliend得到了两个informerfactory,通过这两个factory我们可以获得所关注的资源的变化:
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)具体实现在分别在:
vendor/k8s.io/client-go/kubernetes/scheme/register.go
pkg/generated/clientset/versioned/scheme/register.go然后通过这两个client和informer来创建controller:informer接受消息通知,controller 根据收到的消息,进行对应的更新
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())启动两个informer:
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)然后我们就进入了事件循环:
if err = controller.Run(2, stopCh); err != nil {然后我们重点看下controller.go,首先看下构造函数:
func NewController(
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller
samplescheme.AddToScheme(scheme.Scheme)
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {controller.enqueueFoo(new)},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,首先通过 AddToScheme 把我们自定义生成的schema 注册到runtime的schema里面;然后给informer 注册事件处理函数,AddFunc和UpdateFunc,AddFunc负责把事件加入到队列里,informer收到信息后就enqueue,UpdateFunc不断从队列里消费事件,并做相应的处理,最后是定义了两对 Lister 和HasSynced。
接着我们看下事件大循环函数:
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error
defer c.workqueue.ShutDown()
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
} 等待deployment 和我们自定义的Foo都synced以后就起两个go routine,执行runWorker方法。
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}它里面是一个死循环,调用了 processNextWorkItem函数:
func (c *Controller) processNextWorkItem() bool
obj, shutdown := c.workqueue.Get()
defer c.workqueue.Done(obj)
c.workqueue.Forget(obj)
err := c.syncHandler(key); err != nil
c.workqueue.AddRateLimited(key)
c.workqueue.Forget(obj)这个函数的作用,就是不断从workqueue里取对象,如果对象不是我们想要的类型,或者执行过程中出错,就Forget,否则调用syncHandler来做同步处理,处理完成后就Done,这个队列有限速的作用。
func (c *Controller) syncHandler(key string) error
namespace, name, err := cache.SplitMetaNamespaceKey(key)
foo, err := c.foosLister.Foos(namespace).Get(name)
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
!metav1.IsControlledBy(deployment, foo)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
err = c.updateFooStatus(foo, deployment)
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)syncHandler,首先通过lister分别获区foo对象和deployment对象,如果deployment对象不存在,那么就通过kubeclientset 创建一个。如果不是被Foo 控制的,就更行成Foo控制的。 然后更新foo的状态。
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error
fooCopy := foo.DeepCopy()
fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(context.TODO(), fooCopy, metav1.UpdateOptions{})这里就是唯一一个用到sampleClient的地方,就是用来更新foo的状态。
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment func (c *Controller) handleObject(obj interface{})
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
c.enqueueFoo(foo)在初始化controller的时候,AddFunc就是用 controller.enqueueFoo 函数来初始化的,它的定义如下:不断把事件塞入队列
func (c *Controller) enqueueFoo(obj interface{})
c.workqueue.Add(key)workqueue是一个RateLimitingInterface
type Controller struct
workqueue workqueue.RateLimitingInterfaceinformer的调用栈如下:
vendor/k8s.io/client-go/tools/cache/shared_informer.go
err := wait.PollImmediateUntil(syncedPollPeriod,
for _, syncFunc := range cacheSyncs {
if !syncFunc() {vendor/k8s.io/apimachinery/pkg/util/wait/wait.go
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
done, err := condition()
select {
return PollUntil(interval, condition, stopCh)func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error
return WaitFor(poller(interval, 0), condition, ctx.Done())这就是sample-controller的源码,通过两个client,分别控制deployment和自定义的foo,通过事件循环,把二者关联起来。
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!