Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func New(

optr.configSync = NewSyncWorker(
optr.defaultPayloadRetriever(),
NewResourceBuilder(optr.restConfig),
NewResourceBuilder(optr.restConfig, coInformer.Lister()),
minimumInterval,
wait.Backoff{
Duration: time.Second * 10,
Expand Down Expand Up @@ -462,14 +462,19 @@ func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) {
type resourceBuilder struct {
config *rest.Config
modifier resourcebuilder.MetaV1ObjectModifierFunc

clusterOperators internal.ClusterOperatorsGetter
}

// NewResourceBuilder creates the default resource builder implementation.
func NewResourceBuilder(config *rest.Config) payload.ResourceBuilder {
return &resourceBuilder{config: config}
func NewResourceBuilder(config *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder {
return &resourceBuilder{config: config, clusterOperators: clusterOperators}
}

func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) {
if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
return internal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil
}
if resourcebuilder.Mapper.Exists(m.GVK) {
return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m)
}
Expand Down
29 changes: 25 additions & 4 deletions pkg/cvo/internal/operatorstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,35 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator {
}

type clusterOperatorBuilder struct {
client configclientv1.ConfigV1Interface
client ClusterOperatorsGetter
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
}

func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface {
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{
getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(),
}, m)
}

// ClusterOperatorsGetter abstracts object access with a client or a cache lister.
type ClusterOperatorsGetter interface {
Get(name string) (*configv1.ClusterOperator, error)
}

type clientClusterOperatorsGetter struct {
getter configclientv1.ClusterOperatorInterface
}

func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperator, error) {
return g.getter.Get(name, metav1.GetOptions{})
}

// NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a
// client or a lister cache.
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface {
return &clusterOperatorBuilder{
client: configclientv1.NewForConfigOrDie(config),
client: client,
raw: m.Raw,
}
}
Expand All @@ -77,10 +98,10 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os)
}

func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client configclientv1.ClusterOperatorsGetter, expected *configv1.ClusterOperator) error {
func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, client ClusterOperatorsGetter, expected *configv1.ClusterOperator) error {
var lastErr error
err := wait.PollImmediateUntil(interval, func() (bool, error) {
actual, err := client.ClusterOperators().Get(expected.Name, metav1.GetOptions{})
actual, err := client.Get(expected.Name)
if err != nil {
lastErr = &payload.UpdateError{
Nested: err,
Expand Down
15 changes: 6 additions & 9 deletions pkg/cvo/internal/operatorstatus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,15 +478,12 @@ func Test_waitForOperatorStatusToBeDone(t *testing.T) {

ctxWithTimeout, cancel := context.WithTimeout(context.TODO(), 1*time.Millisecond)
defer cancel()
err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, client.ConfigV1(), test.exp)
if test.expErr == nil {
if err != nil {
t.Fatalf("expected nil error, got: %v", err)
}
} else {
if !reflect.DeepEqual(test.expErr, err) {
t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expErr, err))
}
err := waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Millisecond, clientClusterOperatorsGetter{getter: client.ConfigV1().ClusterOperators()}, test.exp)
if (test.expErr == nil) != (err == nil) {
t.Fatalf("unexpected error: %v", err)
}
if !reflect.DeepEqual(test.expErr, err) {
t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expErr, err))
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func Test_SyncWorker_apply(t *testing.T) {

worker := &SyncWorker{}
worker.backoff.Steps = 3
worker.builder = NewResourceBuilder(nil)
worker.builder = NewResourceBuilder(nil, nil)
ctx := context.Background()
worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
test.check(t, r.actions)
Expand Down
8 changes: 4 additions & 4 deletions pkg/start/start_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
options.EnableMetrics = false
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
controllers.CVO.SetSyncWorkerForTesting(worker)

stopCh := make(chan struct{})
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
options.EnableMetrics = false
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
controllers.CVO.SetSyncWorkerForTesting(worker)

stopCh := make(chan struct{})
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
options.EnableMetrics = false
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
controllers.CVO.SetSyncWorkerForTesting(worker)

lock, err := createResourceLock(cb, ns, ns)
Expand Down Expand Up @@ -657,7 +657,7 @@ metadata:
options.EnableMetrics = false
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker)
controllers.CVO.SetSyncWorkerForTesting(worker)

stopCh := make(chan struct{})
Expand Down