diff --git a/bootstrap/bootstrap-pod.yaml b/bootstrap/bootstrap-pod.yaml
index bc93746282..ca42facd11 100644
--- a/bootstrap/bootstrap-pod.yaml
+++ b/bootstrap/bootstrap-pod.yaml
@@ -37,6 +37,7 @@ spec:
         fieldRef:
           fieldPath: spec.nodeName
   hostNetwork: true
+  terminationGracePeriodSeconds: 130
   volumes:
   - name: kubeconfig
     hostPath:
diff --git a/cmd/start.go b/cmd/start.go
index 578fc8ac46..32488e13dc 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"context"
+
 	"github.com/spf13/cobra"
 	"k8s.io/klog"
 
@@ -16,11 +18,12 @@ func init() {
 		Long:  "",
 		Run: func(cmd *cobra.Command, args []string) {
 			// To help debugging, immediately log version
-			klog.Infof("%s", version.String)
+			klog.Info(version.String)
 
-			if err := opts.Run(); err != nil {
+			if err := opts.Run(context.Background()); err != nil {
 				klog.Fatalf("error: %v", err)
 			}
+			klog.Infof("Graceful shutdown complete for %s.", version.String)
 		},
 	}
 
diff --git a/install/0000_00_cluster-version-operator_03_deployment.yaml b/install/0000_00_cluster-version-operator_03_deployment.yaml
index 3b3ae8d1b5..0973d81cf5 100644
--- a/install/0000_00_cluster-version-operator_03_deployment.yaml
+++ b/install/0000_00_cluster-version-operator_03_deployment.yaml
@@ -57,6 +57,7 @@ spec:
       nodeSelector:
         node-role.kubernetes.io/master: ""
       priorityClassName: "system-cluster-critical"
+      terminationGracePeriodSeconds: 130
       tolerations:
      - key: "node-role.kubernetes.io/master"
        operator: Exists
diff --git a/pkg/autoupdate/autoupdate.go b/pkg/autoupdate/autoupdate.go
index 20c6af731f..f0da98511a 100644
--- a/pkg/autoupdate/autoupdate.go
+++ b/pkg/autoupdate/autoupdate.go
@@ -84,23 +84,23 @@ func New(
 }
 
 // Run runs the autoupdate controller.
-func (ctrl *Controller) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
+func (ctrl *Controller) Run(ctx context.Context, workers int) error {
 	defer ctrl.queue.ShutDown()
 
 	klog.Info("Starting AutoUpdateController")
 	defer klog.Info("Shutting down AutoUpdateController")
 
 	if !cache.WaitForCacheSync(ctx.Done(), ctrl.cacheSynced...) {
-		klog.Info("Caches never synchronized")
-		return
+		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
 	}
 
 	for i := 0; i < workers; i++ {
+		// FIXME: actually wait until these complete if the Context is canceled. And possibly add utilruntime.HandleCrash.
 		go wait.UntilWithContext(ctx, ctrl.worker, time.Second)
 	}
 
 	<-ctx.Done()
+	return nil
 }
 
 func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {
diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go
index b23d0e821e..e8629c1eff 100644
--- a/pkg/cvo/cvo.go
+++ b/pkg/cvo/cvo.go
@@ -290,8 +290,7 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
 }
 
 // Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
-func (optr *Operator) Run(ctx context.Context, workers int) {
-	defer utilruntime.HandleCrash()
+func (optr *Operator) Run(ctx context.Context, workers int) error {
 	defer optr.queue.ShutDown()
 	stopCh := ctx.Done()
 	workerStopCh := make(chan struct{})
@@ -300,8 +299,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	defer klog.Info("Shutting down ClusterVersionOperator")
 
 	if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
-		klog.Info("Caches never synchronized")
-		return
+		return fmt.Errorf("caches never synchronized: %w", ctx.Err())
 	}
 
 	// trigger the first cluster version reconcile always
@@ -330,6 +328,8 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	// stop the queue, then wait for the worker to exit
 	optr.queue.ShutDown()
 	<-workerStopCh
+
+	return nil
 }
 
 func (optr *Operator) queueKey() string {
diff --git a/pkg/start/start.go b/pkg/start/start.go
index bf10fd51dc..43a72040bc 100644
--- a/pkg/start/start.go
+++ b/pkg/start/start.go
@@ -10,13 +10,13 @@ import (
 	"math/rand"
 	"os"
 	"os/signal"
-	"sync"
 	"syscall"
 	"time"
 
 	"github.com/google/uuid"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -77,6 +77,11 @@ type Options struct {
 	ResyncInterval time.Duration
 }
 
+type asyncResult struct {
+	name  string
+	error error
+}
+
 func defaultEnv(name, defaultValue string) string {
 	env, ok := os.LookupEnv(name)
 	if !ok {
@@ -101,7 +106,7 @@ func NewOptions() *Options {
 	}
 }
 
-func (o *Options) Run() error {
+func (o *Options) Run(ctx context.Context) error {
 	if o.NodeName == "" {
 		return fmt.Errorf("node-name is required")
 	}
@@ -137,29 +142,6 @@ func (o *Options) Run() error {
 		return err
 	}
 
-	// TODO: Kube 1.14 will contain a ReleaseOnCancel boolean on
-	// LeaderElectionConfig that allows us to have the lock code
-	// release the lease when this context is cancelled. At that
-	// time we can remove our changes to OnStartedLeading.
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	ch := make(chan os.Signal, 1)
-	defer func() { signal.Stop(ch) }()
-	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
-	go func() {
-		sig := <-ch
-		klog.Infof("Shutting down due to %s", sig)
-		cancel()
-
-		// exit after 2s no matter what
-		select {
-		case <-time.After(5 * time.Second):
-			klog.Fatalf("Exiting")
-		case <-ch:
-			klog.Fatalf("Received shutdown signal twice, exiting")
-		}
-	}()
-
 	o.run(ctx, controllerCtx, lock)
 	return nil
 }
@@ -186,13 +168,33 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) {
 	}), nil
 }
 
+// run launches a number of goroutines to handle manifest application,
+// metrics serving, etc. It continues operating until ctx.Done(),
+// and then attempts a clean shutdown limited by an internal context
+// with a two-minute cap. It returns after it successfully collects all
+// launched goroutines.
 func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
-	runContext, runCancel := context.WithCancel(ctx)
+	runContext, runCancel := context.WithCancel(ctx) // so we can cancel internally on errors or TERM
 	defer runCancel()
-	shutdownContext, shutdownCancel := context.WithCancel(ctx)
+	shutdownContext, shutdownCancel := context.WithCancel(context.Background()) // extends beyond ctx
 	defer shutdownCancel()
-	errorChannel := make(chan error, 1)
-	errorChannelCount := 0
+	postMainContext, postMainCancel := context.WithCancel(context.Background()) // extends beyond ctx
+	defer postMainCancel()
+
+	ch := make(chan os.Signal, 1)
+	defer func() { signal.Stop(ch) }()
+	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		defer utilruntime.HandleCrash()
+		sig := <-ch
+		klog.Infof("Shutting down due to %s", sig)
+		runCancel()
+		sig = <-ch
+		klog.Fatalf("Received shutdown signal twice, exiting: %s", sig)
+	}()
+
+	resultChannel := make(chan asyncResult, 1)
+	resultChannelCount := 0
 	if o.ListenAddr != "" {
 		var tlsConfig *tls.Config
 		if o.ServingCertFile != "" || o.ServingKeyFile != "" {
@@ -202,85 +204,96 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
 				klog.Fatalf("Failed to create TLS config: %v", err)
 			}
 		}
-		errorChannelCount++
+		resultChannelCount++
 		go func() {
-			errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr, tlsConfig)
+			defer utilruntime.HandleCrash()
+			err := cvo.RunMetrics(postMainContext, shutdownContext, o.ListenAddr, tlsConfig)
+			resultChannel <- asyncResult{name: "metrics server", error: err}
 		}()
 	}
 
