diff --git a/lib/resourcebuilder/apiext.go b/lib/resourcebuilder/apiext.go index e345ea276..8ab83d268 100644 --- a/lib/resourcebuilder/apiext.go +++ b/lib/resourcebuilder/apiext.go @@ -30,6 +30,10 @@ func newCRDBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *crdBuilder) WithMode(m Mode) Interface { + return b +} + func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/apireg.go b/lib/resourcebuilder/apireg.go index da428fd22..cda4d3cee 100644 --- a/lib/resourcebuilder/apireg.go +++ b/lib/resourcebuilder/apireg.go @@ -23,6 +23,10 @@ func newAPIServiceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *apiServiceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *apiServiceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/apps.go b/lib/resourcebuilder/apps.go index f086f6479..79afb9ffb 100644 --- a/lib/resourcebuilder/apps.go +++ b/lib/resourcebuilder/apps.go @@ -31,6 +31,10 @@ func newDeploymentBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *deploymentBuilder) WithMode(m Mode) Interface { + return b +} + func (b *deploymentBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -91,6 +95,10 @@ func newDaemonsetBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *daemonsetBuilder) WithMode(m Mode) Interface { + return b +} + func (b *daemonsetBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go index 15f922e1f..843215caa 100644 --- a/lib/resourcebuilder/batch.go +++ b/lib/resourcebuilder/batch.go @@ -30,6 +30,10 @@ func newJobBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *jobBuilder) WithMode(m Mode) Interface { + return b +} + func (b *jobBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/core.go b/lib/resourcebuilder/core.go index f121a66e5..f8f03f0db 100644 --- a/lib/resourcebuilder/core.go +++ b/lib/resourcebuilder/core.go @@ -23,6 +23,10 @@ func newServiceAccountBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *serviceAccountBuilder) WithMode(m Mode) Interface { + return b +} + func (b *serviceAccountBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -50,6 +54,10 @@ func newConfigMapBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *configMapBuilder) WithMode(m Mode) Interface { + return b +} + func (b *configMapBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -77,6 +85,10 @@ func newNamespaceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *namespaceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *namespaceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -104,6 +116,10 @@ func newServiceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *serviceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *serviceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index c3665f43d..a22c8a142 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -63,8 +63,18 @@ type MetaV1ObjectModifierFunc func(metav1.Object) // and the Manifest. type NewInteraceFunc func(rest *rest.Config, m lib.Manifest) Interface +// Mode is how this builder is being used. +type Mode int + +const ( + UpdatingMode Mode = iota + ReconcilingMode + InitializingMode +) + type Interface interface { WithModifier(MetaV1ObjectModifierFunc) Interface + WithMode(Mode) Interface Do(context.Context) error } diff --git a/lib/resourcebuilder/rbac.go b/lib/resourcebuilder/rbac.go index 098726be7..b3a0d57a4 100644 --- a/lib/resourcebuilder/rbac.go +++ b/lib/resourcebuilder/rbac.go @@ -23,6 +23,10 @@ func newClusterRoleBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *clusterRoleBuilder) WithMode(m Mode) Interface { + return b +} + func (b *clusterRoleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -50,6 +54,10 @@ func newClusterRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface } } +func (b *clusterRoleBindingBuilder) WithMode(m Mode) Interface { + return b +} + func (b *clusterRoleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -77,6 +85,10 @@ func newRoleBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *roleBuilder) WithMode(m Mode) Interface { + return b +} + func (b *roleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -104,6 +116,10 @@ func newRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *roleBindingBuilder) WithMode(m Mode) Interface { + return b +} + func (b *roleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/security.go b/lib/resourcebuilder/security.go index f89e2dde2..91e2a11b1 100644 --- a/lib/resourcebuilder/security.go +++ b/lib/resourcebuilder/security.go @@ -23,6 +23,10 @@ func newSecurityBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *securityBuilder) WithMode(m Mode) Interface { + return b +} + func (b *securityBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 1feedc81b..56e60e9a6 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -31,7 +31,6 @@ import ( "github.com/openshift/cluster-version-operator/lib" "github.com/openshift/cluster-version-operator/lib/resourceapply" "github.com/openshift/cluster-version-operator/lib/resourcebuilder" - "github.com/openshift/cluster-version-operator/lib/resourcemerge" "github.com/openshift/cluster-version-operator/lib/validation" "github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" @@ -157,7 +156,7 @@ func New( kubeClient: kubeClient, eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), } @@ -334,10 +333,19 @@ func (optr *Operator) sync(key string) error { }, errs) } + // identify an initial state to inform the sync loop of + var state payload.State + switch { + case hasNeverReachedLevel(config): + state = payload.InitializingPayload + case hasReachedLevel(config, desired): + state = payload.ReconcilingPayload + default: + state = payload.UpdatingPayload + } + // inform the config sync loop about our desired state - reconciling := resourcemerge.IsOperatorStatusConditionTrue(config.Status.Conditions, configv1.OperatorAvailable) && - resourcemerge.IsOperatorStatusConditionFalse(config.Status.Conditions, configv1.OperatorProgressing) - status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, reconciling) + status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state) // write cluster version status return optr.syncStatus(original, config, status, errs) @@ -485,7 +493,7 @@ func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface return internal.NewGenericBuilder(client, *m) } -func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { builder, err := b.BuilderFor(m) if err != nil { return err @@ -493,5 +501,38 @@ func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { if b.modifier != nil { builder = builder.WithModifier(b.modifier) } - return builder.Do(ctx) + return builder.WithMode(stateToMode(state)).Do(ctx) +} + +func stateToMode(state payload.State) resourcebuilder.Mode { + switch state { + case payload.InitializingPayload: + return resourcebuilder.InitializingMode + case payload.UpdatingPayload: + return resourcebuilder.UpdatingMode + case payload.ReconcilingPayload: + return resourcebuilder.ReconcilingMode + default: + panic(fmt.Sprintf("unexpected payload state %d", int(state))) + } +} + +func hasNeverReachedLevel(cv *configv1.ClusterVersion) bool { + for _, version := range cv.Status.History { + if version.State == configv1.CompletedUpdate { + return false + } + } + // TODO: check the payload, just in case + return true +} + +func hasReachedLevel(cv *configv1.ClusterVersion, desired configv1.Update) bool { + if len(cv.Status.History) == 0 { + return false + } + if cv.Status.History[0].State != configv1.CompletedUpdate { + return false + } + return desired.Image == cv.Status.History[0].Image } diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 8517da386..5d51359b8 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -206,23 +206,27 @@ func TestCVO_StartupAndSync(t *testing.T) { }) verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: "RetrievePayload", + Step: "RetrievePayload", + Initial: true, // the desired version is briefly incorrect (user provided) until we retrieve the image Actual: configv1.Update{Version: "4.0.1", Image: "image/image:1"}, }, SyncWorkerStatus{ Step: "ApplyResources", + Initial: true, VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ Fraction: float32(1) / 3, Step: "ApplyResources", + Initial: true, VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ Fraction: float32(2) / 3, + Initial: true, Step: "ApplyResources", VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, @@ -382,6 +386,9 @@ func TestCVO_RestartAndReconcile(t *testing.T) { if status := worker.Status(); !status.Reconciling || status.Completed != 0 { t.Fatalf("The worker should be reconciling from the beginning: %#v", status) } + if worker.work.State != payload.ReconcilingPayload { + t.Fatalf("The worker should be reconciling: %v", worker.work) + } // Step 2: Start the sync worker and verify the sequence of events, and then verify // the status does not change @@ -536,6 +543,9 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { if status := worker.Status(); !status.Reconciling || status.Completed != 0 { t.Fatalf("The worker should be reconciling from the beginning: %#v", status) } + if worker.work.State != payload.ReconcilingPayload { + t.Fatalf("The worker should be reconciling: %v", worker.work) + } // Step 2: Start the sync worker and verify the sequence of events // @@ -658,6 +668,123 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { }) } +func TestCVO_VerifyInitializingPayloadState(t *testing.T) { + o, cvs, client, _, shutdownFn := setupCVOTest() + stopCh := make(chan struct{}) + defer close(stopCh) + defer shutdownFn() + worker := o.configSync.(*SyncWorker) + b := newBlockingResourceBuilder() + worker.builder = b + + // Setup: a successful sync from a previous run, and the operator at the same image as before + // + o.releaseImage = "image/image:1" + o.releaseVersion = "1.0.0-abc" + desired := configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"} + uid, _ := uuid.NewRandom() + clusterUID := configv1.ClusterID(uid.String()) + cvs["version"] = &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: desired, + VersionHash: "6GC9TkkG9PA=", + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, + {Type: configv1.OperatorFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + }, + }, + } + + // Step 1: The sync loop starts and triggers a sync, but does not update status + // + client.ClearActions() + err := o.sync(o.queueKey()) + if err != nil { + t.Fatal(err) + } + + // check the worker status is initially set to reconciling + if status := worker.Status(); status.Reconciling || status.Completed != 0 { + t.Fatalf("The worker should be initializing from the beginning: %#v", status) + } + if worker.work.State != payload.InitializingPayload { + t.Fatalf("The worker should be initializing: %v", worker.work) + } +} + +func TestCVO_VerifyUpdatingPayloadState(t *testing.T) { + o, cvs, client, _, shutdownFn := setupCVOTest() + stopCh := make(chan struct{}) + defer close(stopCh) + defer shutdownFn() + worker := o.configSync.(*SyncWorker) + b := newBlockingResourceBuilder() + worker.builder = b + + // Setup: a successful sync from a previous run, and the operator at the same image as before + // + o.releaseImage = "image/image:1" + o.releaseVersion = "1.0.0-abc" + desired := configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"} + uid, _ := uuid.NewRandom() + clusterUID := configv1.ClusterID(uid.String()) + cvs["version"] = &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: desired, + VersionHash: "6GC9TkkG9PA=", + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, + {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc.0", StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, + {Type: configv1.OperatorFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + }, + }, + } + + // Step 1: The sync loop starts and triggers a sync, but does not update status + // + client.ClearActions() + err := o.sync(o.queueKey()) + if err != nil { + t.Fatal(err) + } + + // check the worker status is initially set to reconciling + if status := worker.Status(); status.Reconciling || status.Completed != 0 { + t.Fatalf("The worker should be updating from the beginning: %#v", status) + } + if worker.work.State != payload.UpdatingPayload { + t.Fatalf("The worker should be updating: %v", worker.work) + } +} + func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWorkerStatus) { t.Helper() if len(items) == 0 { @@ -706,6 +833,6 @@ func (b *blockingResourceBuilder) Send(err error) { b.ch <- err } -func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { return <-b.ch } diff --git a/pkg/cvo/internal/generic.go b/pkg/cvo/internal/generic.go index efb41c1f8..df2c02d03 100644 --- a/pkg/cvo/internal/generic.go +++ b/pkg/cvo/internal/generic.go @@ -73,6 +73,10 @@ func NewGenericBuilder(client dynamic.ResourceInterface, m lib.Manifest) (resour }, nil } +func (b *genericBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + return b +} + func (b *genericBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { b.modifier = f return b diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index 0549af1ca..69cbd2dce 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -53,6 +53,7 @@ type clusterOperatorBuilder struct { client ClusterOperatorsGetter raw []byte modifier resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { @@ -83,6 +84,11 @@ func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) re } } +func (b *clusterOperatorBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + b.mode = m + return b +} + func (b *clusterOperatorBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { b.modifier = f return b @@ -95,10 +101,10 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { } ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() - return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os) + return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode) } -func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator) error { +func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator, mode resourcebuilder.Mode) error { var lastErr error err := wait.PollImmediateUntil(interval, func() (bool, error) { actual, err := client.Get(expected.Name) diff --git a/pkg/cvo/internal/operatorstatus_test.go b/pkg/cvo/internal/operatorstatus_test.go index a3ed5efd2..086fce6c9 100644 --- a/pkg/cvo/internal/operatorstatus_test.go +++ b/pkg/cvo/internal/operatorstatus_test.go @@ -16,6 +16,7 @@ import ( configv1 "github.com/openshift/api/config/v1" "github.com/openshift/client-go/config/clientset/versioned/fake" + "github.com/openshift/cluster-version-operator/lib/resourcebuilder" "github.com/openshift/cluster-version-operator/pkg/payload" ) @@ -23,7 +24,7 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { tests := []struct { name string actual *configv1.ClusterOperator - + mode resourcebuilder.Mode exp *configv1.ClusterOperator expErr error }{{ @@ -478,7 +479,7 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { ctxWithTimeout, cancel := context.WithTimeout(context.TODO(), 1*time.Millisecond) defer cancel() - err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp) + err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp, test.mode) if (test.expErr == nil) != (err == nil) { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/cvo/status.go b/pkg/cvo/status.go index f7c8d952d..2199c7106 100644 --- a/pkg/cvo/status.go +++ b/pkg/cvo/status.go @@ -71,9 +71,7 @@ func mergeOperatorHistory(config *configv1.ClusterVersion, desired configv1.Upda last = &config.Status.History[0] } - if len(config.Status.History) > 10 { - config.Status.History = config.Status.History[:10] - } + pruneStatusHistory(config, 10) if completed { last.State = configv1.CompletedUpdate @@ -88,6 +86,23 @@ func mergeOperatorHistory(config *configv1.ClusterVersion, desired configv1.Upda config.Status.Desired = desired } +func pruneStatusHistory(config *configv1.ClusterVersion, maxHistory int) { + if len(config.Status.History) <= maxHistory { + return + } + for i, item := range config.Status.History { + if item.State != configv1.CompletedUpdate { + continue + } + // guarantee the last position in the history is always a completed item + if i >= maxHistory { + config.Status.History[maxHistory-1] = item + } + break + } + config.Status.History = config.Status.History[:maxHistory] +} + // ClusterVersionInvalid indicates that the cluster version has an error that prevents the server from // taking action. The cluster version operator will only reconcile the current state as long as this // condition is set. diff --git a/pkg/cvo/status_test.go b/pkg/cvo/status_test.go index 4c8435653..5e5fcf835 100644 --- a/pkg/cvo/status_test.go +++ b/pkg/cvo/status_test.go @@ -1,9 +1,11 @@ package cvo import ( + "reflect" "testing" configv1 "github.com/openshift/api/config/v1" + "k8s.io/apimachinery/pkg/util/diff" ) func Test_mergeEqualVersions(t *testing.T) { @@ -39,3 +41,82 @@ func Test_mergeEqualVersions(t *testing.T) { }) } } + +func Test_pruneStatusHistory(t *testing.T) { + obj := &configv1.ClusterVersion{ + Status: configv1.ClusterVersionStatus{ + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + {State: configv1.PartialUpdate, Version: "0.0.6"}, + }, + }, + } + tests := []struct { + name string + config *configv1.ClusterVersion + maxHistory int + want []configv1.UpdateHistory + }{ + { + config: obj.DeepCopy(), + maxHistory: 2, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 3, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 4, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 5, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + {State: configv1.PartialUpdate, Version: "0.0.6"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 6, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + {State: configv1.PartialUpdate, Version: "0.0.6"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := tt.config.DeepCopy() + pruneStatusHistory(config, tt.maxHistory) + if !reflect.DeepEqual(tt.want, config.Status.History) { + t.Fatalf("%s", diff.ObjectReflectDiff(tt.want, config.Status.History)) + } + }) + } +} diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 71db691ec..7fdb052d0 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -300,10 +300,16 @@ type testBuilder struct { *recorder reactors map[action]error modifiers []resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode m *lib.Manifest } +func (t *testBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + t.mode = m + return t +} + func (t *testBuilder) WithModifier(m resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { t.modifiers = append(t.modifiers, m) return t @@ -353,7 +359,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { func (r *fakeSyncRecorder) Start(maxWorkers int, stopCh <-chan struct{}) {} -func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus { +func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { r.Updates = append(r.Updates, desired) return r.Returns } @@ -392,7 +398,7 @@ type testResourceBuilder struct { modifiers []resourcebuilder.MetaV1ObjectModifierFunc } -func (b *testResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *testResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { ns := m.Object().GetNamespace() fakeGVR := schema.GroupVersionResource{Group: m.GVK.Group, Version: m.GVK.Version, Resource: strings.ToLower(m.GVK.Kind)} client := b.client.Resource(fakeGVR).Namespace(ns) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 5d40bbceb..2d0874464 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -25,7 +25,7 @@ import ( // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing. type ConfigSyncWorker interface { Start(maxWorkers int, stopCh <-chan struct{}) - Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus + Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus } @@ -41,11 +41,11 @@ type StatusReporter interface { // SyncWork represents the work that should be done in a sync iteration. type SyncWork struct { - Generation int64 - Desired configv1.Update - Overrides []configv1.ComponentOverride - Reconciling bool - Completed int + Generation int64 + Desired configv1.Update + Overrides []configv1.ComponentOverride + State payload.State + Completed int } // Empty returns true if the image is empty for this work. @@ -64,6 +64,7 @@ type SyncWorkerStatus struct { Completed int Reconciling bool + Initial bool VersionHash string Actual configv1.Update @@ -147,7 +148,7 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { // the initial state or whatever the last recorded status was. // TODO: in the future it may be desirable for changes that alter desired to wait briefly before returning, // giving the sync loop the opportunity to observe our change and begin working towards it. -func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus { +func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { w.lock.Lock() defer w.lock.Unlock() @@ -164,12 +165,10 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides // initialize the reconciliation flag and the status the first time // update is invoked if w.work == nil { - if reconciling { - work.Reconciling = true - } + work.State = state w.status = SyncWorkerStatus{ Generation: generation, - Reconciling: work.Reconciling, + Reconciling: state.Reconciling(), Actual: work.Desired, } } @@ -202,7 +201,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) { var next <-chan time.Time for { - waitingToReconcile := work.Reconciling + waitingToReconcile := work.State == payload.ReconcilingPayload select { case <-stopCh: glog.V(5).Infof("Stopped worker") @@ -261,7 +260,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) { glog.V(5).Infof("Sync succeeded, reconciling") work.Completed++ - work.Reconciling = true + work.State = payload.ReconcilingPayload next = time.After(w.minimumReconcileInterval) } }, 10*time.Millisecond, stopCh) @@ -302,14 +301,14 @@ func (w *SyncWorker) calculateNext(work *SyncWork) bool { // if this is the first time through the loop, initialize reconciling to // the state Update() calculated (to allow us to start in reconciling) if work.Empty() { - work.Reconciling = w.work.Reconciling + work.State = w.work.State } else { if changed { - work.Reconciling = false + work.State = payload.UpdatingPayload } } // always clear the completed variable if we are not reconciling - if !work.Reconciling { + if work.State != payload.ReconcilingPayload { work.Completed = 0 } @@ -378,22 +377,22 @@ func (w *SyncWorker) Status() *SyncWorkerStatus { // the update could not be completely applied. The status is updated as we progress. // Cancelling the context will abort the execution of the sync. func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { - glog.V(4).Infof("Running sync %s on generation %d", versionString(work.Desired), work.Generation) + glog.V(4).Infof("Running sync %s on generation %d in state %s", versionString(work.Desired), work.Generation, work.State) update := work.Desired // cache the payload until the release image changes validPayload := w.payload if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, update) { glog.V(4).Infof("Loading payload") - reporter.Report(SyncWorkerStatus{Step: "RetrievePayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Step: "RetrievePayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) payloadDir, err := w.retriever.RetrievePayload(ctx, update) if err != nil { - reporter.Report(SyncWorkerStatus{Failure: err, Step: "RetrievePayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Failure: err, Step: "RetrievePayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) return err } payloadUpdate, err := payload.LoadUpdate(payloadDir, update.Image) if err != nil { - reporter.Report(SyncWorkerStatus{Failure: err, Step: "VerifyPayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Failure: err, Step: "VerifyPayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) return err } w.payload = payloadUpdate @@ -418,7 +417,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w cr := &consistentReporter{ status: SyncWorkerStatus{ Generation: work.Generation, - Reconciling: work.Reconciling, + Initial: work.State.Initializing(), + Reconciling: work.State.Reconciling(), VersionHash: payloadUpdate.ManifestHash, Actual: update, }, @@ -458,7 +458,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w continue } - if err := task.Run(ctx, version, w.builder); err != nil { + if err := task.Run(ctx, version, w.builder, work.State); err != nil { return err } cr.Inc() @@ -471,7 +471,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w return err } - // update the + // update the status cr.Complete() return nil } @@ -541,6 +541,7 @@ func (r *consistentReporter) Complete() { metricPayload.WithLabelValues(r.version, "applied").Set(float64(r.done)) copied := r.status copied.Completed = r.completed + 1 + copied.Initial = false copied.Reconciling = true copied.Fraction = 1 r.reporter.Report(copied) diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index 1d9345f72..333ca719c 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -21,6 +21,61 @@ import ( "github.com/openshift/cluster-version-operator/lib/resourceread" ) +// State describes the state of the payload and alters +// how a payload is applied. +type State int + +const ( + // UpdatingPayload indicates we are moving from one state to + // another. + // + // When we are moving to a different payload version, we want to + // be as conservative as possible about ordering of the payload + // and the errors we might encounter. An error in one operator + // should prevent dependent operators from changing. We are + // willing to take longer to roll out an update if it reduces + // the possibility of error. + UpdatingPayload State = iota + // ReconcilingPayload indicates we are attempting to maintain + // our current state. + // + // When the payload has already been applied to the cluster, we + // prioritize ensuring resources are recreated and don't need to + // progress in strict order. We also attempt to reset as many + // resources as possible back to their desired state and report + // errors after the fact. + ReconcilingPayload + // InitializingPayload indicates we are establishing our first + // state. + // + // When we are deploying a payload for the first time we want + // to make progress quickly but in a predictable order to + // minimize retries and crash-loops. We wait for operators + // to report level but tolerate degraded and transient errors. + // Our goal is to get the entire payload created, even if some + // operators are still converging. + InitializingPayload +) + +// Initializing is true if the state is InitializingPayload. +func (s State) Initializing() bool { return s == InitializingPayload } + +// Reconciling is true if the state is ReconcilingPayload. +func (s State) Reconciling() bool { return s == ReconcilingPayload } + +func (s State) String() string { + switch s { + case ReconcilingPayload: + return "Reconciling" + case UpdatingPayload: + return "Updating" + case InitializingPayload: + return "Initializing" + default: + panic(fmt.Sprintf("unrecognized state %d", int(s))) + } +} + const ( DefaultPayloadDir = "/" diff --git a/pkg/payload/task.go b/pkg/payload/task.go index 3089c0258..c28a66abe 100644 --- a/pkg/payload/task.go +++ b/pkg/payload/task.go @@ -31,7 +31,7 @@ func init() { // ResourceBuilder abstracts how a manifest is created on the server. Introduced for testing. type ResourceBuilder interface { - Apply(context.Context, *lib.Manifest) error + Apply(context.Context, *lib.Manifest, State) error } type Task struct { @@ -50,11 +50,11 @@ func (st *Task) String() string { return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, st.Manifest.Object().GetName(), st.Index, st.Total) } -func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder) error { +func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error { var lastErr error if err := wait.ExponentialBackoff(st.Backoff, func() (bool, error) { // run builder for the manifest - if err := builder.Apply(ctx, st.Manifest); err != nil { + if err := builder.Apply(ctx, st.Manifest, state); err != nil { utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st)) lastErr = err metricPayloadErrors.WithLabelValues(version).Inc()