Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/resourcebuilder/apiext.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func newCRDBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *crdBuilder) WithMode(m Mode) Interface {
return b
}

func (b *crdBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
4 changes: 4 additions & 0 deletions lib/resourcebuilder/apireg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func newAPIServiceBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *apiServiceBuilder) WithMode(m Mode) Interface {
return b
}

func (b *apiServiceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
8 changes: 8 additions & 0 deletions lib/resourcebuilder/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func newDeploymentBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *deploymentBuilder) WithMode(m Mode) Interface {
return b
}

func (b *deploymentBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -91,6 +95,10 @@ func newDaemonsetBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *daemonsetBuilder) WithMode(m Mode) Interface {
return b
}

func (b *daemonsetBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
4 changes: 4 additions & 0 deletions lib/resourcebuilder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func newJobBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *jobBuilder) WithMode(m Mode) Interface {
return b
}

func (b *jobBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
16 changes: 16 additions & 0 deletions lib/resourcebuilder/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func newServiceAccountBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *serviceAccountBuilder) WithMode(m Mode) Interface {
return b
}

func (b *serviceAccountBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -50,6 +54,10 @@ func newConfigMapBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *configMapBuilder) WithMode(m Mode) Interface {
return b
}

func (b *configMapBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -77,6 +85,10 @@ func newNamespaceBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *namespaceBuilder) WithMode(m Mode) Interface {
return b
}

func (b *namespaceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -104,6 +116,10 @@ func newServiceBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *serviceBuilder) WithMode(m Mode) Interface {
return b
}

func (b *serviceBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
10 changes: 10 additions & 0 deletions lib/resourcebuilder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,18 @@ type MetaV1ObjectModifierFunc func(metav1.Object)
// and the Manifest.
type NewInteraceFunc func(rest *rest.Config, m lib.Manifest) Interface

// Mode is how this builder is being used.
type Mode int

const (
UpdatingMode Mode = iota
ReconcilingMode
InitializingMode
)

type Interface interface {
WithModifier(MetaV1ObjectModifierFunc) Interface
WithMode(Mode) Interface
Do(context.Context) error
}

Expand Down
16 changes: 16 additions & 0 deletions lib/resourcebuilder/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func newClusterRoleBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *clusterRoleBuilder) WithMode(m Mode) Interface {
return b
}

func (b *clusterRoleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -50,6 +54,10 @@ func newClusterRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface
}
}

func (b *clusterRoleBindingBuilder) WithMode(m Mode) Interface {
return b
}

func (b *clusterRoleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -77,6 +85,10 @@ func newRoleBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *roleBuilder) WithMode(m Mode) Interface {
return b
}

func (b *roleBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down Expand Up @@ -104,6 +116,10 @@ func newRoleBindingBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *roleBindingBuilder) WithMode(m Mode) Interface {
return b
}

func (b *roleBindingBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
4 changes: 4 additions & 0 deletions lib/resourcebuilder/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func newSecurityBuilder(config *rest.Config, m lib.Manifest) Interface {
}
}

func (b *securityBuilder) WithMode(m Mode) Interface {
return b
}

func (b *securityBuilder) WithModifier(f MetaV1ObjectModifierFunc) Interface {
b.modifier = f
return b
Expand Down
68 changes: 57 additions & 11 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/openshift/cluster-version-operator/lib"
"github.com/openshift/cluster-version-operator/lib/resourceapply"
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
"github.com/openshift/cluster-version-operator/lib/resourcemerge"
"github.com/openshift/cluster-version-operator/lib/validation"
"github.com/openshift/cluster-version-operator/pkg/cvo/internal"
"github.com/openshift/cluster-version-operator/pkg/cvo/internal/dynamicclient"
Expand Down Expand Up @@ -157,13 +156,13 @@ func New(
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"),
availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"),
}

optr.configSync = NewSyncWorker(
optr.defaultPayloadRetriever(),
NewResourceBuilder(optr.restConfig),
NewResourceBuilder(optr.restConfig, coInformer.Lister()),
minimumInterval,
wait.Backoff{
Duration: time.Second * 10,
Expand Down Expand Up @@ -219,7 +218,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(8, stopCh)
go optr.configSync.Start(16, stopCh)

go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
Expand Down Expand Up @@ -334,10 +333,19 @@ func (optr *Operator) sync(key string) error {
}, errs)
}

// identify an initial state to inform the sync loop of
var state payload.State
switch {
case hasNeverReachedLevel(config):
state = payload.InitializingPayload
case hasReachedLevel(config, desired):
state = payload.ReconcilingPayload
default:
state = payload.UpdatingPayload
}

// inform the config sync loop about our desired state
reconciling := resourcemerge.IsOperatorStatusConditionTrue(config.Status.Conditions, configv1.OperatorAvailable) &&
resourcemerge.IsOperatorStatusConditionFalse(config.Status.Conditions, configv1.OperatorProgressing)
status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, reconciling)
status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, state)