-	exit := make(chan struct{})
-	exitClose := sync.Once{}
-
-	// TODO: when we switch to graceful lock shutdown, this can be
-	// moved back inside RunOrDie
-	// TODO: properly wire ctx here
-	go leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
-		Lock:          lock,
-		LeaseDuration: leaseDuration,
-		RenewDeadline: renewDeadline,
-		RetryPeriod:   retryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(localCtx context.Context) {
-				controllerCtx.Start(runContext)
-				select {
-				case <-runContext.Done():
-					// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
-					// and client-go ContextCancelable, which allows us to block new API requests before
-					// we step down. However, the CVO isn't that sensitive to races and can tolerate
-					// brief overlap.
-					klog.Infof("Stepping down as leader")
-					// give the controllers some time to shut down
-					time.Sleep(100 * time.Millisecond)
-					// if we still hold the leader lease, clear the owner identity (other lease watchers
-					// still have to wait for expiration) like the new ReleaseOnCancel code will do.
-					if err := lock.Update(localCtx, resourcelock.LeaderElectionRecord{}); err == nil {
-						// if we successfully clear the owner identity, we can safely delete the record
-						if err := lock.Client.ConfigMaps(lock.ConfigMapMeta.Namespace).Delete(localCtx, lock.ConfigMapMeta.Name, metav1.DeleteOptions{}); err != nil {
-							klog.Warningf("Unable to step down cleanly: %v", err)
-						}
-					}
+	informersDone := postMainContext.Done()
+	// FIXME: would be nice if there was a way to collect these.
+	controllerCtx.CVInformerFactory.Start(informersDone)
+	controllerCtx.OpenshiftConfigInformerFactory.Start(informersDone)
+	controllerCtx.OpenshiftConfigManagedInformerFactory.Start(informersDone)
+	controllerCtx.InformerFactory.Start(informersDone)
+
+	resultChannelCount++
+	go func() {
+		defer utilruntime.HandleCrash()
+		leaderelection.RunOrDie(postMainContext, leaderelection.LeaderElectionConfig{
+			Lock:            lock,
+			ReleaseOnCancel: true,
+			LeaseDuration:   leaseDuration,
+			RenewDeadline:   renewDeadline,
+			RetryPeriod:     retryPeriod,
+			Callbacks: leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(_ context.Context) { // no need for this passed-through postMainContext, because goroutines we launch inside will use runContext
+					resultChannelCount++
+					go func() {
+						defer utilruntime.HandleCrash()
+						err := controllerCtx.CVO.Run(runContext, 2)
+						resultChannel <- asyncResult{name: "main operator", error: err}
+					}()
+
+					if controllerCtx.AutoUpdate != nil {
+						resultChannelCount++
+						go func() {
+							defer utilruntime.HandleCrash()
+							err := controllerCtx.AutoUpdate.Run(runContext, 2)
+							resultChannel <- asyncResult{name: "auto-update controller", error: err}
+						}()
 					}
-					klog.Infof("Finished shutdown")
-					exitClose.Do(func() { close(exit) })
-				case <-localCtx.Done():
-					// we will exit in OnStoppedLeading
-				}
-			},
-			OnStoppedLeading: func() {
-				klog.Warning("leaderelection lost")
-				exitClose.Do(func() { close(exit) })
+				},
+				OnStoppedLeading: func() {
+					klog.Info("Stopped leading; shutting down.")
+					runCancel()
+				},
 			},
-		},
-	})
+		})
+		resultChannel <- asyncResult{name: "leader controller", error: nil}
+	}()
 
