
Commit dbedb7a

cvo: When the CVO restarts, perform one final sync to write status
When we upgrade, the CVO causes itself to restart by updating its own deployment. The CVO is signalled with SIGTERM and then releases the leader lease. However, there is no guarantee that the latest status of the CVO has been flushed to the cluster version object, which can mean the "verified: true" flag that the sync worker calculates when it retrieves the payload never gets written. The new CVO pod loads from the payload and so doesn't have the verified flag.

While in the future we may want to completely decouple verification from payload retrieval (a background worker that verifies available updates as well as checking historical records), for now we need to ensure the loaded state is persisted to the ClusterVersion. Since there may be useful human-readable information about the payload that a failed new CVO pod might not get a chance to write, alter the CVO sync loop to perform one final status sync during shutdown, and increase the amount of time we wait before hard shutdown to 5s to give that sync more room to happen.
1 parent 51fef0b commit dbedb7a

File tree

pkg/cvo/cvo.go
pkg/start/start.go

2 files changed: +18 -4 lines

pkg/cvo/cvo.go

Lines changed: 17 additions & 3 deletions
@@ -226,8 +226,11 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
 // Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
 func (optr *Operator) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
-	defer optr.queue.ShutDown()
+	// TODO: when Kube 77170 is fixed we can remove the use of the once here
+	var shutdownOnce sync.Once
+	defer shutdownOnce.Do(func() { optr.queue.ShutDown() })
 	stopCh := ctx.Done()
+	workerStopCh := make(chan struct{})
 
 	glog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
 	defer glog.Info("Shutting down ClusterVersionOperator")
@@ -243,11 +246,22 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
 	// start the config sync loop, and have it notify the queue when new status is detected
 	go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
 	go optr.configSync.Start(ctx, 16)
-
-	go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh)
 	go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
+	go wait.Until(func() {
+		defer close(workerStopCh)
+
+		// run the worker, then when the queue is closed sync one final time to flush any pending status
+		optr.worker(optr.queue, optr.sync)
+		if err := optr.sync(optr.queueKey()); err != nil {
+			utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
+		}
+	}, time.Second, stopCh)
 
 	<-stopCh
+
+	// stop the queue, then wait for the worker to exit
+	shutdownOnce.Do(func() { optr.queue.ShutDown() })
+	<-workerStopCh
 }
 
 func (optr *Operator) queueKey() string {
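As a rough illustration of the pattern this diff adds to Run (not the CVO code itself), the sketch below uses a plain channel as a stand-in for the operator's workqueue: a sync.Once guards shutdown, the worker drains the queue, runs one final sync to flush pending status, and then signals that it has exited. The processKey and finalSync names are illustrative placeholders.

// Minimal sketch of the shutdown pattern introduced above, assuming a plain
// channel in place of the operator's workqueue. processKey and finalSync are
// illustrative placeholders, not CVO functions.
package main

import (
	"fmt"
	"sync"
	"time"
)

func processKey(key string) { fmt.Println("sync", key) }
func finalSync()            { fmt.Println("final status sync before exit") }

func main() {
	stopCh := make(chan struct{})       // closed when the process is told to stop (e.g. SIGTERM)
	workerStopCh := make(chan struct{}) // closed when the worker has fully exited

	queue := make(chan string, 16)
	var shutdownOnce sync.Once
	shutdown := func() { shutdownOnce.Do(func() { close(queue) }) }
	defer shutdown() // mirrors `defer shutdownOnce.Do(func() { optr.queue.ShutDown() })`

	go func() {
		defer close(workerStopCh)
		// Drain the queue until it is shut down ...
		for key := range queue {
			processKey(key)
		}
		// ... then sync one final time so in-memory state (e.g. the payload's
		// "verified" flag) is flushed before the process exits.
		finalSync()
	}()

	queue <- "cluster-version"
	time.AfterFunc(50*time.Millisecond, func() { close(stopCh) }) // simulate receiving SIGTERM

	<-stopCh
	shutdown()     // stop the queue, then ...
	<-workerStopCh // ... wait for the worker (and its final sync) to finish
}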

pkg/start/start.go

Lines changed: 1 addition & 1 deletion
@@ -132,7 +132,7 @@ func (o *Options) Run() error {
 
 	// exit after 2s no matter what
 	select {
-	case <-time.After(2 * time.Second):
+	case <-time.After(5 * time.Second):
 		glog.Fatalf("Exiting")
 	case <-ch:
 		glog.Fatalf("Received shutdown signal twice, exiting")
