diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index 31632920f0..16eac37f53 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -314,17 +314,20 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v
 	return nil, nil, nil
 }
 
-// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) error {
+// Run runs the cluster version operator until runContext.Done() and
+// then attempts a clean shutdown limited by shutdownContext.Done().
+// Assumes runContext.Done() occurs before or simultaneously with
+// shutdownContext.Done().
+func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
 	defer optr.queue.ShutDown()
-	stopCh := ctx.Done()
+	stopCh := runContext.Done()
 	workerStopCh := make(chan struct{})
 
 	klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
+		return fmt.Errorf("caches never synchronized: %w", runContext.Err())
 	}
 
 	// trigger the first cluster version reconcile always
@@ -332,20 +335,22 @@ func (optr *Operator) Run(ctx context.Context, workers int) error {
 
 	// start the config sync loop, and have it notify the queue when new status is detected
 	go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(ctx, 16)
-	go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
-	go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
-	go wait.Until(func() {
+	go optr.configSync.Start(runContext, 16)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
+		optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
+	}, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
 		defer close(workerStopCh)
 		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
-		if err := optr.sync(ctx, optr.queueKey()); err != nil {
+		optr.worker(runContext, optr.queue, func(key string) error { return optr.sync(runContext, key) })
+		if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
 			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
 		}
-	}, time.Second, stopCh)
+	}, time.Second)
 
 	if optr.signatureStore != nil {
-		go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
+		go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
 	}
 
 	<-stopCh
diff --git a/pkg/start/start.go b/pkg/start/start.go
index 80b809cda7..afad67e199 100644
--- a/pkg/start/start.go
+++ b/pkg/start/start.go
@@ -187,7 +187,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 	resultChannelCount++
 	go func() {
 		defer utilruntime.HandleCrash()
-		err := controllerCtx.CVO.Run(runContext, 2)
+		err := controllerCtx.CVO.Run(runContext, shutdownContext, 2)
 		resultChannel <- asyncResult{name: "main operator", error: err}
 	}()
 
@@ -403,16 +403,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 	}
 	return ctx
 }
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-	ch := ctx.Done()
-	go c.CVO.Run(ctx, 2)
-	if c.AutoUpdate != nil {
-		go c.AutoUpdate.Run(ctx, 2)
-	}
-	c.CVInformerFactory.Start(ch)
-	c.OpenshiftConfigInformerFactory.Start(ch)
-	c.InformerFactory.Start(ch)
-}