// write cluster version status
return optr.syncStatus(original, config, status, errs)
Expand Down Expand Up @@ -462,14 +470,19 @@ func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) {
type resourceBuilder struct {
config *rest.Config
modifier resourcebuilder.MetaV1ObjectModifierFunc

clusterOperators internal.ClusterOperatorsGetter
}

// NewResourceBuilder creates the default resource builder implementation.
func NewResourceBuilder(config *rest.Config) payload.ResourceBuilder {
return &resourceBuilder{config: config}
func NewResourceBuilder(config *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder {
return &resourceBuilder{config: config, clusterOperators: clusterOperators}
}

func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) {
if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
return internal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil
}
if resourcebuilder.Mapper.Exists(m.GVK) {
return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m)
}
Expand All @@ -480,13 +493,46 @@ func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface
return internal.NewGenericBuilder(client, *m)
}

func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error {
func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error {
builder, err := b.BuilderFor(m)
if err != nil {
return err
}
if b.modifier != nil {
builder = builder.WithModifier(b.modifier)
}
return builder.Do(ctx)
return builder.WithMode(stateToMode(state)).Do(ctx)
}

func stateToMode(state payload.State) resourcebuilder.Mode {
switch state {
case payload.InitializingPayload:
return resourcebuilder.InitializingMode
case payload.UpdatingPayload:
return resourcebuilder.UpdatingMode
case payload.ReconcilingPayload:
return resourcebuilder.ReconcilingMode
default:
panic(fmt.Sprintf("unexpected payload state %d", int(state)))
}
}

func hasNeverReachedLevel(cv *configv1.ClusterVersion) bool {
for _, version := range cv.Status.History {
if version.State == configv1.CompletedUpdate {
return false
}
}
// TODO: check the payload, just in case
return true
}

func hasReachedLevel(cv *configv1.ClusterVersion, desired configv1.Update) bool {
if len(cv.Status.History) == 0 {
return false
}
if cv.Status.History[0].State != configv1.CompletedUpdate {
return false
}
return desired.Image == cv.Status.History[0].Image
}
8 changes: 6 additions & 2 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,27 @@ func TestCVO_StartupAndSync(t *testing.T) {
})
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Step: "RetrievePayload",
Step: "RetrievePayload",
Initial: true,
// the desired version is briefly incorrect (user provided) until we retrieve the image
Actual: configv1.Update{Version: "4.0.1", Image: "image/image:1"},
},
SyncWorkerStatus{
Step: "ApplyResources",
Initial: true,
VersionHash: "6GC9TkkG9PA=",
Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"},
},
SyncWorkerStatus{
Fraction: float32(1) / 3,
Step: "ApplyResources",
Initial: true,
VersionHash: "6GC9TkkG9PA=",
Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"},
},
SyncWorkerStatus{
Fraction: float32(2) / 3,
Initial: true,
Step: "ApplyResources",
VersionHash: "6GC9TkkG9PA=",
Actual: configv1.Update{Version: "1.0.0-abc", Image: "image/image:1"},
Expand Down Expand Up @@ -706,6 +710,6 @@ func (b *blockingResourceBuilder) Send(err error) {
b.ch <- err
}

func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest) error {
func (b *blockingResourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error {
return <-b.ch
}
4 changes: 4 additions & 0 deletions pkg/cvo/internal/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewGenericBuilder(client dynamic.ResourceInterface, m lib.Manifest) (resour
}, nil
}

func (b *genericBuilder) WithMode(m resourcebuilder.Mode) resourcebuilder.Interface {
return b
}

func (b *genericBuilder) WithModifier(f resourcebuilder.MetaV1ObjectModifierFunc) resourcebuilder.Interface {
b.modifier = f
return b
Expand Down
Loading