Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
Taken from https://github.com/kubernetes/kubernetes/blob/1889a6ef52eb18b08e24843577c5b9d3b9a65daa/pkg/controller/controller_utils.go
As soon as expectations become available in client-go or apimachinery, delete this and switch.
*/
package controller
import (
"fmt"
"sync"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
"kubevirt.io/client-go/log"
)
const (
ExpectationsTimeout = 5 * time.Minute
)
var ExpKeyFunc = func(obj interface{}) (string, error) {
if e, ok := obj.(*ControlleeExpectations); ok {
return e.key, nil
}
return "", fmt.Errorf("Could not find key for obj %#v", obj)
}
type ControllerExpectationsInterface interface {
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
SatisfiedExpectations(controllerKey string) bool
DeleteExpectations(controllerKey string)
SetExpectations(controllerKey string, add, del int)
ExpectCreations(controllerKey string, adds int)
ExpectDeletions(controllerKey string, dels int)
CreationObserved(controllerKey string)
DeletionObserved(controllerKey string)
RaiseExpectations(controllerKey string, add, del int)
LowerExpectations(controllerKey string, add, del int)
AllPendingCreations() (creations int64)
}
type ControllerExpectations struct {
cache.Store
name string
}
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
return exp.(*ControlleeExpectations), true, nil
} else {
return nil, false, err
}
}
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
if err := r.Delete(exp); err != nil {
log.Log.Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
}
}
}
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
if exp, exists, err := r.GetExpectations(controllerKey); exists {
if exp.Fulfilled() {
log.Log.V(4).Infof("Controller expectations (name: %s) fulfilled %#v", r.name, exp)
return true
} else if exp.isExpired() {
log.Log.V(4).Infof("Controller expectations (name: %s) expired %#v", r.name, exp)
return true
} else {
log.Log.V(4).Infof("Controller (name: %s) still waiting on expectations %#v", r.name, exp)
return false
}
} else if err != nil {
log.Log.Infof("Error encountered while checking expectations (name: %s) %#v, forcing sync", r.name, err)
} else {
log.Log.V(4).Infof("Controller %v (name: %s) either never recorded expectations, or the ttl expired.", controllerKey, r.name)
}
return true
}
func (exp *ControlleeExpectations) isExpired() bool {
return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
func panicWithKeyFuncMsg(err error) {
const keyFuncChangedFormat = "KeyFunc was changed, %v"
panic(fmt.Errorf(keyFuncChangedFormat, err))
}
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) {
exp := &ControlleeExpectations{key: controllerKey, timestamp: clock.RealClock{}.Now()}
exp.add.Store(int64(add))
exp.del.Store(int64(del))
log.Log.V(4).Infof("Setting expectations %#v", exp)
if err := r.Add(exp); err != nil {
panicWithKeyFuncMsg(err)
}
}
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) {
r.SetExpectations(controllerKey, adds, 0)
}
func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int) {
r.SetExpectations(controllerKey, 0, dels)
}
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
exp.Add(int64(-add), int64(-del))
log.Log.V(4).Infof("Lowered expectations: %s", exp)
}
}
func (r *ControllerExpectations) RaiseExpectations(controllerKey string, add, del int) {
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
exp.Add(int64(add), int64(del))
log.Log.V(4).Infof("Raised expectations: %s", exp)
}
}
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
r.LowerExpectations(controllerKey, 1, 0)
}
func (r *ControllerExpectations) AllPendingCreations() (sum int64) {
for _, key := range r.ListKeys() {
exp, exists, _ := r.GetExpectations(key)
if exists {
sum = sum + exp.add.Load()
}
}
return
}
func (r *ControllerExpectations) DeletionObserved(controllerKey string) {
r.LowerExpectations(controllerKey, 0, 1)
}
type Expectations interface {
Fulfilled() bool
}
type ControlleeExpectations struct {
add atomic.Int64
del atomic.Int64
key string
timestamp time.Time
}
func (e *ControlleeExpectations) Add(add, del int64) {
e.add.Add(add)
e.del.Add(del)
}
func (e *ControlleeExpectations) Fulfilled() bool {
return e.add.Load() <= 0 && e.del.Load() <= 0
}
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
return e.add.Load(), e.del.Load()
}
func (e *ControlleeExpectations) String() string {
return fmt.Sprintf("key: %s, timestamp: %v, add: %d, del: %d", e.key, e.timestamp, e.add.Load(), e.del.Load())
}
func NewControllerExpectations() *ControllerExpectations {
return &ControllerExpectations{Store: cache.NewStore(ExpKeyFunc), name: "n/a"}
}
func NewControllerExpectationsWithName(name string) *ControllerExpectations {
return &ControllerExpectations{Store: cache.NewStore(ExpKeyFunc), name: name}
}
var UIDSetKeyFunc = func(obj interface{}) (string, error) {
if u, ok := obj.(*UIDSet); ok {
return u.key, nil
}
return "", fmt.Errorf("Could not find key for obj %#v", obj)
}
type UIDSet struct {
sets.String
key string
}
type UIDTrackingControllerExpectations struct {
ControllerExpectationsInterface
uidStoreLock sync.Mutex
uidStore cache.Store
}
func (u *UIDTrackingControllerExpectations) GetUIDs(controllerKey string) sets.String {
if uid, exists, err := u.uidStore.GetByKey(controllerKey); err == nil && exists {
return uid.(*UIDSet).String
}
return nil
}
func (u *UIDTrackingControllerExpectations) ExpectDeletions(rcKey string, deletedKeys []string) {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()
if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
log.Log.Errorf("Clobbering existing delete keys: %+v", existing)
}
expectedUIDs := sets.NewString()
for _, k := range deletedKeys {
expectedUIDs.Insert(k)
}
log.Log.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, deletedKeys)
if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
panicWithKeyFuncMsg(err)
}
u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}
func (u *UIDTrackingControllerExpectations) AddExpectedDeletion(rcKey string, deletedKey string) {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()
expectedUIDs := sets.NewString()
if existing := u.GetUIDs(rcKey); existing != nil && existing.Len() != 0 {
expectedUIDs = existing
}
expectedUIDs.Insert(deletedKey)
log.Log.V(4).Infof("Controller %v waiting on deletions for: %+v", rcKey, expectedUIDs)
if err := u.uidStore.Add(&UIDSet{expectedUIDs, rcKey}); err != nil {
panicWithKeyFuncMsg(err)
}
u.ControllerExpectationsInterface.ExpectDeletions(rcKey, expectedUIDs.Len())
}
func (u *UIDTrackingControllerExpectations) DeletionObserved(rcKey, deleteKey string) {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()
uids := u.GetUIDs(rcKey)
if uids != nil && uids.Has(deleteKey) {
log.Log.V(4).Infof("Controller %v received delete for pod %v", rcKey, deleteKey)
u.ControllerExpectationsInterface.DeletionObserved(rcKey)
uids.Delete(deleteKey)
}
}
func (u *UIDTrackingControllerExpectations) DeleteExpectations(rcKey string) {
u.uidStoreLock.Lock()
defer u.uidStoreLock.Unlock()
u.ControllerExpectationsInterface.DeleteExpectations(rcKey)
if uidExp, exists, err := u.uidStore.GetByKey(rcKey); err == nil && exists {
if err := u.uidStore.Delete(uidExp); err != nil {
log.Log.Infof("Error deleting uid expectations for controller %v: %v", rcKey, err)
}
}
}
func NewUIDTrackingControllerExpectations(ce ControllerExpectationsInterface) *UIDTrackingControllerExpectations {
return &UIDTrackingControllerExpectations{ControllerExpectationsInterface: ce, uidStore: cache.NewStore(UIDSetKeyFunc)}
}