Context cancellation for operator
This adds context cancellation to the functions in the operator.

It favors explicitly passing the context as the first parameter so it is
apparent from the signature that the function is cancellable.
jkyros committed Jun 15, 2022
commit f697d4496b00ffbd8ecfe1b8f5481de052941b0b
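For context, a rough sketch of how a caller can drive the new context-first signature. The caller wiring is not part of this commit, and the package and helper names here are illustrative only:

package start // hypothetical helper package, not part of this diff

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

// RunOperator is a hypothetical wrapper; op stands in for the *Operator
// built by New() elsewhere. signal.NotifyContext cancels ctx on
// SIGINT/SIGTERM, which makes Run return and unwinds its deferred
// queue.ShutDown().
func RunOperator(op interface {
	Run(ctx context.Context, workers int)
}) {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	op.Run(ctx, 2) // blocks until ctx is cancelled
}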
30 changes: 15 additions & 15 deletions pkg/operator/operator.go
@@ -70,7 +70,7 @@ type Operator struct {
eventRecorder record.EventRecorder
libgoRecorder events.Recorder

- syncHandler func(ic string) error
+ syncHandler func(ctx context.Context, ic string) error

crdLister apiextlistersv1.CustomResourceDefinitionLister
mcpLister mcfglistersv1.MachineConfigPoolLister
@@ -109,7 +109,7 @@ type Operator struct {
// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface

- stopCh <-chan struct{}
+ ctx context.Context

renderConfig *renderConfig
}
@@ -226,12 +226,12 @@ func New(
}

// Run runs the machine config operator.
- func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
+ func (optr *Operator) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer optr.queue.ShutDown()

apiClient := optr.apiExtClient.ApiextensionsV1()
- _, err := apiClient.CustomResourceDefinitions().Get(context.TODO(), "controllerconfigs.machineconfiguration.openshift.io", metav1.GetOptions{})
+ _, err := apiClient.CustomResourceDefinitions().Get(ctx, "controllerconfigs.machineconfiguration.openshift.io", metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
glog.Infof("Couldn't find controllerconfig CRD, in cluster bringup mode")
@@ -241,7 +241,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
}
}

- if !cache.WaitForCacheSync(stopCh,
+ if !cache.WaitForCacheSync(ctx.Done(),
optr.crdListerSynced,
optr.deployListerSynced,
optr.daemonsetListerSynced,
@@ -265,7 +265,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {

// these can only be synced after CRDs are installed
if !optr.inClusterBringup {
- if !cache.WaitForCacheSync(stopCh,
+ if !cache.WaitForCacheSync(ctx.Done(),
optr.ccListerSynced,
) {
glog.Error("failed to sync caches")
@@ -276,13 +276,13 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting MachineConfigOperator")
defer glog.Info("Shutting down MachineConfigOperator")

- optr.stopCh = stopCh
+ optr.ctx = ctx

for i := 0; i < workers; i++ {
- go wait.Until(optr.worker, time.Second, stopCh)
+ go wait.UntilWithContext(ctx, optr.worker, time.Second)
}

- <-stopCh
+ <-ctx.Done()
}

func (optr *Operator) enqueue(obj interface{}) {
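The switch above from wait.Until to wait.UntilWithContext is what drives the new worker signature in the hunk below: UntilWithContext passes the context into the loop body and stops re-invoking it once the context is done. A standalone sketch of that behaviour (the timeout and period are arbitrary):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// The body receives the same ctx and is rescheduled every second
	// until ctx.Done() is closed, after which UntilWithContext returns.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("worker tick at", time.Now().Format(time.RFC3339))
	}, time.Second)

	fmt.Println("context cancelled, loop exited")
}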
@@ -309,19 +309,19 @@ func (optr *Operator) eventHandler() cache.ResourceEventHandler {
}
}

- func (optr *Operator) worker() {
- for optr.processNextWorkItem() {
+ func (optr *Operator) worker(ctx context.Context) {
+ for optr.processNextWorkItem(ctx) {
}
}

- func (optr *Operator) processNextWorkItem() bool {
+ func (optr *Operator) processNextWorkItem(ctx context.Context) bool {
key, quit := optr.queue.Get()
if quit {
return false
}
defer optr.queue.Done(key)

- err := optr.syncHandler(key.(string))
+ err := optr.syncHandler(ctx, key.(string))
optr.handleErr(err, key)

return true
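Worth noting, as an observation about the existing code rather than a change in this diff: queue.Get() does not take a context, so cancellation alone does not unblock a worker parked in Get(). It is the deferred optr.queue.ShutDown() in Run, which executes once <-ctx.Done() returns, that makes Get() report quit == true and lets the workers drain. A minimal illustration with a standalone workqueue:

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	ctx, cancel := context.WithCancel(context.Background())

	// Mirror the operator's shutdown path: once the context is cancelled,
	// shut the queue down so any blocked Get() returns.
	go func() {
		<-ctx.Done()
		q.ShutDown()
	}()

	cancel()
	_, quit := q.Get()
	fmt.Println("quit:", quit) // prints "quit: true" after the queue shuts down
}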
@@ -345,7 +345,7 @@ func (optr *Operator) handleErr(err error, key interface{}) {
optr.queue.AddAfter(key, 1*time.Minute)
}

- func (optr *Operator) sync(key string) error {
+ func (optr *Operator) sync(ctx context.Context, key string) error {
startTime := time.Now()
glog.V(4).Infof("Started syncing operator %q (%v)", key, startTime)
defer func() {
@@ -365,5 +365,5 @@ func (optr *Operator) sync(key string) error {
// this check must always run last since it makes sure the pools are in sync/upgrading correctly
{"RequiredPools", optr.syncRequiredMachineConfigPools},
}
- return optr.syncAll(syncFuncs)
+ return optr.syncAll(ctx, syncFuncs)
}
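For tests or callers that still hold only a stop channel, a small bridge can adapt it to the new context-first API. This is a general pattern rather than something added in this PR, and the package and helper names are made up:

package example // hypothetical

import "context"

// contextFromStopCh returns a context that is cancelled when stopCh is
// closed, so legacy stopCh-based callers can still invoke Run(ctx, workers).
func contextFromStopCh(stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		<-stopCh
	}()
	return ctx, cancel
}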
72 changes: 36 additions & 36 deletions pkg/operator/status.go
@@ -23,8 +23,8 @@ import (
)

// syncVersion handles reporting the version to the clusteroperator
- func (optr *Operator) syncVersion() error {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) syncVersion(ctx context.Context) error {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -50,13 +50,13 @@ func (optr *Operator) syncVersion() error {
co.Status.Versions = optr.vStore.GetAll()
// TODO(runcom): abstract below with updateStatus
optr.setOperatorStatusExtension(&co.Status, nil)
- _, err = optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(context.TODO(), co, metav1.UpdateOptions{})
+ _, err = optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(ctx, co, metav1.UpdateOptions{})
return err
}

// syncRelatedObjects handles reporting the relatedObjects to the clusteroperator
- func (optr *Operator) syncRelatedObjects() error {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) syncRelatedObjects(ctx context.Context) error {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -84,16 +84,16 @@ }
}

if !equality.Semantic.DeepEqual(coCopy.Status.RelatedObjects, co.Status.RelatedObjects) {
- _, err := optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(context.TODO(), co, metav1.UpdateOptions{})
+ _, err := optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(ctx, co, metav1.UpdateOptions{})
return err
}

return nil
}

// syncAvailableStatus applies the new condition to the mco's ClusterOperator object.
- func (optr *Operator) syncAvailableStatus(ierr syncError) error {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) syncAvailableStatus(ctx context.Context, ierr syncError) error {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -126,12 +126,12 @@ func (optr *Operator) syncAvailableStatus(ierr syncError) error {
Message: message,
}

- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
}

// syncProgressingStatus applies the new condition to the mco's ClusterOperator object.
- func (optr *Operator) syncProgressingStatus() error {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) syncProgressingStatus(ctx context.Context) error {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -170,22 +170,22 @@ func (optr *Operator) syncProgressingStatus() error {
coStatus.Status = configv1.ConditionTrue
}

- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
}

- func (optr *Operator) updateStatus(co *configv1.ClusterOperator, status configv1.ClusterOperatorStatusCondition) error {
+ func (optr *Operator) updateStatus(ctx context.Context, co *configv1.ClusterOperator, status configv1.ClusterOperatorStatusCondition) error {
cov1helpers.SetStatusCondition(&co.Status.Conditions, status)
optr.setOperatorStatusExtension(&co.Status, nil)
- _, err := optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(context.TODO(), co, metav1.UpdateOptions{})
+ _, err := optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(ctx, co, metav1.UpdateOptions{})
return err
}

const (
asExpectedReason = "AsExpected"
)

- func (optr *Operator) clearDegradedStatus(task string) error {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) clearDegradedStatus(ctx context.Context, task string) error {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -202,12 +202,12 @@ func (optr *Operator) clearDegradedStatus(task string) error {
if degradedStatusCondition.Reason != task+"Failed" {
return nil
}
- return optr.syncDegradedStatus(syncError{})
+ return optr.syncDegradedStatus(ctx, syncError{})
}

// syncDegradedStatus applies the new condition to the mco's ClusterOperator object.
- func (optr *Operator) syncDegradedStatus(ierr syncError) (err error) {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) syncDegradedStatus(ctx context.Context, ierr syncError) (err error) {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -260,7 +260,7 @@ func (optr *Operator) syncDegradedStatus(ierr syncError) (err error) {
Reason: reason,
}

- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
}

const (
@@ -271,8 +271,8 @@ const (
)

// syncUpgradeableStatus applies the new condition to the mco's ClusterOperator object.
- func (optr *Operator) syncUpgradeableStatus() error {
- co, err := optr.fetchClusterOperator()
+ func (optr *Operator) syncUpgradeableStatus(ctx context.Context) error {
+ co, err := optr.fetchClusterOperator(ctx)
if err != nil {
return err
}
@@ -318,18 +318,18 @@ func (optr *Operator) syncUpgradeableStatus() error {

// don't overwrite status if updating or degraded
if !updating && !degraded {
- skewStatus, status, err := optr.isKubeletSkewSupported(pools)
+ skewStatus, status, err := optr.isKubeletSkewSupported(ctx, pools)
if err != nil {
glog.Errorf("Error checking version skew: %v, kubelet skew status: %v, status reason: %v, status message: %v", err, skewStatus, status.Reason, status.Message)
coStatus.Reason = status.Reason
coStatus.Message = status.Message
- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
}
switch skewStatus {
case skewUnchecked:
coStatus.Reason = status.Reason
coStatus.Message = status.Message
- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
case skewUnsupported:
coStatus.Reason = status.Reason
coStatus.Message = status.Message
@@ -341,22 +341,22 @@ }
}
glog.Infof("kubelet skew status: %v, status reason: %v", skewStatus, status.Reason)
optr.eventRecorder.Eventf(mcoObjectRef, corev1.EventTypeWarning, coStatus.Reason, coStatus.Message)
- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
case skewPresent:
coStatus.Reason = status.Reason
coStatus.Message = status.Message
glog.Infof("kubelet skew status: %v, status reason: %v", skewStatus, status.Reason)
- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
}
}
- return optr.updateStatus(co, coStatus)
+ return optr.updateStatus(ctx, co, coStatus)
}

// isKubeletSkewSupported checks the version skew of kube-apiserver and node kubelet version.
// Returns the skew status. version skew > 2 is not supported.
- func (optr *Operator) isKubeletSkewSupported(pools []*v1.MachineConfigPool) (skewStatus string, coStatus configv1.ClusterOperatorStatusCondition, err error) {
+ func (optr *Operator) isKubeletSkewSupported(ctx context.Context, pools []*v1.MachineConfigPool) (skewStatus string, coStatus configv1.ClusterOperatorStatusCondition, err error) {
coStatus = configv1.ClusterOperatorStatusCondition{}
- kubeAPIServerStatus, err := optr.configClient.ConfigV1().ClusterOperators().Get(context.TODO(), "kube-apiserver", metav1.GetOptions{})
+ kubeAPIServerStatus, err := optr.configClient.ConfigV1().ClusterOperators().Get(ctx, "kube-apiserver", metav1.GetOptions{})
if err != nil {
coStatus.Reason = skewUnchecked
coStatus.Message = fmt.Sprintf("An error occurred when checking kubelet version skew: %v", err)
@@ -478,13 +478,13 @@ func getMinimalSkewSupportNodeVersion(version string) string {
return version
}

- func (optr *Operator) fetchClusterOperator() (*configv1.ClusterOperator, error) {
- co, err := optr.configClient.ConfigV1().ClusterOperators().Get(context.TODO(), optr.name, metav1.GetOptions{})
+ func (optr *Operator) fetchClusterOperator(ctx context.Context) (*configv1.ClusterOperator, error) {
+ co, err := optr.configClient.ConfigV1().ClusterOperators().Get(ctx, optr.name, metav1.GetOptions{})
if meta.IsNoMatchError(err) {
return nil, nil
}
if apierrors.IsNotFound(err) {
- return optr.initializeClusterOperator()
+ return optr.initializeClusterOperator(ctx)
}
if err != nil {
return nil, err
@@ -493,8 +493,8 @@ func (optr *Operator) fetchClusterOperator() (*configv1.ClusterOperator, error)
return coCopy, nil
}

- func (optr *Operator) initializeClusterOperator() (*configv1.ClusterOperator, error) {
- co, err := optr.configClient.ConfigV1().ClusterOperators().Create(context.TODO(), &configv1.ClusterOperator{
+ func (optr *Operator) initializeClusterOperator(ctx context.Context) (*configv1.ClusterOperator, error) {
+ co, err := optr.configClient.ConfigV1().ClusterOperators().Create(ctx, &configv1.ClusterOperator{
ObjectMeta: metav1.ObjectMeta{
Name: optr.name,
},
@@ -523,7 +523,7 @@ func (optr *Operator) initializeClusterOperator() (*configv1.ClusterOperator, er
// For both normal runs and upgrades, this code isn't hit and we get the right version every
// time. This also only contains the operator RELEASE_VERSION when we're here.
co.Status.Versions = optr.vStore.GetAll()
- return optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(context.TODO(), co, metav1.UpdateOptions{})
+ return optr.configClient.ConfigV1().ClusterOperators().UpdateStatus(ctx, co, metav1.UpdateOptions{})
}

// setOperatorStatusExtension sets the raw extension field of the clusteroperator. Today, we set
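Taken together, threading ctx through these helpers means in-flight API calls are abandoned promptly on shutdown instead of running to completion against a process that is going away. A rough sketch of what callers can now expect; checkOperator, its package, and the error handling shown are illustrative assumptions, not part of the change:

package example // hypothetical

import (
	"context"
	"errors"
	"fmt"

	configclient "github.com/openshift/client-go/config/clientset/versioned"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// checkOperator is a hypothetical caller: once ctx is cancelled, the Get
// returns early with an error wrapping context.Canceled rather than
// waiting on the API server.
func checkOperator(ctx context.Context, client configclient.Interface, name string) error {
	co, err := client.ConfigV1().ClusterOperators().Get(ctx, name, metav1.GetOptions{})
	if errors.Is(err, context.Canceled) {
		return fmt.Errorf("shutting down, skipped status check for %s: %w", name, err)
	}
	if err != nil {
		return err
	}
	fmt.Printf("clusteroperator %s has %d conditions\n", co.Name, len(co.Status.Conditions))
	return nil
}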