1. const (
    2. // 主要是若给定版本的obj不在cache,需要去etcd fetch数据
    3. // 此参数代表了fetch的超时时间
    4. blockTimeout = 3 * time.Second
    5. // 客户端收到too high resource version异常需要先block的时长,然后再重拾
    6. resourceVersionTooHighRetrySeconds = 1
    7. )
    8. // 为了避免多次计算obj的key/label/field,需要缓存这些值
    9. type storeElement struct {
    10. Key string
    11. Object runtime.Object
    12. Labels labels.Set
    13. Fields fields.Set
    14. }
    15. func storeElementKey(obj interface{}) (string, error) {
    16. elem, ok := obj.(*storeElement)
    17. if !ok {
    18. return "", fmt.Errorf("not a storeElement: %v", obj)
    19. }
    20. return elem.Key, nil
    21. }
    22. func storeElementObject(obj interface{}) (runtime.Object, error) {
    23. elem, ok := obj.(*storeElement)
    24. if !ok {
    25. return nil, fmt.Errorf("not a storeElement: %v", obj)
    26. }
    27. return elem.Object, nil
    28. }
    29. func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc {
    30. return func(obj interface{}) (strings []string, e error) {
    31. seo, err := storeElementObject(obj)
    32. if err != nil {
    33. return nil, err
    34. }
    35. return objIndexFunc(seo)
    36. }
    37. }
    38. func storeElementIndexers(indexers *cache.Indexers) cache.Indexers {
    39. if indexers == nil {
    40. return cache.Indexers{}
    41. }
    42. ret := cache.Indexers{}
    43. for indexName, indexFunc := range *indexers {
    44. ret[indexName] = storeElementIndexFunc(indexFunc)
    45. }
    46. return ret
    47. }
    48. // watchCache实际上是一个滑动窗口
    49. // 维护固定数量的obj
    50. type watchCache struct {
    51. sync.RWMutex
    52. // 等待刷新足够的版本数据到cache
    53. cond *sync.Cond
    54. // 滑动窗口的容量
    55. capacity int
    56. keyFunc func(runtime.Object) (string, error)
    57. getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
    58. // cache is used a cyclic buffer - its first element (with the smallest
    59. // resourceVersion) is defined by startIndex, its last element is defined
    60. // by endIndex (if cache is full it will be startIndex + capacity).
    61. // Both startIndex and endIndex can be greater than buffer capacity -
    62. // you should always apply modulo capacity to get an index in cache array.
    63. cache []*watchCacheEvent
    64. startIndex int
    65. endIndex int
    66. // store will effectively support LIST operation from the "end of cache
    67. // history" i.e. from the moment just after the newest cached watched event.
    68. // It is necessary to effectively allow clients to start watching at now.
    69. // NOTE: We assume that <store> is thread-safe.
    70. store cache.Indexer
    71. // ResourceVersion up to which the watchCache is propagated.
    72. resourceVersion uint64
    73. // ResourceVersion of the last list result (populated via Replace() method).
    74. listResourceVersion uint64
    75. // This handler is run at the end of every successful Replace() method.
    76. onReplace func()
    77. // 事件处理器
    78. eventHandler func(*watchCacheEvent)
    79. clock clock.Clock
    80. // 获取obj的resourceversion信息
    81. versioner storage.Versioner
    82. }
    83. type watchCacheEvent struct {
    84. Key string
    85. ResourceVersion uint64
    86. Type watch.EventType
    87. Object runtime.Object
    88. ObjLabels labels.Set
    89. ObjFields fields.Set
    90. PrevObject runtime.Object
    91. PrevObjLabels labels.Set
    92. PrevObjFields fields.Set
    93. }
    94. func newWatchCache(
    95. capacity int,
    96. keyFunc func(runtime.Object) (string, error),
    97. eventHandler func(*watchCacheEvent),
    98. getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
    99. versioner storage.Versioner,
    100. indexers *cache.Indexers) *watchCache {
    101. wc := &watchCache{
    102. capacity: capacity,
    103. keyFunc: keyFunc,
    104. getAttrsFunc: getAttrsFunc,
    105. cache: make([]*watchCacheEvent, capacity),
    106. startIndex: 0,
    107. endIndex: 0,
    108. store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
    109. resourceVersion: 0,
    110. listResourceVersion: 0,
    111. eventHandler: eventHandler,
    112. clock: clock.RealClock{},
    113. versioner: versioner,
    114. }
    115. wc.cond = sync.NewCond(wc.RLocker())
    116. return wc
    117. }
    118. func (w *watchCache) Add(obj interface{}) error {
    119. object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
    120. if err != nil {
    121. return err
    122. }
    123. event := watch.Event{Type: watch.Added, Object: object}
    124. f := func(elem *storeElement) error { return w.store.Add(elem) }
    125. return w.processEvent(event, resourceVersion, f)
    126. }
    127. func (w *watchCache) Update(obj interface{}) error {
    128. object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
    129. if err != nil {
    130. return err
    131. }
    132. event := watch.Event{Type: watch.Modified, Object: object}
    133. f := func(elem *storeElement) error { return w.store.Update(elem) }
    134. return w.processEvent(event, resourceVersion, f)
    135. }
    136. func (w *watchCache) Delete(obj interface{}) error {
    137. object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
    138. if err != nil {
    139. return err
    140. }
    141. event := watch.Event{Type: watch.Deleted, Object: object}
    142. f := func(elem *storeElement) error { return w.store.Delete(elem) }
    143. return w.processEvent(event, resourceVersion, f)
    144. }
    145. func (w *watchCache) List() []interface{} {
    146. return w.store.List()
    147. }
    148. func (w *watchCache) ListKeys() []string {
    149. return w.store.ListKeys()
    150. }
    151. func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues []storage.MatchValue, trace *utiltrace.Trace) ([]interface{}, uint64, error) {
    152. err := w.waitUntilFreshAndBlock(resourceVersion, trace)
    153. defer w.RUnlock()
    154. if err != nil {
    155. return nil, 0, err
    156. }
    157. // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
    158. // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
    159. // want - they will be filtered out later. The fact that we return less things is only further performance improvement.
    160. // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
    161. for _, matchValue := range matchValues {
    162. if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
    163. return result, w.resourceVersion, nil
    164. }
    165. }
    166. return w.store.List(), w.resourceVersion, nil
    167. }
    168. func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
    169. err := w.waitUntilFreshAndBlock(resourceVersion, trace)
    170. defer w.RUnlock()
    171. if err != nil {
    172. return nil, false, 0, err
    173. }
    174. value, exists, err := w.store.GetByKey(key)
    175. return value, exists, w.resourceVersion, err
    176. }
    177. func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
    178. object, ok := obj.(runtime.Object)
    179. if !ok {
    180. return nil, false, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
    181. }
    182. key, err := w.keyFunc(object)
    183. if err != nil {
    184. return nil, false, fmt.Errorf("couldn't compute key: %v", err)
    185. }
    186. return w.store.Get(&storeElement{Key: key, Object: object})
    187. }
    188. func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
    189. return w.store.GetByKey(key)
    190. }
    191. func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
    192. version, err := w.versioner.ParseResourceVersion(resourceVersion)
    193. if err != nil {
    194. return err
    195. }
    196. toReplace := make([]interface{}, 0, len(objs))
    197. for _, obj := range objs {
    198. object, ok := obj.(runtime.Object)
    199. if !ok {
    200. return fmt.Errorf("didn't get runtime.Object for replace: %#v", obj)
    201. }
    202. key, err := w.keyFunc(object)
    203. if err != nil {
    204. return fmt.Errorf("couldn't compute key: %v", err)
    205. }
    206. objLabels, objFields, err := w.getAttrsFunc(object)
    207. if err != nil {
    208. return err
    209. }
    210. toReplace = append(toReplace, &storeElement{
    211. Key: key,
    212. Object: object,
    213. Labels: objLabels,
    214. Fields: objFields,
    215. })
    216. }
    217. w.Lock()
    218. defer w.Unlock()
    219. w.startIndex = 0
    220. w.endIndex = 0
    221. if err := w.store.Replace(toReplace, resourceVersion); err != nil {
    222. return err
    223. }
    224. w.listResourceVersion = version
    225. w.resourceVersion = version
    226. if w.onReplace != nil {
    227. w.onReplace()
    228. }
    229. w.cond.Broadcast()
    230. return nil
    231. }
    232. func (w *watchCache) SetOnReplace(onReplace func()) {
    233. w.Lock()
    234. defer w.Unlock()
    235. w.onReplace = onReplace
    236. }
    237. func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
    238. size := w.endIndex - w.startIndex
    239. var oldest uint64
    240. switch {
    241. case size >= w.capacity:
    242. // Once the watch event buffer is full, the oldest watch event we can deliver
    243. // is the first one in the buffer.
    244. oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
    245. case w.listResourceVersion > 0:
    246. // If the watch event buffer isn't full, the oldest watch event we can deliver
    247. // is one greater than the resource version of the last full list.
    248. oldest = w.listResourceVersion + 1
    249. case size > 0:
    250. // If we've never completed a list, use the resourceVersion of the oldest event
    251. // in the buffer.
    252. // This should only happen in unit tests that populate the buffer without
    253. // performing list/replace operations.
    254. oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
    255. default:
    256. return nil, fmt.Errorf("watch cache isn't correctly initialized")
    257. }
    258. if resourceVersion == 0 {
    259. // resourceVersion = 0 means that we don't require any specific starting point
    260. // and we would like to start watching from ~now.
    261. // However, to keep backward compatibility, we additionally need to return the
    262. // current state and only then start watching from that point.
    263. //
    264. // TODO: In v2 api, we should stop returning the current state - #13969.
    265. allItems := w.store.List()
    266. result := make([]*watchCacheEvent, len(allItems))
    267. for i, item := range allItems {
    268. elem, ok := item.(*storeElement)
    269. if !ok {
    270. return nil, fmt.Errorf("not a storeElement: %v", elem)
    271. }
    272. objLabels, objFields, err := w.getAttrsFunc(elem.Object)
    273. if err != nil {
    274. return nil, err
    275. }
    276. result[i] = &watchCacheEvent{
    277. Type: watch.Added,
    278. Object: elem.Object,
    279. ObjLabels: objLabels,
    280. ObjFields: objFields,
    281. Key: elem.Key,
    282. ResourceVersion: w.resourceVersion,
    283. }
    284. }
    285. return result, nil
    286. }
    287. if resourceVersion < oldest-1 {
    288. return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1))
    289. }
    290. // Binary search the smallest index at which resourceVersion is greater than the given one.
    291. f := func(i int) bool {
    292. return w.cache[(w.startIndex+i)%w.capacity].ResourceVersion > resourceVersion
    293. }
    294. first := sort.Search(size, f)
    295. result := make([]*watchCacheEvent, size-first)
    296. for i := 0; i < size-first; i++ {
    297. result[i] = w.cache[(w.startIndex+first+i)%w.capacity]
    298. }
    299. return result, nil
    300. }
    301. func (w *watchCache) GetAllEventsSince(resourceVersion uint64) ([]*watchCacheEvent, error) {
    302. w.RLock()
    303. defer w.RUnlock()
    304. return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
    305. }
    306. func (w *watchCache) Resync() error {
    307. return nil
    308. }
    309. func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
    310. key, err := w.keyFunc(event.Object)
    311. if err != nil {
    312. return fmt.Errorf("couldn't compute key: %v", err)
    313. }
    314. elem := &storeElement{Key: key, Object: event.Object}
    315. elem.Labels, elem.Fields, err = w.getAttrsFunc(event.Object)
    316. if err != nil {
    317. return err
    318. }
    319. wcEvent := &watchCacheEvent{
    320. Type: event.Type,
    321. Object: elem.Object,
    322. ObjLabels: elem.Labels,
    323. ObjFields: elem.Fields,
    324. Key: key,
    325. ResourceVersion: resourceVersion,
    326. }
    327. if err := func() error {
    328. // TODO: We should consider moving this lock below after the watchCacheEvent
    329. // is created. In such situation, the only problematic scenario is Replace(
    330. // happening after getting object from store and before acquiring a lock.
    331. // Maybe introduce another lock for this purpose.
    332. w.Lock()
    333. defer w.Unlock()
    334. previous, exists, err := w.store.Get(elem)
    335. if err != nil {
    336. return err
    337. }
    338. if exists {
    339. previousElem := previous.(*storeElement)
    340. wcEvent.PrevObject = previousElem.Object
    341. wcEvent.PrevObjLabels = previousElem.Labels
    342. wcEvent.PrevObjFields = previousElem.Fields
    343. }
    344. w.updateCache(wcEvent)
    345. w.resourceVersion = resourceVersion
    346. defer w.cond.Broadcast()
    347. return updateFunc(elem)
    348. }(); err != nil {
    349. return err
    350. }
    351. // Avoid calling event handler under lock.
    352. // This is safe as long as there is at most one call to processEvent in flight
    353. // at any point in time.
    354. if w.eventHandler != nil {
    355. w.eventHandler(wcEvent)
    356. }
    357. return nil
    358. }
    359. func (w *watchCache) updateCache(event *watchCacheEvent) {
    360. if w.endIndex == w.startIndex+w.capacity {
    361. // Cache is full - remove the oldest element.
    362. w.startIndex++
    363. }
    364. w.cache[w.endIndex%w.capacity] = event
    365. w.endIndex++
    366. }
    367. func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *utiltrace.Trace) error {
    368. startTime := w.clock.Now()
    369. go func() {
    370. // Wake us up when the time limit has expired. The docs
    371. // promise that time.After (well, NewTimer, which it calls)
    372. // will wait *at least* the duration given. Since this go
    373. // routine starts sometime after we record the start time, and
    374. // it will wake up the loop below sometime after the broadcast,
    375. // we don't need to worry about waking it up before the time
    376. // has expired accidentally.
    377. <-w.clock.After(blockTimeout)
    378. w.cond.Broadcast()
    379. }()
    380. w.RLock()
    381. if trace != nil {
    382. trace.Step("watchCache locked acquired")
    383. }
    384. for w.resourceVersion < resourceVersion {
    385. if w.clock.Since(startTime) >= blockTimeout {
    386. // Request that the client retry after 'resourceVersionTooHighRetrySeconds' seconds.
    387. return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds)
    388. }
    389. w.cond.Wait()
    390. }
    391. if trace != nil {
    392. trace.Step("watchCache fresh enough")
    393. }
    394. return nil
    395. }
    396. // 解析obj为runtime.Object、version
    397. func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
    398. object, ok := obj.(runtime.Object)
    399. if !ok {
    400. return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
    401. }
    402. resourceVersion, err := w.versioner.ObjectResourceVersion(object)
    403. if err != nil {
    404. return nil, 0, err
    405. }
    406. return object, resourceVersion, nil
    407. }