@@ -12,21 +12,40 @@ Informer 是 client-go 中一个比较核心的工具,通过 Informer(实际
1212
1313## 架构概览
1414
15- 自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。
15+ 自定义控制器的工作流程基本如下图所示,我们今天要分析图中上半部分的逻辑。(图片来自 https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md )
1616
1717![ 1555996411720] ( image/informer/1555996411720.png )
1818
1919我们开发自定义控制器的时候用到的“机制”主要定义在 client-go 的 tool/cache下:
2020
2121![ 1556075198766] ( image/informer/1556075198766.png )
2222
23- 我们根据图中的9个步骤来跟源码
23+ 先关注一下第一幅图中涉及到的一些 components:
2424
25- ## reflector - List & Watch API Server
25+ ### client-go 相关模块
26+
27+ - ** Reflector** : Reflector 类型定义在 cache 包中(* tools/cache/reflector.go:47* ),它的作用是向 apiserver watch 特定的资源类型。这个功能通过其绑定的 ListAndWatch 方法实现。Watch 的资源可以是 in-build 的资源也可以是 custom 的资源。当 Reflector 通过 watch API 接收到存在新的资源对象实例的通知后,它使用相应的 list API 获取新创建的资源对象,然后 put 进 Delta Fifo 队列。这个步骤在 watchHandler 函数(* tools/cache/reflector.go:268* )中完成。
28+ - ** Informer** : 一个定义在 cache 包中的基础 controller(* tools/cache/controller.go:75* ) (一个 informer) 从 Delta Fifo 队列中 pop 出来资源对象实例(这个功能在 processLoop 中实现(* tools/cache/controller.go:148* ))。这个 base controller 做的工作是保存这个对象用于后续检索处理用的,然后触发我们自己的控制器来处理这个对象。
29+ - ** Indexer** : Indexer 提供的是 objects 之上的检索能力。Indexer 也定义在 cache 包中(* tools/cache/index.go:27* ). 一个典型的检索使用方式是基于一个对象的 labels 创建索引。Indexer 可以基于各种索引函数维护索引。Indexer 使用一个线程安全的 store 来存储对象和其对应的 key. 还有一个默认函数 MetaNamespaceKeyFunc(* tools/cache/store.go:76* ) 可以生成对象的 key,类似 < ; namespace> ; /< ; name> ; 格式来关联对应的对象。
30+
31+ ### 自定义控制器相关模块
32+
33+ - ** Informer reference** : 这是一个知道如何处理自定义资源对象的 Informer 实例的引用。自定义控制器需要创建合适的 Informer.
34+ - ** Indexer reference** : 这是一个知道如何处理自定义资源对象的 Indexer 实例的引用. 自定义控制器代码需要创建这个引用对象,然后用于检索资源对象用于后续的处理。
35+
36+ Base controller 提供了 NewIndexerInformer(* tools/cache/controller.go:345* ) 函数来创建 Informer 和 Indexer. 在代码里我们可以直接调用这个函数或者使用工厂方法来创建 informer.
37+
38+ - ** Resource Event Handlers** : 这是一个回调函数,在 Informer 想要分发一个对象给控制器的时候会调用这个函数。典型的用法是写一个函数来获取分发过来的对象的 key,将 key 放入队列中等待进一步的处理。
39+ - ** Work queue** : 这个队列是在自己的控制器代码中创建的,用来解耦一个对象的分发和处理过程。Resource event handler 函数会被写成提取分发来的对象的 key,然后将这个 key 添加到 work queue 里面。
40+ - ** Process Item** 这是我们在自己代码中实现的用来处理 work queue 中拿到的 items 的函数。这里可以有一个或多个函数来处理具体的过程,这个函数的典型用法是使用 Indexer 索引或者一个 Listing wrapper 来根据相应的 key 检索对象。
41+
42+ 下面我们根据图中这几个步骤来跟源码。
43+
44+ ## 第一步:reflector - List & Watch API Server
2645
2746Reflector 会监视特定的资源,将变化写入给定的存储中,也就是 Delta FIFO queue.
2847
29- ### Reflector 对象
48+ ### Reflector 对象定义
3049
3150Reflector 的中文含义是反射器,我们先看一下类型定义:
3251
@@ -91,7 +110,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
91110 // 开始 watch
92111 w , err := r.listerWatcher .Watch (options)
93112 // ……
94- // w 交给 watchHandler 处理
113+ // w 交给 watchHandler 处理,这里的逻辑后面分析
95114 if err := r.watchHandler (w, &resourceVersion, resyncerrc, stopCh); err != nil {
96115 if err != errorStopRequested {
97116 klog.Warningf (" %s : watch of %v ended with: %v " , r.name , r.expectedType , err)
@@ -102,9 +121,9 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
102121}
103122```
104123
105- ## watchHandler - add obj to delta fifo
124+ ## 第二步: watchHandler - add obj to delta fifo
106125
107- 前面讲到 ListAndWatch 函数的最后一步逻辑是 watchHandler,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中了 。
126+ 前面讲到 ** ListAndWatch** 函数的最后一步逻辑是 ** watchHandler** ,在 ListAndWatch 中先是更新了 Delta FIFO 中的 item,然后 watch 资源对象,最后交给 watchHandler 处理,所以 watchHandler 基本可以猜到是将有变化的资源添加到 Delta FIFO 中,我们具体来看 。
108127
109128!FILENAME tools/cache/reflector.go:287
110129
@@ -150,11 +169,13 @@ loop:
150169}
151170```
152171
153- ## Informer (controller) - pop obj from delta fifo
172+ ## 第三、四、五步:Informer - pop obj from delta fifo、Add obj to store
173+
174+ 先看 Controller 是什么
154175
155176### Controller
156177
157- 一个 Informer 需要实现 Controller 接口:
178+ Informer 会实现 Controller 接口,这个接口长这样 :
158179
159180!FILENAME tools/cache/controller.go:82
160181
@@ -166,7 +187,7 @@ type Controller interface {
166187}
167188```
168189
169- 一个基础的 Controller 实现如下 :
190+ 和这个 Controller 对应的有一个基础 controller 实现 :
170191
171192!FILENAME tools/cache/controller.go:75
172193
@@ -183,7 +204,7 @@ controller 类型结构如下:
183204
184205![ 1556088003902] ( image/informer/1556088003902.png )
185206
186- 可以看到主要对外暴露的逻辑是 Run() 方法,我们看一下 Run() 中的逻辑:
207+ 可以看到主要对外暴露的逻辑是 Run() 方法,还有一个重点 processLoop() 其实也在 Run() 里面被调用, 我们看一下 Run() 中的逻辑:
187208
188209!FILENAME tools/cache/controller.go:100
189210
@@ -233,7 +254,7 @@ func (c *controller) processLoop() {
233254
234255这里的 Queue 就是 Delta FIFO,Pop 是个阻塞方法,内部实现时会逐个 pop 队列中的数据,交给 PopProcessFunc 处理。我们先不看 Pop 的实现,关注一下 PopProcessFunc 是如何处理 Pop 中从队列拿出来的 item 的。
235256
236- PopProcessFunc 是一个类型:
257+ PopProcessFunc 是一个类型,如下 :
237258
238259` type PopProcessFunc func(interface{}) error `
239260
@@ -274,9 +295,9 @@ Process: func(obj interface{}) error {
274295- clientState
275296- ResourceEventHandler (h)
276297
277- 我们一一来看
298+ 我们后面会一一分析到。
278299
279- ## Add obj to Indexer (Thread safe store)
300+ ### clientState
280301
281302前面说到 clientState,这个变量的初始化是` clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers) `
282303
@@ -307,7 +328,7 @@ type Indexer interface {
307328}
308329```
309330
310- 顺带看一下 NewThreadSafeStore()
331+ 顺带看一下 ** NewThreadSafeStore()**
311332
312333!FILENAME tools/cache/thread_safe_store.go:298
313334
@@ -355,13 +376,13 @@ func (c *threadSafeMap) Add(key string, obj interface{}) {
355376}
356377```
357378
358- 第四步和第五步的内容先分析到这里 ,后面关注 threadSafeMap 实现的时候再继续深入。
379+ 这块逻辑先分析到这里 ,后面关注 threadSafeMap 实现的时候再继续深入。
359380
360- ## sharedIndexInformer
381+ ## 第六步:Dispatch Event Handler functions
361382
362- 第六步是 Dispatch Event Handler functions(Send Object to Custom Controller)
383+ 我们先看一个接口 SharedInformer
363384
364- 我们先看一个接口 SharedInformer:
385+ ### sharedIndexInformer
365386
366387!FILENAME tools/cache/shared_informer.go:43
367388
0 commit comments