-	for errorChannelCount > 0 {
-		var shutdownTimer *time.Timer
+	var shutdownTimer *time.Timer
+	for resultChannelCount > 0 {
+		klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
 		if shutdownTimer == nil { // running
 			select {
 			case <-runContext.Done():
+				klog.Info("Run context completed; beginning two-minute graceful shutdown period.")
 				shutdownTimer = time.NewTimer(2 * time.Minute)
-			case err := <-errorChannel:
-				errorChannelCount--
-				if err != nil {
-					klog.Error(err)
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
 					runCancel() // this will cause shutdownTimer initialization in the next loop
 				}
+				if result.name == "main operator" {
+					postMainCancel()
+				}
 			}
 		} else { // shutting down
 			select {
 			case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
 				shutdownCancel()
 				shutdownTimer.Stop()
-			case err := <-errorChannel:
-				errorChannelCount--
-				if err != nil {
-					klog.Error(err)
-					runCancel()
+			case result := <-resultChannel:
+				resultChannelCount--
+				if result.error == nil {
+					klog.Infof("Collected %s goroutine.", result.name)
+				} else {
+					klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
+				}
+				if result.name == "main operator" {
+					postMainCancel()
 				}
 			}
 		}
 	}
-
-	<-exit
+	klog.Info("Finished collecting operator goroutines.")
 }
 
 // createResourceLock initializes the lock.
@@ -440,17 +453,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
 	}
 	return ctx
 }
