From 2a78a6cedd73f5f0bbfbeb8778e23d66490d7ea7 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 7 Mar 2019 20:52:41 -0500 Subject: [PATCH 1/3] sync: Distinguish between reconcile, initial, and sync states Identify what sort of change we are making to the cluster in order to better handle the different states. Does not change status reporting or the internal sync which will be in a follow up commit. The three states are: * Initializing - we haven't yet completed a payload * Updating - we're moving between two payloads of different versions * Reconciling - we're level on a completed payload The states will allow us to more efficiently reconcile, start faster, and be safer during upgrades. --- pkg/cvo/cvo.go | 38 ++++++++-- pkg/cvo/cvo_scenarios_test.go | 129 +++++++++++++++++++++++++++++++++- pkg/cvo/sync_test.go | 2 +- pkg/cvo/sync_worker.go | 45 ++++++------ pkg/payload/payload.go | 55 +++++++++++++++ 5 files changed, 240 insertions(+), 29 deletions(-) diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 1feedc81b..a3c618c17 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -31,7 +31,6 @@ import ( "github.com/openshift/cluster-version-operator/lib" "github.com/openshift/cluster-version-operator/lib/resourceapply" "github.com/openshift/cluster-version-operator/lib/resourcebuilder" - "github.com/openshift/cluster-version-operator/lib/resourcemerge" "github.com/openshift/cluster-version-operator/lib/validation" "github.com/openshift/cluster-version-operator/pkg/cvo/internal" "github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient" @@ -157,7 +156,7 @@ func New( kubeClient: kubeClient, eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"), availableUpdatesQueue: 
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"), } @@ -334,10 +333,19 @@ func (optr *Operator) sync(key string) error { }, errs) } + // identify an initial state to inform the sync loop of + var state payload.State + switch { + case hasNeverReachedLevel(config): + state = payload.InitializingPayload + case hasReachedLevel(config, desired): + state = payload.ReconcilingPayload + default: + state = payload.UpdatingPayload + } + // inform the config sync loop about our desired state - reconciling := resourcemerge.IsOperatorStatusConditionTrue(config.Status.Conditions, configv1.OperatorAvailable) && - resourcemerge.IsOperatorStatusConditionFalse(config.Status.Conditions, configv1.OperatorProgressing) - status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, reconciling) + status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state) // write cluster version status return optr.syncStatus(original, config, status, errs) @@ -495,3 +503,23 @@ func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { } return builder.Do(ctx) } + +func hasNeverReachedLevel(cv *configv1.ClusterVersion) bool { + for _, version := range cv.Status.History { + if version.State == configv1.CompletedUpdate { + return false + } + } + // TODO: check the payload, just in case + return true +} + +func hasReachedLevel(cv *configv1.ClusterVersion, desired configv1.Update) bool { + if len(cv.Status.History) == 0 { + return false + } + if cv.Status.History[0].State != configv1.CompletedUpdate { + return false + } + return desired.Image == cv.Status.History[0].Image +} diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 8517da386..0d14e8230 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -206,23 +206,27 @@ func TestCVO_StartupAndSync(t *testing.T) { }) verifyAllStatus(t, worker.StatusCh(), SyncWorkerStatus{ - Step: 
"RetrievePayload", + Step: "RetrievePayload", + Initial: true, // the desired version is briefly incorrect (user provided) until we retrieve the image Actual: configv1.Update{Version: "4.0.1", Image: "image/image:1"}, }, SyncWorkerStatus{ Step: "ApplyResources", + Initial: true, VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ Fraction: float32(1) / 3, Step: "ApplyResources", + Initial: true, VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, }, SyncWorkerStatus{ Fraction: float32(2) / 3, + Initial: true, Step: "ApplyResources", VersionHash: "6GC9TkkG9PA=", Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"}, @@ -382,6 +386,9 @@ func TestCVO_RestartAndReconcile(t *testing.T) { if status := worker.Status(); !status.Reconciling || status.Completed != 0 { t.Fatalf("The worker should be reconciling from the beginning: %#v", status) } + if worker.work.State != payload.ReconcilingPayload { + t.Fatalf("The worker should be reconciling: %v", worker.work) + } // Step 2: Start the sync worker and verify the sequence of events, and then verify // the status does not change @@ -536,6 +543,9 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { if status := worker.Status(); !status.Reconciling || status.Completed != 0 { t.Fatalf("The worker should be reconciling from the beginning: %#v", status) } + if worker.work.State != payload.ReconcilingPayload { + t.Fatalf("The worker should be reconciling: %v", worker.work) + } // Step 2: Start the sync worker and verify the sequence of events // @@ -658,6 +668,123 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) { }) } +func TestCVO_VerifyInitializingPayloadState(t *testing.T) { + o, cvs, client, _, shutdownFn := setupCVOTest() + stopCh := make(chan struct{}) + defer close(stopCh) + defer shutdownFn() + worker := o.configSync.(*SyncWorker) + b := newBlockingResourceBuilder() + worker.builder = b + + 
// Setup: a previous run that never completed a sync (history holds only a partial update), and the operator at the same image as before + // + o.releaseImage = "image/image:1" + o.releaseVersion = "1.0.0-abc" + desired := configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"} + uid, _ := uuid.NewRandom() + clusterUID := configv1.ClusterID(uid.String()) + cvs["version"] = &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: desired, + VersionHash: "6GC9TkkG9PA=", + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, + {Type: configv1.OperatorFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + }, + }, + } + + // Step 1: The sync loop starts and triggers a sync, but does not update status + // + client.ClearActions() + err := o.sync(o.queueKey()) + if err != nil { + t.Fatal(err) + } + + // check the worker starts in the initializing state, not reconciling + if status := worker.Status(); status.Reconciling || status.Completed != 0 { + t.Fatalf("The worker should be initializing from the beginning: %#v", status) + } + if worker.work.State != payload.InitializingPayload { + t.Fatalf("The worker should be initializing: %v", worker.work) + } +} + +func TestCVO_VerifyUpdatingPayloadState(t *testing.T) { + o, cvs, client, _, shutdownFn := setupCVOTest() + stopCh := make(chan struct{}) + defer 
close(stopCh) + defer shutdownFn() + worker := o.configSync.(*SyncWorker) + b := newBlockingResourceBuilder() + worker.builder = b + + // Setup: a completed sync to an older image from a previous run, and a partial update to the operator's current image + // + o.releaseImage = "image/image:1" + o.releaseVersion = "1.0.0-abc" + desired := configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"} + uid, _ := uuid.NewRandom() + clusterUID := configv1.ClusterID(uid.String()) + cvs["version"] = &configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: "version", + ResourceVersion: "1", + }, + Spec: configv1.ClusterVersionSpec{ + ClusterID: clusterUID, + Channel: "fast", + }, + Status: configv1.ClusterVersionStatus{ + // Prefers the image version over the operator's version (although in general they will remain in sync) + Desired: desired, + VersionHash: "6GC9TkkG9PA=", + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Image: "image/image:1", Version: "1.0.0-abc", StartedTime: defaultStartedTime}, + {State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc.0", StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime}, + }, + Conditions: []configv1.ClusterOperatorStatusCondition{ + {Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"}, + {Type: configv1.OperatorFailing, Status: configv1.ConditionFalse}, + {Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"}, + {Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse}, + }, + }, + } + + // Step 1: The sync loop starts and triggers a sync, but does not update status + // + client.ClearActions() + err := o.sync(o.queueKey()) + if err != nil { + t.Fatal(err) + } + + // check the worker starts in the updating state, not reconciling + if status := worker.Status(); status.Reconciling || status.Completed != 0 { + t.Fatalf("The worker should be updating from the beginning: %#v", 
status) + } + if worker.work.State != payload.UpdatingPayload { + t.Fatalf("The worker should be updating: %v", worker.work) + } +} + func verifyAllStatus(t *testing.T, ch <-chan SyncWorkerStatus, items ...SyncWorkerStatus) { t.Helper() if len(items) == 0 { diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 71db691ec..182ce68c3 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -353,7 +353,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus { func (r *fakeSyncRecorder) Start(maxWorkers int, stopCh <-chan struct{}) {} -func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus { +func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { r.Updates = append(r.Updates, desired) return r.Returns } diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 5d40bbceb..378f08c52 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -25,7 +25,7 @@ import ( // ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing. type ConfigSyncWorker interface { Start(maxWorkers int, stopCh <-chan struct{}) - Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus + Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus StatusCh() <-chan SyncWorkerStatus } @@ -41,11 +41,11 @@ type StatusReporter interface { // SyncWork represents the work that should be done in a sync iteration. 
type SyncWork struct { - Generation int64 - Desired configv1.Update - Overrides []configv1.ComponentOverride - Reconciling bool - Completed int + Generation int64 + Desired configv1.Update + Overrides []configv1.ComponentOverride + State payload.State + Completed int } // Empty returns true if the image is empty for this work. @@ -64,6 +64,7 @@ type SyncWorkerStatus struct { Completed int Reconciling bool + Initial bool VersionHash string Actual configv1.Update @@ -147,7 +148,7 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus { // the initial state or whatever the last recorded status was. // TODO: in the future it may be desirable for changes that alter desired to wait briefly before returning, // giving the sync loop the opportunity to observe our change and begin working towards it. -func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus { +func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, state payload.State) *SyncWorkerStatus { w.lock.Lock() defer w.lock.Unlock() @@ -164,12 +165,10 @@ func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides // initialize the reconciliation flag and the status the first time // update is invoked if w.work == nil { - if reconciling { - work.Reconciling = true - } + work.State = state w.status = SyncWorkerStatus{ Generation: generation, - Reconciling: work.Reconciling, + Reconciling: state.Reconciling(), Actual: work.Desired, } } @@ -202,7 +201,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) { var next <-chan time.Time for { - waitingToReconcile := work.Reconciling + waitingToReconcile := work.State == payload.ReconcilingPayload select { case <-stopCh: glog.V(5).Infof("Stopped worker") @@ -261,7 +260,7 @@ func (w *SyncWorker) Start(maxWorkers int, stopCh <-chan struct{}) { glog.V(5).Infof("Sync succeeded, reconciling") 
work.Completed++ - work.Reconciling = true + work.State = payload.ReconcilingPayload next = time.After(w.minimumReconcileInterval) } }, 10*time.Millisecond, stopCh) @@ -302,14 +301,14 @@ func (w *SyncWorker) calculateNext(work *SyncWork) bool { // if this is the first time through the loop, initialize reconciling to // the state Update() calculated (to allow us to start in reconciling) if work.Empty() { - work.Reconciling = w.work.Reconciling + work.State = w.work.State } else { if changed { - work.Reconciling = false + work.State = payload.UpdatingPayload } } // always clear the completed variable if we are not reconciling - if !work.Reconciling { + if work.State != payload.ReconcilingPayload { work.Completed = 0 } @@ -378,22 +377,22 @@ func (w *SyncWorker) Status() *SyncWorkerStatus { // the update could not be completely applied. The status is updated as we progress. // Cancelling the context will abort the execution of the sync. func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers int, reporter StatusReporter) error { - glog.V(4).Infof("Running sync %s on generation %d", versionString(work.Desired), work.Generation) + glog.V(4).Infof("Running sync %s on generation %d in state %s", versionString(work.Desired), work.Generation, work.State) update := work.Desired // cache the payload until the release image changes validPayload := w.payload if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, update) { glog.V(4).Infof("Loading payload") - reporter.Report(SyncWorkerStatus{Step: "RetrievePayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Step: "RetrievePayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) payloadDir, err := w.retriever.RetrievePayload(ctx, update) if err != nil { - reporter.Report(SyncWorkerStatus{Failure: err, Step: "RetrievePayload", Reconciling: work.Reconciling, Actual: update}) + 
reporter.Report(SyncWorkerStatus{Failure: err, Step: "RetrievePayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) return err } payloadUpdate, err := payload.LoadUpdate(payloadDir, update.Image) if err != nil { - reporter.Report(SyncWorkerStatus{Failure: err, Step: "VerifyPayload", Reconciling: work.Reconciling, Actual: update}) + reporter.Report(SyncWorkerStatus{Failure: err, Step: "VerifyPayload", Initial: work.State.Initializing(), Reconciling: work.State.Reconciling(), Actual: update}) return err } w.payload = payloadUpdate @@ -418,7 +417,8 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w cr := &consistentReporter{ status: SyncWorkerStatus{ Generation: work.Generation, - Reconciling: work.Reconciling, + Initial: work.State.Initializing(), + Reconciling: work.State.Reconciling(), VersionHash: payloadUpdate.ManifestHash, Actual: update, }, @@ -471,7 +471,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w return err } - // update the + // update the status cr.Complete() return nil } @@ -541,6 +541,7 @@ func (r *consistentReporter) Complete() { metricPayload.WithLabelValues(r.version, "applied").Set(float64(r.done)) copied := r.status copied.Completed = r.completed + 1 + copied.Initial = false copied.Reconciling = true copied.Fraction = 1 r.reporter.Report(copied) diff --git a/pkg/payload/payload.go b/pkg/payload/payload.go index 1d9345f72..333ca719c 100644 --- a/pkg/payload/payload.go +++ b/pkg/payload/payload.go @@ -21,6 +21,61 @@ import ( "github.com/openshift/cluster-version-operator/lib/resourceread" ) +// State describes the state of the payload and alters +// how a payload is applied. +type State int + +const ( + // UpdatingPayload indicates we are moving from one state to + // another. 
+ // + // When we are moving to a different payload version, we want to + // be as conservative as possible about ordering of the payload + // and the errors we might encounter. An error in one operator + // should prevent dependent operators from changing. We are + // willing to take longer to roll out an update if it reduces + // the possibility of error. + UpdatingPayload State = iota + // ReconcilingPayload indicates we are attempting to maintain + // our current state. + // + // When the payload has already been applied to the cluster, we + // prioritize ensuring resources are recreated and don't need to + // progress in strict order. We also attempt to reset as many + // resources as possible back to their desired state and report + // errors after the fact. + ReconcilingPayload + // InitializingPayload indicates we are establishing our first + // state. + // + // When we are deploying a payload for the first time we want + // to make progress quickly but in a predictable order to + // minimize retries and crash-loops. We wait for operators + // to report level but tolerate degraded and transient errors. + // Our goal is to get the entire payload created, even if some + // operators are still converging. + InitializingPayload +) + +// Initializing is true if the state is InitializingPayload. +func (s State) Initializing() bool { return s == InitializingPayload } + +// Reconciling is true if the state is ReconcilingPayload. 
+func (s State) Reconciling() bool { return s == ReconcilingPayload } + +func (s State) String() string { + switch s { + case ReconcilingPayload: + return "Reconciling" + case UpdatingPayload: + return "Updating" + case InitializingPayload: + return "Initializing" + default: + panic(fmt.Sprintf("unrecognized state %d", int(s))) + } +} + const ( DefaultPayloadDir = "/" From a7c69835d0db1ee2443a8d369c2134b0fb58a245 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 11 Mar 2019 18:11:01 -0400 Subject: [PATCH 2/3] resourcebuilder: Make resourcebuilder aware of the sync mode Will allow different behavior based on whether we are reconciling, installing, or upgrading. --- lib/resourcebuilder/apiext.go | 4 ++++ lib/resourcebuilder/apireg.go | 4 ++++ lib/resourcebuilder/apps.go | 8 ++++++++ lib/resourcebuilder/batch.go | 4 ++++ lib/resourcebuilder/core.go | 16 ++++++++++++++++ lib/resourcebuilder/interface.go | 10 ++++++++++ lib/resourcebuilder/rbac.go | 16 ++++++++++++++++ lib/resourcebuilder/security.go | 4 ++++ pkg/cvo/cvo.go | 17 +++++++++++++++-- pkg/cvo/cvo_scenarios_test.go | 2 +- pkg/cvo/internal/generic.go | 4 ++++ pkg/cvo/internal/operatorstatus.go | 10 ++++++++-- pkg/cvo/internal/operatorstatus_test.go | 5 +++-- pkg/cvo/sync_test.go | 8 +++++++- pkg/cvo/sync_worker.go | 2 +- pkg/payload/task.go | 6 +++--- 16 files changed, 108 insertions(+), 12 deletions(-) diff --git a/lib/resourcebuilder/apiext.go b/lib/resourcebuilder/apiext.go index e345ea276..8ab83d268 100644 --- a/lib/resourcebuilder/apiext.go +++ b/lib/resourcebuilder/apiext.go @@ -30,6 +30,10 @@ func newCRDBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *crdBuilder) WithMode(m Mode) Interface { + return b +} + func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/apireg.go b/lib/resourcebuilder/apireg.go index da428fd22..cda4d3cee 100644 --- a/lib/resourcebuilder/apireg.go +++ 
b/lib/resourcebuilder/apireg.go @@ -23,6 +23,10 @@ func newAPIServiceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *apiServiceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *apiServiceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/apps.go b/lib/resourcebuilder/apps.go index f086f6479..79afb9ffb 100644 --- a/lib/resourcebuilder/apps.go +++ b/lib/resourcebuilder/apps.go @@ -31,6 +31,10 @@ func newDeploymentBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *deploymentBuilder) WithMode(m Mode) Interface { + return b +} + func (b *deploymentBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -91,6 +95,10 @@ func newDaemonsetBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *daemonsetBuilder) WithMode(m Mode) Interface { + return b +} + func (b *daemonsetBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/batch.go b/lib/resourcebuilder/batch.go index 15f922e1f..843215caa 100644 --- a/lib/resourcebuilder/batch.go +++ b/lib/resourcebuilder/batch.go @@ -30,6 +30,10 @@ func newJobBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *jobBuilder) WithMode(m Mode) Interface { + return b +} + func (b *jobBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/core.go b/lib/resourcebuilder/core.go index f121a66e5..f8f03f0db 100644 --- a/lib/resourcebuilder/core.go +++ b/lib/resourcebuilder/core.go @@ -23,6 +23,10 @@ func newServiceAccountBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *serviceAccountBuilder) WithMode(m Mode) Interface { + return b +} + func (b *serviceAccountBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -50,6 +54,10 @@ func newConfigMapBuilder(config *rest.Config, m 
lib.Manifest) Interface { } } +func (b *configMapBuilder) WithMode(m Mode) Interface { + return b +} + func (b *configMapBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -77,6 +85,10 @@ func newNamespaceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *namespaceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *namespaceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -104,6 +116,10 @@ func newServiceBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *serviceBuilder) WithMode(m Mode) Interface { + return b +} + func (b *serviceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/interface.go b/lib/resourcebuilder/interface.go index c3665f43d..a22c8a142 100644 --- a/lib/resourcebuilder/interface.go +++ b/lib/resourcebuilder/interface.go @@ -63,8 +63,18 @@ type MetaV1ObjectModifierFunc func(metav1.Object) // and the Manifest. type NewInteraceFunc func(rest *rest.Config, m lib.Manifest) Interface +// Mode is how this builder is being used. 
+type Mode int + +const ( + UpdatingMode Mode = iota + ReconcilingMode + InitializingMode +) + type Interface interface { WithModifier(MetaV1ObjectModifierFunc) Interface + WithMode(Mode) Interface Do(context.Context) error } diff --git a/lib/resourcebuilder/rbac.go b/lib/resourcebuilder/rbac.go index 098726be7..b3a0d57a4 100644 --- a/lib/resourcebuilder/rbac.go +++ b/lib/resourcebuilder/rbac.go @@ -23,6 +23,10 @@ func newClusterRoleBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *clusterRoleBuilder) WithMode(m Mode) Interface { + return b +} + func (b *clusterRoleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -50,6 +54,10 @@ func newClusterRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface } } +func (b *clusterRoleBindingBuilder) WithMode(m Mode) Interface { + return b +} + func (b *clusterRoleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -77,6 +85,10 @@ func newRoleBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *roleBuilder) WithMode(m Mode) Interface { + return b +} + func (b *roleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b @@ -104,6 +116,10 @@ func newRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *roleBindingBuilder) WithMode(m Mode) Interface { + return b +} + func (b *roleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git a/lib/resourcebuilder/security.go b/lib/resourcebuilder/security.go index f89e2dde2..91e2a11b1 100644 --- a/lib/resourcebuilder/security.go +++ b/lib/resourcebuilder/security.go @@ -23,6 +23,10 @@ func newSecurityBuilder(config *rest.Config, m lib.Manifest) Interface { } } +func (b *securityBuilder) WithMode(m Mode) Interface { + return b +} + func (b *securityBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface { b.modifier = f return b diff --git 
a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index a3c618c17..56e60e9a6 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -493,7 +493,7 @@ func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface return internal.NewGenericBuilder(client, *m) } -func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { builder, err := b.BuilderFor(m) if err != nil { return err @@ -501,7 +501,20 @@ func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { if b.modifier != nil { builder = builder.WithModifier(b.modifier) } - return builder.Do(ctx) + return builder.WithMode(stateToMode(state)).Do(ctx) +} + +func stateToMode(state payload.State) resourcebuilder.Mode { + switch state { + case payload.InitializingPayload: + return resourcebuilder.InitializingMode + case payload.UpdatingPayload: + return resourcebuilder.UpdatingMode + case payload.ReconcilingPayload: + return resourcebuilder.ReconcilingMode + default: + panic(fmt.Sprintf("unexpected payload state %d", int(state))) + } } func hasNeverReachedLevel(cv *configv1.ClusterVersion) bool { diff --git a/pkg/cvo/cvo_scenarios_test.go b/pkg/cvo/cvo_scenarios_test.go index 0d14e8230..5d51359b8 100644 --- a/pkg/cvo/cvo_scenarios_test.go +++ b/pkg/cvo/cvo_scenarios_test.go @@ -833,6 +833,6 @@ func (b *blockingResourceBuilder) Send(err error) { b.ch <- err } -func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { return <-b.ch } diff --git a/pkg/cvo/internal/generic.go b/pkg/cvo/internal/generic.go index efb41c1f8..df2c02d03 100644 --- a/pkg/cvo/internal/generic.go +++ b/pkg/cvo/internal/generic.go @@ -73,6 +73,10 @@ func NewGenericBuilder(client dynamic.ResourceInterface, m lib.Manifest) (resour }, nil } +func (b *genericBuilder) WithMode(m 
resourcebuilder.Mode) resourcebuilder.Interface { + return b +} + func (b *genericBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { b.modifier = f return b diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index 0549af1ca..69cbd2dce 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -53,6 +53,7 @@ type clusterOperatorBuilder struct { client ClusterOperatorsGetter raw []byte modifier resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { @@ -83,6 +84,11 @@ func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) re } } +func (b *clusterOperatorBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + b.mode = m + return b +} + func (b *clusterOperatorBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { b.modifier = f return b @@ -95,10 +101,10 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { } ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() - return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os) + return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode) } -func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator) error { +func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator, mode resourcebuilder.Mode) error { var lastErr error err := wait.PollImmediateUntil(interval, func() (bool, error) { actual, err := client.Get(expected.Name) diff --git a/pkg/cvo/internal/operatorstatus_test.go b/pkg/cvo/internal/operatorstatus_test.go index a3ed5efd2..086fce6c9 100644 --- 
a/pkg/cvo/internal/operatorstatus_test.go +++ b/pkg/cvo/internal/operatorstatus_test.go @@ -16,6 +16,7 @@ import ( configv1 "github.com/openshift/api/config/v1" "github.com/openshift/client-go/config/clientset/versioned/fake" + "github.com/openshift/cluster-version-operator/lib/resourcebuilder" "github.com/openshift/cluster-version-operator/pkg/payload" ) @@ -23,7 +24,7 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { tests := []struct { name string actual *configv1.ClusterOperator - + mode resourcebuilder.Mode exp *configv1.ClusterOperator expErr error }{{ @@ -478,7 +479,7 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { ctxWithTimeout, cancel := context.WithTimeout(context.TODO(), 1*time.Millisecond) defer cancel() - err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp) + err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp, test.mode) if (test.expErr == nil) != (err == nil) { t.Fatalf("unexpected error: %v", err) } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 182ce68c3..7fdb052d0 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -300,10 +300,16 @@ type testBuilder struct { *recorder reactors map[action]error modifiers []resourcebuilder.MetaV1ObjectModifierFunc + mode resourcebuilder.Mode m *lib.Manifest } +func (t *testBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface { + t.mode = m + return t +} + func (t *testBuilder) WithModifier(m resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface { t.modifiers = append(t.modifiers, m) return t @@ -392,7 +398,7 @@ type testResourceBuilder struct { modifiers []resourcebuilder.MetaV1ObjectModifierFunc } -func (b *testResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error { +func (b *testResourceBuilder) Apply(ctx 
context.Context, m *lib.Manifest, state payload.State) error { ns := m.Object().GetNamespace() fakeGVR := schema.GroupVersionResource{Group: m.GVK.Group, Version: m.GVK.Version, Resource: strings.ToLower(m.GVK.Kind)} client := b.client.Resource(fakeGVR).Namespace(ns) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 378f08c52..2d0874464 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -458,7 +458,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w continue } - if err := task.Run(ctx, version, w.builder); err != nil { + if err := task.Run(ctx, version, w.builder, work.State); err != nil { return err } cr.Inc() diff --git a/pkg/payload/task.go b/pkg/payload/task.go index 3089c0258..c28a66abe 100644 --- a/pkg/payload/task.go +++ b/pkg/payload/task.go @@ -31,7 +31,7 @@ func init() { // ResourceBuilder abstracts how a manifest is created on the server. Introduced for testing. type ResourceBuilder interface { - Apply(context.Context, *lib.Manifest) error + Apply(context.Context, *lib.Manifest, State) error } type Task struct { @@ -50,11 +50,11 @@ func (st *Task) String() string { return fmt.Sprintf("%s \"%s/%s\" (%d of %d)", strings.ToLower(st.Manifest.GVK.Kind), ns, st.Manifest.Object().GetName(), st.Index, st.Total) } -func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder) error { +func (st *Task) Run(ctx context.Context, version string, builder ResourceBuilder, state State) error { var lastErr error if err := wait.ExponentialBackoff(st.Backoff, func() (bool, error) { // run builder for the manifest - if err := builder.Apply(ctx, st.Manifest); err != nil { + if err := builder.Apply(ctx, st.Manifest, state); err != nil { utilruntime.HandleError(errors.Wrapf(err, "error running apply for %s", st)) lastErr = err metricPayloadErrors.WithLabelValues(version).Inc() From 6971c2bce79327fd51045fe6726c5d9c4524aaed Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 12 Mar 2019 
21:09:06 -0400 Subject: [PATCH 3/3] status: Guarantee we always keep the last completed status Currently history pruning could lose the last completed entry. To preserve the user's last successful outcome, always end the history with a completed entry if one exists. This also prevents the initializing state from being detected if the user triggers a lot of failed history changes. --- pkg/cvo/status.go | 21 +++++++++-- pkg/cvo/status_test.go | 81 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 3 deletions(-) diff --git a/pkg/cvo/status.go b/pkg/cvo/status.go index f7c8d952d..2199c7106 100644 --- a/pkg/cvo/status.go +++ b/pkg/cvo/status.go @@ -71,9 +71,7 @@ func mergeOperatorHistory(config *configv1.ClusterVersion, desired configv1.Upda last = &config.Status.History[0] } - if len(config.Status.History) > 10 { - config.Status.History = config.Status.History[:10] - } + pruneStatusHistory(config, 10) if completed { last.State = configv1.CompletedUpdate @@ -88,6 +86,23 @@ func mergeOperatorHistory(config *configv1.ClusterVersion, desired configv1.Upda config.Status.Desired = desired } +func pruneStatusHistory(config *configv1.ClusterVersion, maxHistory int) { + if len(config.Status.History) <= maxHistory { + return + } + for i, item := range config.Status.History { + if item.State != configv1.CompletedUpdate { + continue + } + // keep the first completed item: if it lies at or past maxHistory it would be truncated below, so copy it over the last retained slot; NOTE(review): assumes maxHistory >= 1, since maxHistory-1 is used as an index + if i >= maxHistory { + config.Status.History[maxHistory-1] = item + } + break + } + config.Status.History = config.Status.History[:maxHistory] +} + // ClusterVersionInvalid indicates that the cluster version has an error that prevents the server from // taking action. The cluster version operator will only reconcile the current state as long as this // condition is set.
diff --git a/pkg/cvo/status_test.go b/pkg/cvo/status_test.go index 4c8435653..5e5fcf835 100644 --- a/pkg/cvo/status_test.go +++ b/pkg/cvo/status_test.go @@ -1,9 +1,11 @@ package cvo import ( + "reflect" "testing" configv1 "github.com/openshift/api/config/v1" + "k8s.io/apimachinery/pkg/util/diff" ) func Test_mergeEqualVersions(t *testing.T) { @@ -39,3 +41,82 @@ func Test_mergeEqualVersions(t *testing.T) { }) } } + +func Test_pruneStatusHistory(t *testing.T) { + obj := &configv1.ClusterVersion{ + Status: configv1.ClusterVersionStatus{ + History: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + {State: configv1.PartialUpdate, Version: "0.0.6"}, + }, + }, + } + tests := []struct { + name string + config *configv1.ClusterVersion + maxHistory int + want []configv1.UpdateHistory + }{ + { + config: obj.DeepCopy(), + maxHistory: 2, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 3, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 4, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 5, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: 
configv1.CompletedUpdate, Version: "0.0.7"}, + {State: configv1.PartialUpdate, Version: "0.0.6"}, + }, + }, + { + config: obj.DeepCopy(), + maxHistory: 6, + want: []configv1.UpdateHistory{ + {State: configv1.PartialUpdate, Version: "0.0.10"}, + {State: configv1.PartialUpdate, Version: "0.0.9"}, + {State: configv1.PartialUpdate, Version: "0.0.8"}, + {State: configv1.CompletedUpdate, Version: "0.0.7"}, + {State: configv1.PartialUpdate, Version: "0.0.6"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := tt.config.DeepCopy() + pruneStatusHistory(config, tt.maxHistory) + if !reflect.DeepEqual(tt.want, config.Status.History) { + t.Fatalf("%s", diff.ObjectReflectDiff(tt.want, config.Status.History)) + } + }) + } +}