diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 42e58a2b3..1d33030e7 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -163,7 +163,7 @@ func New( optr.configSync = NewSyncWorker( optr.defaultPayloadRetriever(), - NewResourceBuilder(optr.restConfig), + NewResourceBuilder(optr.restConfig, coInformer.Lister()), minimumInterval, wait.Backoff{ Duration: time.Second * 10, @@ -462,14 +462,19 @@ func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) { type resourceBuilder struct { config *rest.Config modifier resourcebuilder.MetaV1ObjectModifierFunc + + clusterOperators internal.ClusterOperatorsGetter } // NewResourceBuilder creates the default resource builder implementation. -func NewResourceBuilder(config *rest.Config) payload.ResourceBuilder { - return &resourceBuilder{config: config} +func NewResourceBuilder(config *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder { + return &resourceBuilder{config: config, clusterOperators: clusterOperators} } func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) { + if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") { + return internal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil + } if resourcebuilder.Mapper.Exists(m.GVK) { return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m) } diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index a1e0f5047..0549af1ca 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -50,14 +50,35 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator { } type clusterOperatorBuilder struct { - client configclientv1.ConfigV1Interface + client ClusterOperatorsGetter raw []byte modifier resourcebuilder.MetaV1ObjectModifierFunc } func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface { + return NewClusterOperatorBuilder(clientClusterOperatorsGetter{ + getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(), + }, m) +} + +// ClusterOperatorsGetter abstracts object access with a client or a cache lister. +type ClusterOperatorsGetter interface { + Get(name string) (*configv1.ClusterOperator, error) +} + +type clientClusterOperatorsGetter struct { + getter configclientv1.ClusterOperatorInterface +} + +func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperator, error) { + return g.getter.Get(name, metav1.GetOptions{}) +} + +// NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a +// client or a lister cache. +func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface { return &clusterOperatorBuilder{ - client: configclientv1.NewForConfigOrDie(config), + client: client, raw: m.Raw, } } @@ -77,10 +98,10 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os) } -func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client configclientv1.ClusterOperatorsGetter, expected *configv1.ClusterOperator) error { +func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator) error { var lastErr error err := wait.PollImmediateUntil(interval, func() (bool, error) { - actual, err := client.ClusterOperators().Get(expected.Name, metav1.GetOptions{}) + actual, err := client.Get(expected.Name) if err != nil { lastErr = &payload.UpdateError{ Nested: err, diff --git a/pkg/cvo/internal/operatorstatus_test.go b/pkg/cvo/internal/operatorstatus_test.go index 9e1740279..a3ed5efd2 100644 --- a/pkg/cvo/internal/operatorstatus_test.go +++ b/pkg/cvo/internal/operatorstatus_test.go @@ -478,15 +478,12 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) { ctxWithTimeout, cancel := context.WithTimeout(context.TODO(), 1*time.Millisecond) defer cancel() - err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, client.ConfigV1(), test.exp) - if test.expErr == nil { - if err != nil { - t.Fatalf("expected nil error, got: %v", err) - } - } else { - if !reflect.DeepEqual(test.expErr, err) { - t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expErr, err)) - } + err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp) + if (test.expErr == nil) != (err == nil) { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expErr, err) { + t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expErr, err)) } }) } diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 003c98981..71db691ec 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -121,7 +121,7 @@ func Test_SyncWorker_apply(t *testing.T) { worker := &SyncWorker{} worker.backoff.Steps = 3 - worker.builder = NewResourceBuilder(nil) + worker.builder = NewResourceBuilder(nil, nil) ctx := context.Background() worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) test.check(t, r.actions) diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index 5c3f82128..e11b4e39c 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -230,7 +230,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{}) @@ -381,7 +381,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{}) @@ -487,7 +487,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, ns, ns) @@ -657,7 +657,7 @@ metadata: options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{})