-
-// Start launches the controllers in the provided context and any supporting
-// infrastructure. When ch is closed the controllers will be shut down.
-func (c *Context) Start(ctx context.Context) {
-	ch := ctx.Done()
-	go c.CVO.Run(ctx, 2)
-	if c.AutoUpdate != nil {
-		go c.AutoUpdate.Run(ctx, 2)
-	}
-	c.CVInformerFactory.Start(ch)
-	c.OpenshiftConfigInformerFactory.Start(ch)
-	c.OpenshiftConfigManagedInformerFactory.Start(ch)
-	c.InformerFactory.Start(ch)
-}
diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go
index 153027f029..1faaf3dcae 100644
--- a/pkg/start/start_integration_test.go
+++ b/pkg/start/start_integration_test.go
@@ -267,9 +267,14 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	controllers.Start(ctx)
+	go options.run(ctx, controllers, lock)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(ctx, t, client, ns, false, "0.0.1")
@@ -424,9 +429,14 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "", record.NewFakeRecorder(100))
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	controllers.Start(ctx)
+	go options.run(ctx, controllers, lock)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(ctx, t, client, ns, false, "0.0.1")
@@ -534,7 +544,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
 	worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
-	lock, err := createResourceLock(cb, ns, ns)
+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -550,7 +560,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
 
 	// wait until the lock record exists
 	err = wait.PollImmediate(200*time.Millisecond, 60*time.Second, func() (bool, error) {
-		_, err := kc.CoreV1().ConfigMaps(ns).Get(ctx, ns, metav1.GetOptions{})
+		_, _, err := lock.Get(ctx)
 		if err != nil {
 			if errors.IsNotFound(err) {
 				return false, nil
@@ -572,26 +582,26 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
 		t.Fatalf("no leader election events found in\n%#v", events.Items)
 	}
 
-	t.Logf("after the context is closed, the lock record should be deleted quickly")
+	t.Logf("after the context is closed, the lock should be released quickly")
 	cancel()
 	startTime := time.Now()
 	var endTime time.Time
 	// the lock should be deleted immediately
 	err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
-		_, err := kc.CoreV1().ConfigMaps(ns).Get(ctx, ns, metav1.GetOptions{})
-		if errors.IsNotFound(err) {
-			endTime = time.Now()
-			return true, nil
-		}
+		electionRecord, _, err := lock.Get(ctx)
 		if err != nil {
+			if errors.IsNotFound(err) {
+				return false, nil
+			}
 			return false, err
 		}
-		return false, nil
+		endTime = time.Now()
+		return electionRecord.HolderIdentity == "", nil
 	})
 	if err != nil {
 		t.Fatal(err)
 	}
-	t.Logf("lock deleted in %s", endTime.Sub(startTime))
+	t.Logf("lock released in %s", endTime.Sub(startTime))
 
 	select {
 	case <-time.After(time.Second):
@@ -721,9 +731,14 @@ metadata:
 	worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
 	controllers.CVO.SetSyncWorkerForTesting(worker)
 
+	lock, err := createResourceLock(cb, options.Namespace, options.Name)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	controllers.Start(ctx)
+	go options.run(ctx, controllers, lock)
 
 	t.Logf("wait until we observe the cluster version become available")
 	lastCV, err := waitForUpdateAvailable(ctx, t, client, ns, false, "0.0.1")