diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 124bf7459..2dc27b4b8 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -226,8 +226,11 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo // Run runs the cluster version operator until stopCh is completed. Workers is ignored for now. func (optr *Operator) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() - defer optr.queue.ShutDown() + // TODO: when Kube 77170 is fixed we can remove the use of the once here + var shutdownOnce sync.Once + defer shutdownOnce.Do(func() { optr.queue.ShutDown() }) stopCh := ctx.Done() + workerStopCh := make(chan struct{}) glog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval) defer glog.Info("Shutting down ClusterVersionOperator") @@ -243,11 +246,22 @@ func (optr *Operator) Run(ctx context.Context, workers int) { // start the config sync loop, and have it notify the queue when new status is detected go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) }) go optr.configSync.Start(ctx, 16) - - go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh) go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh) + go wait.Until(func() { + defer close(workerStopCh) + + // run the worker, then when the queue is closed sync one final time to flush any pending status + optr.worker(optr.queue, optr.sync) + if err := optr.sync(optr.queueKey()); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err)) + } + }, time.Second, stopCh) <-stopCh + + // stop the queue, then wait for the worker to exit + shutdownOnce.Do(func() { optr.queue.ShutDown() }) + <-workerStopCh } func (optr *Operator) queueKey() string { diff --git a/pkg/start/start.go b/pkg/start/start.go index 0cc40b506..64253cd69 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -132,7 +132,7 @@ func (o *Options) Run() error { // exit after 2s no matter what select { - case <-time.After(2 * time.Second): + case <-time.After(5 * time.Second): glog.Fatalf("Exiting") case <-ch: glog.Fatalf("Received shutdown signal twice, exiting") diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index 800eac708..64b86589a 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -739,9 +739,11 @@ func waitForUpdateAvailable(t *testing.T, client clientset.Interface, ns string, } if len(versions) == 1 { - // we should not observe status.generation == metadata.generation without also observing a status history entry + // we should not observe status.generation == metadata.generation without also observing a status history entry - if + // a version is set, it must match our desired version (we can occasionally observe a "working towards" event where only + // the image value is set, not the version) if cv.Status.ObservedGeneration == cv.Generation { - if len(cv.Status.History) == 0 || cv.Status.History[0].Version != versions[0] { + if len(cv.Status.History) == 0 || (cv.Status.History[0].Version != "" && cv.Status.History[0].Version != versions[0]) { return false, fmt.Errorf("initializing operator should set history and generation at the same time") } } @@ -770,10 +772,11 @@ func waitForUpdateAvailable(t *testing.T, client clientset.Interface, ns string, // we should not observe status.generation == metadata.generation without also observing a status history entry if cv.Status.ObservedGeneration == cv.Generation { target := versions[len(versions)-1] - if cv.Status.Desired.Version != target { + hasVersion := cv.Status.Desired.Version != "" + if hasVersion && cv.Status.Desired.Version != target { return false, fmt.Errorf("upgrading operator should always have desired version when spec version is set") } - if len(cv.Status.History) == 0 || cv.Status.History[0].Version != target { + if len(cv.Status.History) == 0 || (hasVersion && cv.Status.History[0].Version != target) { return false, fmt.Errorf("upgrading operator should set history and generation at the same time") } } @@ -873,10 +876,11 @@ func waitUntilUpgradeFails(t *testing.T, client clientset.Interface, ns string, // we should not observe status.generation == metadata.generation without also observing a status history entry if cv.Status.ObservedGeneration == cv.Generation { target := versions[len(versions)-1] - if cv.Status.Desired.Version != target { + hasVersion := cv.Status.Desired.Version != "" + if hasVersion && cv.Status.Desired.Version != target { return false, fmt.Errorf("upgrading operator should always have desired version when spec version is set") } - if len(cv.Status.History) == 0 || cv.Status.History[0].Version != target { + if len(cv.Status.History) == 0 || (hasVersion && cv.Status.History[0].Version != target) { return false, fmt.Errorf("upgrading operator should set history and generation at the same time") } }