From 785b7b4381d988309f1f4e0dd03d87ef5db2154a Mon Sep 17 00:00:00 2001
From: "W. Trevor King" <wking@tremily.us>
Date: Tue, 19 Jan 2021 19:39:56 -0800
Subject: [PATCH] pkg/cvo: Use shutdownContext for final status
 synchronization

Address a bug introduced by cc1921d186 (pkg/start: Release leader
lease on graceful shutdown, 2020-08-03, #424), where canceling the
Operator.Run context would leave the operator with no time to attempt
the final sync [1]:

  E0119 22:24:15.924216       1 cvo.go:344] unable to perform final sync: context canceled

With this commit, I'm piping through shutdownContext, which gets a
two-minute grace period beyond runContext, to give the operator time
to push out that final status (which may include important
information like the fact that the incoming release image has
completed verification).
---
This commit picks c4ddf03bec (pkg/cvo: Use shutdownContext for final
status synchronization, 2021-01-19, #517) back to 4.5.  It's not a
clean pick, because we're missing changes like:

* b72e843974 (Bug 1822844: Block z level upgrades if
  ClusterVersionOverridesSet set, 2020-04-30, #364).
* 1d1de3ba1a (Use context to add timeout to cincinnati HTTP request,
  2019-01-15, #410).

which also touched these lines.  But we've gotten this far without
backporting rhbz#1822844, and #410 was never associated with a bug in
the first place, so instead of pulling back more of 4.6 to get a
clean pick, I've just manually reconciled the pick conflicts.

Removing Start from pkg/start (again) fixes a buggy re-introduction
in the manually-backported 20421b601a (*: Add lots of Context and
options arguments, 2020-07-24, #470).

[1]: https://bugzilla.redhat.com/show_bug.cgi?id=1916384#c10
---
 pkg/cvo/cvo.go     | 29 +++++++++++++++++------------
 pkg/start/start.go | 15 +--------------
 2 files changed, 18 insertions(+), 26 deletions(-)

diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index 31632920f0..16eac37f53 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -314,17 +314,20 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v
 	return nil, nil, nil
 }
 
-// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) error {
+// Run runs the cluster version operator until runContext.Done() and
+// then attempts a clean shutdown limited by shutdownContext.Done().
+// Assumes runContext.Done() occurs before or simultaneously with
+// shutdownContext.Done().
+func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context, workers int) error {
 	defer optr.queue.ShutDown()
-	stopCh := ctx.Done()
+	stopCh := runContext.Done()
 	workerStopCh := make(chan struct{})
 
 	klog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
+		return fmt.Errorf("caches never synchronized: %w", runContext.Err())
 	}
 
 	// trigger the first cluster version reconcile always
@@ -332,20 +335,22 @@ func (optr *Operator) Run(ctx context.Context, workers int) error {
 	// start the config sync loop, and have it notify the queue when new status is detected
 	go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
-	go optr.configSync.Start(ctx, 16)
-	go wait.Until(func() { optr.worker(ctx, optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
-	go wait.Until(func() { optr.worker(ctx, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
-	go wait.Until(func() {
+	go optr.configSync.Start(runContext, 16)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
+		optr.worker(runContext, optr.availableUpdatesQueue, optr.availableUpdatesSync)
+	}, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) { optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSync) }, time.Second)
+	go wait.UntilWithContext(runContext, func(runContext context.Context) {
 		defer close(workerStopCh)
 
 		// run the worker, then when the queue is closed sync one final time to flush any pending status
-		optr.worker(ctx, optr.queue, func(key string) error { return optr.sync(ctx, key) })
-		if err := optr.sync(ctx, optr.queueKey()); err != nil {
+		optr.worker(runContext, optr.queue, func(key string) error { return optr.sync(runContext, key) })
+		if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
 			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
 		}
-	}, time.Second, stopCh)
+	}, time.Second)
 
 	if optr.signatureStore != nil {
-		go optr.signatureStore.Run(ctx, optr.minimumUpdateCheckInterval*2)
+		go optr.signatureStore.Run(runContext, optr.minimumUpdateCheckInterval*2)
 	}
 
 	<-stopCh
diff --git a/pkg/start/start.go b/pkg/start/start.go
index 80b809cda7..afad67e199 100644
--- a/pkg/start/start.go
+++ b/pkg/start/start.go
@@ -187,7 +187,7 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 	resultChannelCount++
 	go func() {
 		defer utilruntime.HandleCrash()
-		err := controllerCtx.CVO.Run(runContext, 2)
+		err := controllerCtx.CVO.Run(runContext, shutdownContext, 2)
 		resultChannel <- asyncResult{name: "main operator", error: err}
 	}()
 
@@ -403,16 +403,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 	}
 	return ctx
 }
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-	ch := ctx.Done()
-	go c.CVO.Run(ctx, 2)
-	if c.AutoUpdate != nil {
-		go c.AutoUpdate.Run(ctx, 2)
-	}
-	c.CVInformerFactory.Start(ch)
-	c.OpenshiftConfigInformerFactory.Start(ch)
-	c.InformerFactory.Start(ch)
-}
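For reviewers who want to see the two-context pattern in isolation, here is a
minimal sketch.  It is not the actual pkg/start wiring: twoStageContexts is a
hypothetical helper, and only the two-minute grace period and the ordering
assumption (runContext.Done() fires before or together with
shutdownContext.Done()) come from the commit above.

package main

import (
	"context"
	"fmt"
	"time"
)

// twoStageContexts returns a runContext that is canceled immediately on
// shutdown and a shutdownContext that survives it by the given grace
// period.  Because runContext is derived from shutdownContext, canceling
// the shutdown context also cancels the run context, so the ordering
// assumption holds by construction.
func twoStageContexts(parent context.Context, grace time.Duration) (runContext, shutdownContext context.Context, shutdown func()) {
	shutdownContext, cancelShutdown := context.WithCancel(parent)
	runContext, cancelRun := context.WithCancel(shutdownContext)
	shutdown = func() {
		cancelRun()                           // stop normal operation now
		time.AfterFunc(grace, cancelShutdown) // cleanup gets a bounded window
	}
	return runContext, shutdownContext, shutdown
}

func main() {
	runCtx, shutdownCtx, shutdown := twoStageContexts(context.Background(), 2*time.Minute)

	shutdown() // e.g. triggered by SIGTERM
	<-runCtx.Done()

	fmt.Println("runContext:", runCtx.Err())           // "context canceled": workers stop
	fmt.Println("shutdownContext:", shutdownCtx.Err()) // <nil>: a final sync can still push status
}

Deriving runContext from shutdownContext makes the ordering structural
rather than a convention callers must remember, which is why Run can
safely hand shutdownContext to the final optr.sync call.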