diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a1359ce61..27b5a2c1e 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -292,38 +292,43 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s return verifier, persister, nil } -// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now. -func (optr *Operator) Run(ctx context.Context, workers int) error { +// Run runs the cluster version operator until runContext.Done() and +// then attempts a clean shutdown limited by shutdownContext.Done(). +// Assumes runContext.Done() occurs before or simultaneously with +// shutdownContext.Done(). +func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error { defer optr.queue.ShutDown() - stopCh := ctx.Done() + stopCh := runContext.Done() workerStopCh := make(chan struct{}) klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval) defer klog.Info("Shutting down ClusterVersionOperator") if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) { - return fmt.Errorf("caches never synchronized: %w", ctx.Err()) + return fmt.Errorf("caches never synchronized: %w", runContext.Err()) } // trigger the first cluster version reconcile always optr.queue.Add(optr.queueKey()) // start the config sync loop, and have it notify the queue when new status is detected - go runThrottledStatusNotifier(ctx, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) }) - go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister) - go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second) - go wait.UntilWithContext(ctx, func(ctx context.Context) { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second) - go wait.UntilWithContext(ctx, func(ctx context.Context) { + go runThrottledStatusNotifier(runContext, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) }) + go optr.configSync.Start(runContext, 16, optr.name, optr.cvLister) + go wait.UntilWithContext(runContext, func(runContext context.Context) { + optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync) + }, time.Second) + go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second) + go wait.UntilWithContext(runContext, func(runContext context.Context) { defer close(workerStopCh) // run the worker, then when the queue is closed sync one final time to flush any pending status - optr.worker(ctx, optr.queue, func(ctx context.Context, key string) error { return optr.sync(ctx, key) }) - if err := optr.sync(ctx, optr.queueKey()); err != nil { + optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) }) + if err := optr.sync(shutdownContext, optr.queueKey()); err != nil { utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err)) } }, time.Second) if optr.signatureStore != nil { - go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2) + go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2) } <-stopCh diff --git a/pkg/start/start.go b/pkg/start/start.go index 6177483dd..d9e3494dc 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -237,7 +237,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc resultChannelCount++ go func() { defer utilruntime.HandleCrash() - err := controllerCtx.CVO.Run(runContext, 2) + err := controllerCtx.CVO.Run(runContext, shutdownContext, 2) resultChannel <- asyncResult{name: "main operator", error: err} }()