1 change: 1 addition & 0 deletions bootstrap/bootstrap-pod.yaml
@@ -37,6 +37,7 @@ spec:
fieldRef:
fieldPath: spec.nodeName
hostNetwork: true
terminationGracePeriodSeconds: 130
volumes:
- name: kubeconfig
hostPath:
7 changes: 5 additions & 2 deletions cmd/start.go
@@ -1,6 +1,8 @@
package main

import (
"context"

"github.com/spf13/cobra"
"k8s.io/klog"

@@ -16,11 +18,12 @@ func init() {
Long: "",
Run: func(cmd *cobra.Command, args []string) {
// To help debugging, immediately log version
klog.Infof("%s", version.String)
klog.Info(version.String)

if err := opts.Run(); err != nil {
if err := opts.Run(context.Background()); err != nil {
klog.Fatalf("error: %v", err)
}
klog.Infof("Graceful shutdown complete for %s.", version.String)
},
}

@@ -57,6 +57,7 @@ spec:
nodeSelector:
node-role.kubernetes.io/master: ""
priorityClassName: "system-cluster-critical"
terminationGracePeriodSeconds: 130
tolerations:
- key: "node-role.kubernetes.io/master"
operator: Exists
8 changes: 4 additions & 4 deletions pkg/autoupdate/autoupdate.go
@@ -84,23 +84,23 @@ func New(
}

// Run runs the autoupdate controller.
func (ctrl *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
func (ctrl *Controller) Run(ctx context.Context, workers int) error {
defer ctrl.queue.ShutDown()

klog.Info("Starting AutoUpdateController")
defer klog.Info("Shutting down AutoUpdateController")

if !cache.WaitForCacheSync(ctx.Done(), ctrl.cacheSynced...) {
klog.Info("Caches never synchronized")
return
return fmt.Errorf("caches never synchronized: %w", ctx.Err())
}

for i := 0; i < workers; i++ {
// FIXME: actually wait until these complete if the Context is canceled. And possibly add utilruntime.HandleCrash.
go wait.UntilWithContext(ctx, ctrl.worker, time.Second)
}

<-ctx.Done()
return nil
}
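
The FIXME above notes that Run still returns without waiting for its workers once the context is cancelled. A minimal sketch of one way to close that gap (not part of this PR; runWorkers, the one-second period, and the worker signature are illustrative), wrapping wait.UntilWithContext in a sync.WaitGroup:

import (
	"context"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runWorkers is a hypothetical sketch: it launches n workers and, unlike the
// current controller Run methods, blocks until every worker has observed
// cancellation before returning.
func runWorkers(ctx context.Context, n int, worker func(context.Context)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Re-runs worker every second until ctx is cancelled.
			wait.UntilWithContext(ctx, worker, time.Second)
		}()
	}
	<-ctx.Done()
	wg.Wait() // all workers have exited; safe to return and shut down the queue
}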

func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {
8 changes: 4 additions & 4 deletions pkg/cvo/cvo.go
@@ -290,8 +290,7 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
}

// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
func (optr *Operator) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
func (optr *Operator) Run(ctx context.Context, workers int) error {
defer optr.queue.ShutDown()
stopCh := ctx.Done()
workerStopCh := make(chan struct{})
@@ -300,8 +299,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
defer klog.Info("Shutting down ClusterVersionOperator")

if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
klog.Info("Caches never synchronized")
return
return fmt.Errorf("caches never synchronized: %w", ctx.Err())
}

// trigger the first cluster version reconcile always
@@ -330,6 +328,8 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
// stop the queue, then wait for the worker to exit
optr.queue.ShutDown()
<-workerStopCh

return nil
}
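
The elided middle of Run presumably launches a single worker goroutine that closes workerStopCh when it drains. A condensed, hypothetical sketch of the shutdown handshake the last few lines above rely on (queue, process, and the single-worker assumption are illustrative, assuming a standard client-go workqueue loop):

// Condensed sketch (not the actual CVO worker) of the queue/worker handshake.
workerStopCh := make(chan struct{})
go func() {
	defer close(workerStopCh) // signal that the worker has fully exited
	for {
		key, shutdown := queue.Get() // unblocks with shutdown=true after queue.ShutDown()
		if shutdown {
			return
		}
		process(key)
		queue.Done(key)
	}
}()

<-ctx.Done()     // run until the caller cancels
queue.ShutDown() // stop the queue, which unblocks Get in the worker
<-workerStopCh   // then wait for the worker to exit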

func (optr *Operator) queueKey() string {
197 changes: 98 additions & 99 deletions pkg/start/start.go
@@ -10,13 +10,13 @@ import (
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/google/uuid"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -77,6 +77,11 @@ type Options struct {
ResyncInterval time.Duration
}

type asyncResult struct {
name string
error error
}

func defaultEnv(name, defaultValue string) string {
env, ok := os.LookupEnv(name)
if !ok {
@@ -101,7 +106,7 @@ func NewOptions() *Options {
}
}

func (o *Options) Run() error {
func (o *Options) Run(ctx context.Context) error {
if o.NodeName == "" {
return fmt.Errorf("node-name is required")
}
@@ -137,29 +142,6 @@ func (o *Options) Run() error {
return err
}

// TODO: Kube 1.14 will contain a ReleaseOnCancel boolean on
// LeaderElectionConfig that allows us to have the lock code
// release the lease when this context is cancelled. At that
// time we can remove our changes to OnStartedLeading.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan os.Signal, 1)
defer func() { signal.Stop(ch) }()
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-ch
klog.Infof("Shutting down due to %s", sig)
cancel()

// exit after 2s no matter what
select {
case <-time.After(5 * time.Second):
klog.Fatalf("Exiting")
case <-ch:
klog.Fatalf("Received shutdown signal twice, exiting")
}
}()

o.run(ctx, controllerCtx, lock)
return nil
}
@@ -186,13 +168,33 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) {
}), nil
}

// run launches a number of goroutines to handle manifest application,
// metrics serving, etc. It continues operating until ctx.Done(),
// and then attempts a clean shutdown limited by an internal context
// with a two-minute cap. It returns after it successfully collects all
// launched goroutines.
func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) {
runContext, runCancel := context.WithCancel(ctx)
runContext, runCancel := context.WithCancel(ctx) // so we can cancel internally on errors or TERM
defer runCancel()
shutdownContext, shutdownCancel := context.WithCancel(ctx)
shutdownContext, shutdownCancel := context.WithCancel(context.Background()) // extends beyond ctx
defer shutdownCancel()
errorChannel := make(chan error, 1)
errorChannelCount := 0
postMainContext, postMainCancel := context.WithCancel(context.Background()) // extends beyond ctx
defer postMainCancel()

ch := make(chan os.Signal, 1)
defer func() { signal.Stop(ch) }()
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
defer utilruntime.HandleCrash()
sig := <-ch
klog.Infof("Shutting down due to %s", sig)
runCancel()
sig = <-ch
klog.Fatalf("Received shutdown signal twice, exiting: %s", sig)
}()

resultChannel := make(chan asyncResult, 1)
resultChannelCount := 0
if o.ListenAddr != "" {
var tlsConfig *tls.Config
if o.ServingCertFile != "" || o.ServingKeyFile != "" {
@@ -202,85 +204,96 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc
klog.Fatalf("Failed to create TLS config: %v", err)
}
}
errorChannelCount++
resultChannelCount++
go func() {
errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr, tlsConfig)
defer utilruntime.HandleCrash()
err := cvo.RunMetrics(postMainContext, shutdownContext, o.ListenAddr, tlsConfig)
resultChannel <- asyncResult{name: "metrics server", error: err}
}()
}

exit := make(chan struct{})
exitClose := sync.Once{}

// TODO: when we switch to graceful lock shutdown, this can be
// moved back inside RunOrDie
// TODO: properly wire ctx here
go leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(localCtx context.Context) {
controllerCtx.Start(runContext)
select {
case <-runContext.Done():
// WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel
// and client-go ContextCancelable, which allows us to block new API requests before
// we step down. However, the CVO isn't that sensitive to races and can tolerate
// brief overlap.
klog.Infof("Stepping down as leader")
// give the controllers some time to shut down
time.Sleep(100 * time.Millisecond)
// if we still hold the leader lease, clear the owner identity (other lease watchers
// still have to wait for expiration) like the new ReleaseOnCancel code will do.
if err := lock.Update(localCtx, resourcelock.LeaderElectionRecord{}); err == nil {
// if we successfully clear the owner identity, we can safely delete the record
if err := lock.Client.ConfigMaps(lock.ConfigMapMeta.Namespace).Delete(localCtx, lock.ConfigMapMeta.Name, metav1.DeleteOptions{}); err != nil {
klog.Warningf("Unable to step down cleanly: %v", err)
}
informersDone := postMainContext.Done()
// FIXME: would be nice if there was a way to collect these.
controllerCtx.CVInformerFactory.Start(informersDone)
controllerCtx.OpenshiftConfigInformerFactory.Start(informersDone)
controllerCtx.OpenshiftConfigManagedInformerFactory.Start(informersDone)
controllerCtx.InformerFactory.Start(informersDone)

resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
leaderelection.RunOrDie(postMainContext, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) { // no need for this passed-through postMainContext, because goroutines we launch inside will use runContext
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
err := controllerCtx.CVO.Run(runContext, 2)
resultChannel <- asyncResult{name: "main operator", error: err}
}()

if controllerCtx.AutoUpdate != nil {
resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
err := controllerCtx.AutoUpdate.Run(runContext, 2)
resultChannel <- asyncResult{name: "auto-update controller", error: err}
}()
}
klog.Infof("Finished shutdown")
exitClose.Do(func() { close(exit) })
case <-localCtx.Done():
// we will exit in OnStoppedLeading
}
},
OnStoppedLeading: func() {
klog.Warning("leaderelection lost")
exitClose.Do(func() { close(exit) })
},
OnStoppedLeading: func() {
klog.Info("Stopped leading; shutting down.")
runCancel()
Contributor:

I think you still need to exit, don't you? How confident are you that this truly resets everything?

Member Author:

> How confident are you that this truly resets everything?

If it doesn't, CI should turn it up, and we'll fix those bugs ;)

Contributor:

This doesn't sound like what I described on slack. If we lost lease, we exit immediately, no graceful step down. When we have lost our lease we should not be running.

Member Author (@wking, Aug 6, 2020):

> If we lost lease, we exit immediately...

OnStoppedLeading is not just "lost lease", it is also "gracefully released lease". We could have logic here about checking postMainContext.Err() to guess about lost vs. released. But runCancel() should immediately block all cluster-object-writing activity, so I think this is sufficient as it stands to keep from fighting the new leader. For comparison, master gives a full 5s grace period after the cancel before forcing a hard exit. If reducing my current 2m grace period to 5s would make you happy with this line, I'm happy to make that change.
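
A minimal sketch of the check mentioned in the reply above (hypothetical, not part of this PR), using postMainContext.Err() to guess whether the lease was released on purpose or lost:

OnStoppedLeading: func() {
	if postMainContext.Err() != nil {
		// postMainContext was already cancelled, so we stepped down voluntarily.
		klog.Info("Stopped leading after graceful release; shutting down.")
	} else {
		// We still wanted the lease; stop cluster-object writes immediately.
		klog.Warning("Leader lease lost; shutting down.")
	}
	runCancel()
},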

},
},
},
})
})
resultChannel <- asyncResult{name: "leader controller", error: nil}
}()

for errorChannelCount > 0 {
var shutdownTimer *time.Timer
var shutdownTimer *time.Timer
for resultChannelCount > 0 {
klog.Infof("Waiting on %d outstanding goroutines.", resultChannelCount)
if shutdownTimer == nil { // running
select {
case <-runContext.Done():
klog.Info("Run context completed; beginning two-minute graceful shutdown period.")
shutdownTimer = time.NewTimer(2 * time.Minute)
case err := <-errorChannel:
errorChannelCount--
if err != nil {
klog.Error(err)
case result := <-resultChannel:
resultChannelCount--
if result.error == nil {
klog.Infof("Collected %s goroutine.", result.name)
} else {
klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
runCancel() // this will cause shutdownTimer initialization in the next loop
}
if result.name == "main operator" {
postMainCancel()
}
}
} else { // shutting down
select {
case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing.
shutdownCancel()
shutdownTimer.Stop()
case err := <-errorChannel:
errorChannelCount--
if err != nil {
klog.Error(err)
runCancel()
case result := <-resultChannel:
resultChannelCount--
if result.error == nil {
klog.Infof("Collected %s goroutine.", result.name)
} else {
klog.Errorf("Collected %s goroutine: %v", result.name, result.error)
}
if result.name == "main operator" {
postMainCancel()
}
}
}
}

<-exit
klog.Info("Finished collecting operator goroutines.")
}

// createResourceLock initializes the lock.
Expand Down Expand Up @@ -440,17 +453,3 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context {
}
return ctx
}

// Start launches the controllers in the provided context and any supporting
// infrastructure. When ch is closed the controllers will be shut down.
func (c *Context) Start(ctx context.Context) {
ch := ctx.Done()
go c.CVO.Run(ctx, 2)
if c.AutoUpdate != nil {
go c.AutoUpdate.Run(ctx, 2)
}
c.CVInformerFactory.Start(ch)
c.OpenshiftConfigInformerFactory.Start(ch)
c.OpenshiftConfigManagedInformerFactory.Start(ch)
c.InformerFactory.Start(ch)
}