找回密码
 会员注册
查看: 20|回复: 0

KubernetesInformer基本原理

[复制链接]

1

主题

0

回帖

4

积分

新手上路

积分
4
发表于 2024-10-12 20:43:55 | 显示全部楼层 |阅读模式
Kubernetes Informer基本原理 2024年01月30日 09:01 424 本文分析 k8s controller 中 informer 启动的基本流程不论是 k8s 自身组件,还是自己编写 controller,都需要通过 apiserver 监听 etcd 事件来完成自己的控制循环逻辑。如何高效可靠进行事件监听,k8s 客户端工具包 client-go 提供了一个通用的 informer 包,通过 informer,可以方便和高效的进行 controller 开发。informer 包提供了如下的一些功能:1、本地缓存(store)2、索引机制(indexer)3、Handler 注册功能(eventHandler)1、informer 架构整个 informer 机制架构如下图(图片源自 Client-go):可以看到这张图分为上下两个部分,上半部分由 client-go 提供,下半部分则是需要自己实现的控制循环逻辑本文主要分析上半部分的逻辑,包括下面几个组件:1.1、Reflector:从图上可以看到 Reflector 是一个和 apiserver 交互的组件,通过 list 和 watch api 将资源对象压入队列1.2、DeltaFifo:DeltaFifo的结构体示意如下:type DeltaFIFO struct {  ...  // We depend on the property that items in the s    et are in  // the queue and vice versa, and that all Deltas in this  // map have at least one Delta.  items map[string]Deltas  queue []string  ...}主要分为两部分,fifo 和 delta(1)fifo:先进先出队列对应结构体中的 queue,结构体示例如下:[default/centos-fd77b5886-pfrgn, xxx, xxx](2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个 map,结构体示例如下:map:{"default/centos-fd77b5886-pfrgn":[{Replaced &od{ObjectMeta: ${pod参数}], "xxx": [{},{}]}消费者从 queue 中 pop 出对象进行消费,并从 items 获取具体的消费操作(执行动作 Update/Deleted/Sync,和执行的对象 object spec)1.3、Indexer:client-go 用来存储资源对象并自带索引功能的本地存储,deltaFIFO 中 pop 出的对象将存储到 Indexer。indexer 与 etcd 集群中的数据保持一致,从而 client-go 可以直接从本地缓存获取资源对象,减少 apiserver 和 etcd 集群的压力。2、一个基本例子func main() {  stopCh := make(chan struct{})  defer close(stopCh)    // (1)New a k8s clientset  masterUrl := "172.27.32.110:8080"  config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")  if err != nil {    klog.Errorf("BuildConfigFromFlags err, err: %v", err)  }    clientset, err := k.NewForConfig(config)  if err != nil {    klog.Errorf("Get clientset err, err: %v", err)  }    // (2)New a sharedInformers factory  sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)      // (3)Register a informer  //  f.informers[informerType] = informer,  //  the detail for informer is build in NewFilteredPodInformer()  podInformer := sharedInformers.Core().V1().Pods().Informer()    // (4)Register event handler  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{      AddFunc: func(obj interface{}) {        mObj := obj.(v1.Object)        klog.Infof("Get new obj: %v", mObj)        klog.Infof("Get new obj name: %s", mObj.GetName())      },  })    // (5)Start all informers  sharedInformers.Start(stopCh)    // (6)A cronjob for cache sync  if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {    klog.Infof("Cache sync fail!")  }    // (7)Use lister  podLister := sharedInformers.Core().V1().Pods().Lister()  pods, err := podLister.List(labels.Everything())  if err != nil {    klog.Infof("err: %v", err)  }  klog.Infof("len(pods), %d", len(pods))  for _, v := range pods {    klog.Infof("pod: %s", v.Name)  }     0 {    if _, exists := f.items[id]; !exists {      f.queue = append(f.queue, id)    }    f.items[id] = newDeltas    f.cond.Broadcast()          // 通知所有阻塞住的消费者  }  ...  return nil}(2)消费—c.processLoop:消费逻辑就是从 DeltaFifo pop 出对象,然后做两件事情:(1)触发前面注册的 eventhandler (2)更新本地索引缓存 indexer,保持数据和 etcd 一致func (c *controller) processLoop() {  for {    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))  }}### Queue.Pop:## Queue.Pop是一个带有处理函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:func (f *DeltaFIFO) op(process opProcessFunc) (interface{}, error) {  for {                       // 无限循环    for len(f.queue) == 0 {      f.cond.Wait()       // 阻塞直到生产端broadcast方法通知    }    id := f.queue[0]    item, ok := f.items[id]    delete(f.items, id)    err := process(item)        // 执行处理方法    if e, ok := err.(ErrRequeue); ok {      f.addIfNotPresent(id, item)     // 如果处理失败的重新加入到fifo中重新处理      err = e.Err    }    return item, err  }}### c.config.Process:## c.config.Process是在初始化controller的时候赋值的,即为前面的s.HandleDeltas### s.HandleDeltas:func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {  s.blockDeltas.Lock()  defer s.blockDeltas.Unlock()  // from oldest to newest  for _, d := range obj.(Deltas) {    switch d.Type {    case Sync, Replaced, Added, Updated:      s.cacheMutationDetector.AddObject(d.Object)        if old, exists, err := s.indexer.Get(d.Object); err == nil & exists {          if err := s.indexer.Update(d.Object); err != nil {            return err          }          isSync := false          switch {          case d.Type == Sync:            // Sync events are only propagated to listeners that requested resync            isSync = true          case d.Type == Replaced:            if accessor, err := meta.Accessor(d.Object); err == nil {                if oldAccessor, err := meta.Accessor(old); err == nil {                  // Replaced events that didn't change resourceVersion are treated as resync events                  // and only propagated to listeners that requested resync                  isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()                }            }          }          s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)        } else {          if err := s.indexer.Add(d.Object); err != nil {            return err          }          s.processor.distribute(addNotification{newObj: d.Object}, false)        }    case Deleted:      if err := s.indexer.Delete(d.Object); err != nil {        return err      }      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)    }  }  return nil}可以看到上面主要执行两部分逻辑:s.processor.distribute#### s.processor.distribute:### 例如新增通知:s.processor.distribute(addNotification{newObj: d.Object}, false)### 其中addNotification就是add类型的通知,后面会通过notification结构体的类型来执行不同的eventHandlerfunc (p *sharedProcessor) distribute(obj interface{}, sync bool) {  p.listenersLock.RLock()  defer p.listenersLock.RUnlock()    if sync {    for _, listener := range p.syncingListeners {      listener.add(obj)    }  } else {    for _, listener := range p.listeners {      listener.add(obj)    }  }}func (p *processorListener) add(notification interface{}) {  p.addCh 
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-25 14:02 , Processed in 1.692964 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表