diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index fda11e5a3..28f7476b3 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -197,10 +197,11 @@ func New( clusterProfile: clusterProfile, } - cvInformer.Informer().AddEventHandler(optr.eventHandler()) + cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()) cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler()) cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler()) + coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler()) optr.coLister = coInformer.Lister() optr.cacheSynced = append(optr.cacheSynced, coInformer.Informer().HasSynced) @@ -457,9 +458,9 @@ func (optr *Operator) queueKey() string { return fmt.Sprintf("%s/%s", optr.namespace, optr.name) } -// eventHandler queues an update for the cluster version on any change to the given object. +// clusterVersionEventHandler queues an update for the cluster version on any change to the given object. // Callers should use this with a scoped informer. -func (optr *Operator) eventHandler() cache.ResourceEventHandler { +func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler { workQueueKey := optr.queueKey() return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -478,6 +479,36 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler { } } +// clusterOperatorEventHandler queues an update for the cluster version on any change to the given object. +// Callers should use this with an informer. +func (optr *Operator) clusterOperatorEventHandler() cache.ResourceEventHandler { + return cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(old, new interface{}) { + versionName := "operator" + _, oldVersion := clusterOperatorInterfaceVersionOrDie(old, versionName) + newStruct, newVersion := clusterOperatorInterfaceVersionOrDie(new, versionName) + if optr.configSync != nil && oldVersion != newVersion { + msg := fmt.Sprintf("Cluster operator %s changed versions[name=%q] from %q to %q", newStruct.ObjectMeta.Name, versionName, oldVersion, newVersion) + optr.configSync.NotifyAboutManagedResourceActivity(new, msg) + } + }, + } +} + +func clusterOperatorInterfaceVersionOrDie(obj interface{}, name string) (*configv1.ClusterOperator, string) { + co, ok := obj.(*configv1.ClusterOperator) + if !ok { + panic(fmt.Sprintf("%v is %T, not a ClusterOperator", obj, obj)) + } + + for _, version := range co.Status.Versions { + if version.Name == name { + return co, version.Version + } + } + return co, "" +} + func (optr *Operator) worker(ctx context.Context, queue workqueue.RateLimitingInterface, syncHandler func(context.Context, string) error) { for processNextWorkItem(ctx, queue, syncHandler, optr.syncFailingStatus) { } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 721042f31..2e1403e35 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -476,6 +476,10 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { return ch } +// Inform the sync worker about activity for a managed resource. +func (r *fakeSyncRecorder) NotifyAboutManagedResourceActivity(obj interface{}, message string) { +} + func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) { } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index edc65f093..c7010efec 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -34,6 +34,9 @@ type ConfigSyncWorker interface { Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) Update(ctx context.Context, generation int64, desired configv1.Update, config *configv1.ClusterVersion, state payload.State, cvoOptrName string) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus + + // Inform the sync worker about activity for a managed resource. + NotifyAboutManagedResourceActivity(obj interface{}, msg string) } // PayloadInfo returns details about the payload when it was retrieved. @@ -163,7 +166,7 @@ type SyncWorker struct { minimumReconcileInterval time.Duration // coordination between the sync loop and external callers - notify chan struct{} + notify chan string report chan SyncWorkerStatus // lock guards changes to these fields @@ -197,7 +200,7 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, minimumReconcileInterval: reconcileInterval, - notify: make(chan struct{}, 1), + notify: make(chan string, 1), // report is a large buffered channel to improve local testing - most consumers should invoke // Status() or use the result of calling Update() instead because the channel can be out of date // if the reader is not fast enough. @@ -225,6 +228,16 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { return w.report } +// Inform the sync worker about activity for a managed resource. +func (w *SyncWorker) NotifyAboutManagedResourceActivity(obj interface{}, message string) { + select { + case w.notify <- message: + klog.V(2).Infof("Notify the sync worker: %s", message) + default: + klog.V(2).Info("The sync worker already has a pending notification, so do not notify about:no need to inform about: %s", message) + } +} + // syncPayload retrieves, loads, and verifies the specified payload, aka sync's the payload, whenever there is no current // payload or the current payload differs from the desired payload. Whenever a payload is sync'ed a check is made for // implicitly enabled capabilities. For the purposes of the check made here, implicitly enabled capabilities are @@ -503,8 +516,9 @@ func (w *SyncWorker) Update(ctx context.Context, generation int64, desired confi w.cancelFn() w.cancelFn = nil } + msg := "new work is available" select { - case w.notify <- struct{}{}: + case w.notify <- msg: klog.V(2).Info("Notify the sync worker that new work is available") default: klog.V(2).Info("The sync worker has already been notified that new work is available") @@ -535,8 +549,8 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName stri case <-next: waitingToReconcile = false klog.V(2).Infof("Wait finished") - case <-w.notify: - klog.V(2).Infof("Work updated") + case msg := <-w.notify: + klog.V(2).Info(msg) } // determine whether we need to do work