Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,11 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
// Run runs the cluster version operator until stopCh is completed. Workers is ignored for now.
func (optr *Operator) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer optr.queue.ShutDown()
// TODO: when Kube 77170 is fixed we can remove the use of the once here
var shutdownOnce sync.Once
defer shutdownOnce.Do(func() { optr.queue.ShutDown() })
stopCh := ctx.Done()
workerStopCh := make(chan struct{})

glog.Infof("Starting ClusterVersionOperator with minimum reconcile period %s", optr.minimumUpdateCheckInterval)
defer glog.Info("Shutting down ClusterVersionOperator")
Expand All @@ -243,11 +246,22 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(ctx, 16)

go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
go wait.Until(func() {
defer close(workerStopCh)

// run the worker, then when the queue is closed sync one final time to flush any pending status
optr.worker(optr.queue, optr.sync)
if err := optr.sync(optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
}
}, time.Second, stopCh)

<-stopCh

// stop the queue, then wait for the worker to exit
shutdownOnce.Do(func() { optr.queue.ShutDown() })
<-workerStopCh
}

func (optr *Operator) queueKey() string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (o *Options) Run() error {

// exit after 2s no matter what
select {
case <-time.After(2 * time.Second):
case <-time.After(5 * time.Second):
glog.Fatalf("Exiting")
case <-ch:
glog.Fatalf("Received shutdown signal twice, exiting")
Expand Down
16 changes: 10 additions & 6 deletions pkg/start/start_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,11 @@ func waitForUpdateAvailable(t *testing.T, client clientset.Interface, ns string,
}

if len(versions) == 1 {
// we should not observe status.generation == metadata.generation without also observing a status history entry
// we should not observe status.generation == metadata.generation without also observing a status history entry - if
// a version is set, it must match our desired version (we can occasionally observe a "working towards" event where only
// the image value is set, not the version)
if cv.Status.ObservedGeneration == cv.Generation {
if len(cv.Status.History) == 0 || cv.Status.History[0].Version != versions[0] {
if len(cv.Status.History) == 0 || (cv.Status.History[0].Version != "" && cv.Status.History[0].Version != versions[0]) {
return false, fmt.Errorf("initializing operator should set history and generation at the same time")
}
}
Expand Down Expand Up @@ -770,10 +772,11 @@ func waitForUpdateAvailable(t *testing.T, client clientset.Interface, ns string,
// we should not observe status.generation == metadata.generation without also observing a status history entry
if cv.Status.ObservedGeneration == cv.Generation {
target := versions[len(versions)-1]
if cv.Status.Desired.Version != target {
hasVersion := cv.Status.Desired.Version != ""
if hasVersion && cv.Status.Desired.Version != target {
return false, fmt.Errorf("upgrading operator should always have desired version when spec version is set")
}
if len(cv.Status.History) == 0 || cv.Status.History[0].Version != target {
if len(cv.Status.History) == 0 || (hasVersion && cv.Status.History[0].Version != target) {
return false, fmt.Errorf("upgrading operator should set history and generation at the same time")
}
}
Expand Down Expand Up @@ -873,10 +876,11 @@ func waitUntilUpgradeFails(t *testing.T, client clientset.Interface, ns string,
// we should not observe status.generation == metadata.generation without also observing a status history entry
if cv.Status.ObservedGeneration == cv.Generation {
target := versions[len(versions)-1]
if cv.Status.Desired.Version != target {
hasVersion := cv.Status.Desired.Version != ""
if hasVersion && cv.Status.Desired.Version != target {
return false, fmt.Errorf("upgrading operator should always have desired version when spec version is set")
}
if len(cv.Status.History) == 0 || cv.Status.History[0].Version != target {
if len(cv.Status.History) == 0 || (hasVersion && cv.Status.History[0].Version != target) {
return false, fmt.Errorf("upgrading operator should set history and generation at the same time")
}
}
Expand Down