Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(ctx, 16)
go optr.configSync.Start(ctx, 16, optr.name, optr.cvLister)
go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(optr.upgradeableQueue, optr.upgradeableSync) }, time.Second, stopCh)
go wait.Until(func() {
Expand Down
24 changes: 11 additions & 13 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestCVO_StartupAndSync(t *testing.T) {

defer shutdownFn()
worker := o.configSync.(*SyncWorker)
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestCVO_StartupAndSyncUnverifiedPayload(t *testing.T) {
VerificationError: payloadErr,
}

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestCVO_StartupAndSyncPreconditionFailing(t *testing.T) {
Directory: "testdata/payloadtest",
Local: true,
}
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
Expand Down Expand Up @@ -938,7 +938,7 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: The operator should report that it is blocked on unverified content
//
Expand Down Expand Up @@ -1163,7 +1163,7 @@ func TestCVO_UpgradeUnverifiedPayloadRetriveOnce(t *testing.T) {
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: The operator should report that it is blocked on unverified content
//
Expand Down Expand Up @@ -1415,7 +1415,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
worker := o.configSync.(*SyncWorker)
worker.preconditions = []precondition.Precondition{&testPrecondition{SuccessAfter: 3}}

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: The operator should report that it is blocked on precondition checks failing
//
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
retriever := worker.retriever.(*fakeDirectoryRetriever)
retriever.Set(PayloadInfo{}, payloadErr)

go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)

// Step 1: The operator should report that it is blocked on unverified content
//
Expand Down Expand Up @@ -1886,7 +1886,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
// Step 2: Start the sync worker and verify the sequence of events, and then verify
// the status does not change
//
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down Expand Up @@ -2045,10 +2045,8 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {
if worker.work.State != payload.ReconcilingPayload {
t.Fatalf("The worker should be reconciling: %v", worker.work)
}

// Step 2: Start the sync worker and verify the sequence of events
//
go worker.Start(ctx, 1)
go worker.Start(ctx, 1, o.name, o.cvLister)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down Expand Up @@ -2252,7 +2250,7 @@ func TestCVO_ParallelError(t *testing.T) {
//
cancellable, cancel := context.WithCancel(ctx)
defer cancel()
go worker.Start(cancellable, 1)
go worker.Start(cancellable, 1, o.name, o.cvLister)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down Expand Up @@ -2470,7 +2468,7 @@ func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWork
t.Helper()
if len(items) == 0 {
if len(ch) > 0 {
t.Fatalf("expected status to empty, got %#v", <-ch)
t.Fatalf("expected status to be empty, got %#v", <-ch)
}
return
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
clientgotesting "k8s.io/client-go/testing"

configv1 "github.com/openshift/api/config/v1"
configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"

"github.com/openshift/cluster-version-operator/lib"
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
Expand Down Expand Up @@ -387,7 +388,8 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
return ch
}

func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int) {}
func (r *fakeSyncRecorder) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
}

func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus {
r.Updates = append(r.Updates, desired)
Expand Down Expand Up @@ -453,7 +455,7 @@ func (pf *testPrecondition) Name() string {
return fmt.Sprintf("TestPrecondition SuccessAfter: %d", pf.SuccessAfter)
}

func (pf *testPrecondition) Run(_ context.Context, _ precondition.ReleaseContext) error {
func (pf *testPrecondition) Run(_ context.Context, _ precondition.ReleaseContext, cv *configv1.ClusterVersion) error {
if pf.SuccessAfter == 0 {
return nil
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

configlistersv1 "github.com/openshift/client-go/config/listers/config/v1"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"k8s.io/klog"
Expand All @@ -29,7 +30,7 @@ import (

// ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing.
type ConfigSyncWorker interface {
Start(ctx context.Context, maxWorkers int)
Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister)
Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus
StatusCh() <-chan SyncWorkerStatus
}
Expand Down Expand Up @@ -241,7 +242,7 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides
// Start periodically invokes run, detecting whether content has changed.
// It is edge-triggered when Update() is invoked and level-driven after the
// syncOnce() has succeeded for a given input (we are said to be "reconciling").
func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
func (w *SyncWorker) Start(ctx context.Context, maxWorkers int, cvoOptrName string, lister configlistersv1.ClusterVersionLister) {
klog.V(5).Infof("Starting sync worker")

work := &SyncWork{}
Expand Down Expand Up @@ -306,11 +307,15 @@ func (w *SyncWorker) Start(ctx context.Context, maxWorkers int) {
w.lock.Unlock()
defer cancelFn()

config, err := lister.Get(cvoOptrName)
if err != nil {
return err
}
// reporter hides status updates that occur earlier than the previous failure,
// so that we don't fail, then immediately start reporting an earlier status
reporter := &statusWrapper{w: w, previousStatus: w.Status()}
klog.V(5).Infof("Previous sync status: %#v", reporter.previousStatus)
return w.syncOnce(ctx, work, maxWorkers, reporter)
return w.syncOnce(ctx, work, maxWorkers, reporter, config)
}()
if err != nil {
// backoff wait
Expand Down Expand Up @@ -466,7 +471,7 @@ func (w *SyncWorker) Status() *SyncWorkerStatus {
// sync retrieves the image and applies it to the server, returning an error if
// the update could not be completely applied. The status is updated as we progress.
// Cancelling the context will abort the execution of the sync.
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error {
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter, clusterVersion *configv1.ClusterVersion) error {
klog.V(4).Infof("Running sync %s (force=%t) on generation %d in state %s at attempt %d", versionString(work.Desired), work.Desired.Force, work.Generation, work.State, work.Attempt)
update := work.Desired

Expand Down Expand Up @@ -524,7 +529,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
Actual: update,
Verified: info.Verified,
})
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion})); err != nil {
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion}, clusterVersion)); err != nil {
if update.Force {
klog.V(4).Infof("Forcing past precondition failures: %s", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/payload/precondition/clusterversion/upgradable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestUpgradeableRun(t *testing.T) {
cvLister := fakeClusterVersionLister(clusterVersion)
instance := NewUpgradeable(cvLister)

err := instance.Run(context.TODO(), precondition.ReleaseContext{DesiredVersion: tc.desiredVersion})
err := instance.Run(context.TODO(), precondition.ReleaseContext{DesiredVersion: tc.desiredVersion}, clusterVersion)
switch {
case err != nil && len(tc.expected) == 0:
t.Error(err)
Expand Down
33 changes: 30 additions & 3 deletions pkg/payload/precondition/clusterversion/upgradeable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,26 @@ func NewUpgradeable(lister configv1listers.ClusterVersionLister) *Upgradeable {
}
}

// ClusterVersionOverridesCondition returns an UpgradeableClusterVersionOverrides condition when overrides are set, and nil when no overrides are set.
func ClusterVersionOverridesCondition(cv *configv1.ClusterVersion) *configv1.ClusterOperatorStatusCondition {
for _, override := range cv.Spec.Overrides {
if override.Unmanaged {
condition := configv1.ClusterOperatorStatusCondition{
Type: configv1.ClusterStatusConditionType("UpgradeableClusterVersionOverrides"),
Status: configv1.ConditionFalse,
Reason: "ClusterVersionOverridesSet",
Message: "Disabling ownership via cluster version overrides prevents upgrades. Please remove overrides before continuing.",
}
return &condition
}
}
return nil
}

// Run runs the Upgradeable precondition.
// If the feature gate `key` is not found, or the api for clusterversion doesn't exist, this check is inert and always returns nil error.
// Otherwise, if Upgradeable condition is set to false in the object, it returns an PreconditionError when possible.
func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.ReleaseContext) error {
func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.ReleaseContext, clusterVersion *configv1.ClusterVersion) error {
cv, err := pf.lister.Get(pf.key)
if apierrors.IsNotFound(err) || meta.IsNoMatchError(err) {
return nil
Expand Down Expand Up @@ -68,9 +84,20 @@ func (pf *Upgradeable) Run(ctx context.Context, releaseContext precondition.Rele
klog.V(5).Infof("currentMinor %s releaseContext.DesiredVersion %s desiredMinor %s", currentMinor, releaseContext.DesiredVersion, desiredMinor)

// if there is no difference in the minor version (4.y.z where 4.y is the same for current and desired), then we can still upgrade
// if no cluster overrides have been set
if currentMinor == desiredMinor {
klog.V(4).Infof("Precondition %q passed: minor from the current %s matches minor from the target %s (both %s).", pf.Name(), cv.Status.History[0].Version, releaseContext.DesiredVersion, currentMinor)
return nil
klog.V(4).Infof("Precondition %q passed: minor from the current %s matches minor from the target %s (both %s).", pf.Name(), currentVersion, releaseContext.DesiredVersion, currentMinor)
if condition := ClusterVersionOverridesCondition(clusterVersion); condition != nil {
klog.V(4).Infof("Update from %s to %s blocked by %s: %s", currentVersion, releaseContext.DesiredVersion, condition.Reason, condition.Message)

return &precondition.Error{
Reason: condition.Reason,
Message: condition.Message,
Name: pf.Name(),
}
} else {
return nil
}
}

return &precondition.Error{
Expand Down
7 changes: 4 additions & 3 deletions pkg/payload/precondition/precondition.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

configv1 "github.com/openshift/api/config/v1"
"k8s.io/klog"

"github.com/openshift/cluster-version-operator/pkg/payload"
Expand Down Expand Up @@ -41,7 +42,7 @@ type ReleaseContext struct {
// Precondition defines the precondition check for a payload.
type Precondition interface {
// Run executes the precondition checks ands returns an error when the precondition fails.
Run(ctx context.Context, releaseContext ReleaseContext) error
Run(ctx context.Context, releaseContext ReleaseContext, cv *configv1.ClusterVersion) error

// Name returns a human friendly name for the precondition.
Name() string
Expand All @@ -52,10 +53,10 @@ type List []Precondition

// RunAll runs all the reflight checks in order, returning a list of errors if any.
// All checks are run, regardless if any one precondition fails.
func (pfList List) RunAll(ctx context.Context, releaseContext ReleaseContext) []error {
func (pfList List) RunAll(ctx context.Context, releaseContext ReleaseContext, cv *configv1.ClusterVersion) []error {
var errs []error
for _, pf := range pfList {
if err := pf.Run(ctx, releaseContext); err != nil {
if err := pf.Run(ctx, releaseContext, cv); err != nil {
klog.Errorf("Precondition %q failed: %v", pf.Name(), err)
errs = append(errs, err)
}
Expand Down