// Type is a work queue (see the package comment).
type Type struct {
	// queue defines the order in which items are processed. Every element of
	// queue should also be present in the dirty set, and must not be present
	// in the processing set.
	queue []t

	// dirty marks every item that needs to be processed.
	dirty set

	// processing holds the items currently being processed. When processing
	// of an item finishes, the item must be checked against the dirty set; if
	// it is there, it is added back to queue.
	processing set

	// NOTE(review): presumably used to block and wake goroutines waiting on
	// queue state changes — confirm against Add/Get/Done.
	cond *sync.Cond

	// Shutdown-state flags; their exact semantics are defined by the queue's
	// methods (not visible here).
	shuttingDown bool
	drain        bool

	metrics queueMetrics

	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker
}
// set is a map used as a set: the uniqueness of map keys provides the set
// semantics (membership is key presence; the value type carries no data).
type set map[t]empty
item = q.queue[0] // The underlying array still exists and reference this object, so the object will not be garbage collected. q.queue[0] = nil//设置为nil,让该元素可以被垃圾回收掉 q.queue = q.queue[1:]
// heartbeat ensures we wait no more than maxWait before firing heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd waitingForAddCh chan *waitFor //传递 waitfor的channel,默认大小为1000
// metrics counts the number of retries metrics retryMetrics }
// DelayingInterface is an Interface that can additionally schedule an item to
// be added after a delay.
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}
// Set up a wait for the first item's readyAt (if one exists) nextReadyAt := never if waitingForQueue.Len() > 0 { if nextReadyAtTimer != nil { nextReadyAtTimer.Stop() } entry := waitingForQueue.Peek().(*waitFor) nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) nextReadyAt = nextReadyAtTimer.C() }
select { case <-q.stopCh: return
case <-q.heartbeat.C(): // continue the loop, which will add ready items
case <-nextReadyAt: // continue the loop, which will add ready items
case waitEntry := <-q.waitingForAddCh: if waitEntry.readyAt.After(q.clock.Now()) { insert(waitingForQueue, waitingEntryByData, waitEntry) } else { q.Add(waitEntry.data) }
// AddAfter adds the given item to the work queue after the given delay func(q *delayingType) AddAfter(item interface{}, duration time.Duration) { // don't add if we're already shutting down if q.ShuttingDown() { return }
q.metrics.retry()
// immediately add things with no delay if duration <= 0 { q.Add(item) return }
select { case <-q.stopCh: // unblock if ShutDown() is called case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: } }
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	// Embeds the delaying queue, so AddAfter and the base Interface are available too.
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	// (i.e. adds the item in a rate-limited way).
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}
// RateLimiter is the definition of a rate limiter: it decides how long each
// item should wait and tracks per-item failure counts.
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}
iflen(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { // This never happens, because dedupDeltas never returns an empty list // when given a non-empty list (as it is here). // If somehow it happens anyway, deal with it but complain. if oldDeltas == nil { klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) returnnil } klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) f.items[id] = newDeltas return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) } returnnil }
Pop 函数会按照元素的添加或更新顺序,依次返回一个元素(Deltas),在队列为空时会阻塞。Pop 会先将该元素从队列中删除,再交给 process 处理并返回;如果 process 返回 ErrRequeue,Pop 会通过 addIfNotPresent 函数将这个元素重新加回到队列中。
Pop 的参数 process 的类型是 `type PopProcessFunc func(interface{}, bool) error`,其中第二个参数即 isInInitialList。Pop 函数直接将队列中的第一个元素出队,然后交给 process 处理;如果 process 返回 ErrRequeue,该元素会被重新入队,并改为返回 ErrRequeue 中包装的错误(e.Err)。无论是否重新入队,这个 Deltas 和对应的错误信息都会被返回。
// Pop blocks until the queue holds at least one key (or the queue is closed),
// removes the oldest key from the queue, and passes its accumulated Deltas to
// process. process runs while f.lock is held (the lock is taken at entry and
// released by the deferred Unlock when Pop returns).
//
// The second argument given to process, isInInitialList, is computed as
// !f.hasSynced_locked() before the key is dequeued and before
// initialPopulationCount is decremented.
//
// If process returns an ErrRequeue, the item is re-added with
// addIfNotPresent and the error wrapped inside it (e.Err) is returned instead.
// In every case the popped item and the resulting error are returned to the
// caller, which takes ownership of the Deltas.
//
// Returns (nil, ErrFIFOClosed) once the queue is empty and f.closed is set.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		// Evaluated before the dequeue and before initialPopulationCount is
		// decremented below, so it reflects the state prior to this pop.
		isInInitialList := !f.hasSynced_locked()
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			// Although this defer sits inside the loop, it is registered at most
			// once: the iteration that reaches it always returns below.
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}
// keep backwards compat for old clients action := Sync if f.emitDeltaTypeReplaced { action = Replaced }
// Add Sync/Replaced action for each new item. for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } }
if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 for k, oldItem := range f.items { if keys.Has(k) { continue } // Delete pre-existing items not in the new list. // This could happen if watch deletion event was missed while // disconnected from apiserver. var deletedObj interface{} if n := oldItem.Newest(); n != nil { deletedObj = n.Object } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } }
if !f.populated { f.populated = true // While there shouldn't be any queued deletions in the initial // population of the queue, it's better to be on the safe side. f.initialPopulationCount = keys.Len() + queuedDeletions }
returnnil }
// Detect deletions not already in the queue. knownKeys := f.knownObjects.ListKeys() queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue }
deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } elseif !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } }
// AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error // Resync is a no-op and is deprecated Resync() error }
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) { var oldIndexValues, indexValues []string var err error for name, indexFunc := range i.indexers { // oldObj是否存在,如果不存在就置空,如果存在则取出相应值 if oldObj != nil { oldIndexValues, err = indexFunc(oldObj) } else { oldIndexValues = oldIndexValues[:0] } if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } // 和上面判断oldObj是否存在的逻辑一样 if newObj != nil { indexValues, err = indexFunc(newObj) } else { indexValues = indexValues[:0] } if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } // 拿到一个index index := i.indices[name] if index == nil { index = Index{} i.indices[name] = index }
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] { // We optimize for the most common case where indexFunc returns a single value which has not been changed continue } // 删除oldIndex for _, value := range oldIndexValues { i.deleteKeyFromIndex(key, value, index) } // 添加一个新的index for _, value := range indexValues { i.addKeyToIndex(key, value, index) } } }
ListerWatcher 是 Reflector 的主要能力提供者:Reflector 通过一个叫作 ListAndWatch 的方法,借助 ListerWatcher 把 APIServer 中的 API 对象缓存到本地,并负责更新和维护这个缓存。ListAndWatch 先通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象,然后再通过 WATCH API 来“监听”这些 API 对象的变化。
err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh) retry.After(err) if err != nil { if err != errorStopRequested { switch { case isExpiredError(err): // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err) case apierrors.IsTooManyRequests(err): klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription) <-r.initConnBackoffManager.Backoff().C() continue case apierrors.IsInternalError(err) && retry.ShouldRetry(): klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err) continue default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err) } } returnnil } } }
监控资源对象
Watch(监控)操作通过 HTTP 协议与 Kubernetes API Server 建立长连接,接收 Kubernetes API Server 发来的资源变更事件。Watch 操作的实现机制使用 HTTP 协议的分块传输编码(Chunked Transfer Encoding)。当 client-go 调用 Kubernetes API Server 时,Kubernetes API Server 在 Response 的 HTTP Header 中将 Transfer-Encoding 的值设置为 chunked,表示采用分块传输编码;客户端收到该信息后,便与服务端保持连接,并等待下一个数据块(即资源的事件信息)。