Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ func New(
clusterProfile: clusterProfile,
}

cvInformer.Informer().AddEventHandler(optr.eventHandler())
cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler())
cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler())
cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler())

coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler())
optr.coLister = coInformer.Lister()
optr.cacheSynced = append(optr.cacheSynced, coInformer.Informer().HasSynced)

Expand Down Expand Up @@ -457,9 +458,9 @@ func (optr *Operator) queueKey() string {
return fmt.Sprintf("%s/%s", optr.namespace, optr.name)
}

// eventHandler queues an update for the cluster version on any change to the given object.
// clusterVersionEventHandler queues an update for the cluster version on any change to the given object.
// Callers should use this with a scoped informer.
func (optr *Operator) eventHandler() cache.ResourceEventHandler {
func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
workQueueKey := optr.queueKey()
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand All @@ -478,6 +479,36 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
}
}

// clusterOperatorEventHandler queues an update for the cluster version on any change to the given object.
// Callers should use this with an informer.
func (optr *Operator) clusterOperatorEventHandler() cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
versionName := "operator"
_, oldVersion := clusterOperatorInterfaceVersionOrDie(old, versionName)
newStruct, newVersion := clusterOperatorInterfaceVersionOrDie(new, versionName)
if optr.configSync != nil && oldVersion != newVersion {
msg := fmt.Sprintf("Cluster operator %s changed versions[name=%q] from %q to %q", newStruct.ObjectMeta.Name, versionName, oldVersion, newVersion)
optr.configSync.NotifyAboutManagedResourceActivity(new, msg)
}
},
}
}

func clusterOperatorInterfaceVersionOrDie(obj interface{}, name string) (*configv1.ClusterOperator, string) {
co, ok := obj.(*configv1.ClusterOperator)
if !ok {
panic(fmt.Sprintf("%v is %T, not a ClusterOperator", obj, obj))
}

for _, version := range co.Status.Versions {
if version.Name == name {
return co, version.Version
}
}
return co, ""
}

func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) {
for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) {
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
return ch
}

// Inform the sync worker about activity for a managed resource.
func (r *fakeSyncRecorder) NotifyAboutManagedResourceActivity(obj interface{}, message string) {
}

func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
}

Expand Down
24 changes: 19 additions & 5 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type ConfigSyncWorker interface {
Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister)
Update(ctx context.Context, generation int64, desired configv1.Update, config *configv1.ClusterVersion, state payload.State, cvoOptrName string) *SyncWorkerStatus
StatusCh() <-chan SyncWorkerStatus

// Inform the sync worker about activity for a managed resource.
NotifyAboutManagedResourceActivity(obj interface{}, msg string)
}

// PayloadInfo returns details about the payload when it was retrieved.
Expand Down Expand Up @@ -163,7 +166,7 @@ type SyncWorker struct {
minimumReconcileInterval time.Duration

// coordination between the sync loop and external callers
notify chan struct{}
notify chan string
report chan SyncWorkerStatus

// lock guards changes to these fields
Expand Down Expand Up @@ -197,7 +200,7 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,

minimumReconcileInterval: reconcileInterval,

notify: make(chan struct{}, 1),
notify: make(chan string, 1),
// report is a large buffered channel to improve local testing - most consumers should invoke
// Status() or use the result of calling Update() instead because the channel can be out of date
// if the reader is not fast enough.
Expand Down Expand Up @@ -225,6 +228,16 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
return w.report
}

// Inform the sync worker about activity for a managed resource.
func (w *SyncWorker) NotifyAboutManagedResourceActivity(obj interface{}, message string) {
select {
case w.notify <- message:
klog.V(2).Infof("Notify the sync worker: %s", message)
default:
klog.V(2).Info("The sync worker already has a pending notification, so do not notify about:no need to inform about: %s", message)
}
}

// syncPayload retrieves, loads, and verifies the specified payload, aka sync's the payload, whenever there is no current
// payload or the current payload differs from the desired payload. Whenever a payload is sync'ed a check is made for
// implicitly enabled capabilities. For the purposes of the check made here, implicitly enabled capabilities are
Expand Down Expand Up @@ -503,8 +516,9 @@ func (w *SyncWorker) Update(ctx context.Context, generation int64, desired confi
w.cancelFn()
w.cancelFn = nil
}
msg := "new work is available"
select {
case w.notify <- struct{}{}:
case w.notify <- msg:
klog.V(2).Info("Notify the sync worker that new work is available")
default:
klog.V(2).Info("The sync worker has already been notified that new work is available")
Expand Down Expand Up @@ -535,8 +549,8 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri
case <-next:
waitingToReconcile = false
klog.V(2).Infof("Wait finished")
case <-w.notify:
klog.V(2).Infof("Work updated")
case msg := <-w.notify:
klog.V(2).Info(msg)
}

// determine whether we need to do work
Expand Down