diff --git a/cmd/start.go b/cmd/start.go
index 120ba299d4..fbf6c1c6b2 100644
--- a/cmd/start.go
+++ b/cmd/start.go
@@ -8,6 +8,7 @@ import (
 	"github.com/golang/glog"
 	"github.com/google/uuid"
 
+	"github.com/openshift/cluster-version-operator/pkg/autoupdate"
 	"github.com/openshift/cluster-version-operator/pkg/cvo"
 	clientset "github.com/openshift/cluster-version-operator/pkg/generated/clientset/versioned"
 	informers "github.com/openshift/cluster-version-operator/pkg/generated/informers/externalversions"
@@ -46,6 +47,8 @@ var (
 	startOpts struct {
 		kubeconfig string
 		nodeName   string
+
+		enableAutoUpdate bool
 	}
 )
 
@@ -53,6 +56,7 @@ func init() {
 	rootCmd.AddCommand(startCmd)
 	startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)")
 	startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name CVO is scheduled on.")
+	startCmd.PersistentFlags().BoolVar(&startOpts.enableAutoUpdate, "enable-auto-update", true, "Enables the autoupdate controller.")
 }
 
 func runStartCmd(cmd *cobra.Command, args []string) {
@@ -203,13 +207,13 @@ func createControllerContext(cb *clientBuilder, stop <-chan struct{}) *controllerContext {
 	kubeClient := cb.KubeClientOrDie("kube-shared-informer")
 	apiExtClient := cb.APIExtClientOrDie("apiext-shared-informer")
 
-	sharedNamespacedInformers := informers.NewSharedInformerFactory(client, resyncPeriod()())
+	sharedInformers := informers.NewSharedInformerFactory(client, resyncPeriod()())
 	kubeSharedInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod()())
 	apiExtSharedInformer := apiextinformers.NewSharedInformerFactory(apiExtClient, resyncPeriod()())
 
 	return &controllerContext{
 		ClientBuilder:         cb,
-		InformerFactory:       sharedNamespacedInformers,
+		InformerFactory:       sharedInformers,
 		KubeInformerFactory:   kubeSharedInformer,
 		APIExtInformerFactory: apiExtSharedInformer,
 		Stop:                  stop,
@@ -232,5 +236,15 @@ func startControllers(ctx *controllerContext) error {
 		ctx.ClientBuilder.APIExtClientOrDie(componentName),
 	).Run(2, ctx.Stop)
 
+	if startOpts.enableAutoUpdate {
+		go autoupdate.New(
+			componentNamespace, componentName,
+			ctx.InformerFactory.Clusterversion().V1().CVOConfigs(),
+			ctx.InformerFactory.Clusterversion().V1().OperatorStatuses(),
+			ctx.ClientBuilder.ClientOrDie(componentName),
+			ctx.ClientBuilder.KubeClientOrDie(componentName),
+		).Run(2, ctx.Stop)
+	}
+
 	return nil
 }
diff --git a/pkg/autoupdate/autoupdate.go b/pkg/autoupdate/autoupdate.go
new file mode 100644
index 0000000000..ba8a6b058a
--- /dev/null
+++ b/pkg/autoupdate/autoupdate.go
@@ -0,0 +1,227 @@
+package autoupdate
+
+import (
+	"fmt"
+	"sort"
+	"time"
+
+	"github.com/blang/semver"
+
+	"github.com/golang/glog"
+	"github.com/openshift/cluster-version-operator/lib/resourceapply"
+	"github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1"
+	clientset "github.com/openshift/cluster-version-operator/pkg/generated/clientset/versioned"
+	"github.com/openshift/cluster-version-operator/pkg/generated/clientset/versioned/scheme"
+	informersv1 "github.com/openshift/cluster-version-operator/pkg/generated/informers/externalversions/clusterversion.openshift.io/v1"
+	listersv1 "github.com/openshift/cluster-version-operator/pkg/generated/listers/clusterversion.openshift.io/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	coreclientsetv1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+)
+
+const (
+	// maxRetries is the number of times a work item will be retried before it is dropped out of the queue.
+	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
+	// an item is going to be requeued:
+	//
+	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
+	maxRetries = 15
+)
+
+// Controller defines the autoupdate controller.
+type Controller struct {
+	// namespace and name are used to find the CVOConfig and OperatorStatus.
+	namespace, name string
+
+	client        clientset.Interface
+	eventRecorder record.EventRecorder
+
+	syncHandler func(key string) error
+
+	cvoConfigLister      listersv1.CVOConfigLister
+	operatorStatusLister listersv1.OperatorStatusLister
+
+	cvoConfigListerSynced cache.InformerSynced
+	operatorStatusSynced  cache.InformerSynced
+
+	// queue only ever has one item, but it has nice error handling backoff/retry semantics
+	queue workqueue.RateLimitingInterface
+}
+
+// New returns a new autoupdate controller.
+func New(
+	namespace, name string,
+	cvoConfigInformer informersv1.CVOConfigInformer,
+	operatorStatusInformer informersv1.OperatorStatusInformer,
+	client clientset.Interface,
+	kubeClient kubernetes.Interface,
+) *Controller {
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+
+	ctrl := &Controller{
+		namespace:     namespace,
+		name:          name,
+		client:        client,
+		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "autoupdater"}),
+		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoupdater"),
+	}
+
+	cvoConfigInformer.Informer().AddEventHandler(ctrl.eventHandler())
+	operatorStatusInformer.Informer().AddEventHandler(ctrl.eventHandler())
+
+	ctrl.syncHandler = ctrl.sync
+
+	ctrl.cvoConfigLister = cvoConfigInformer.Lister()
+	ctrl.operatorStatusLister = operatorStatusInformer.Lister()
+
+	ctrl.cvoConfigListerSynced = cvoConfigInformer.Informer().HasSynced
+	ctrl.operatorStatusSynced = operatorStatusInformer.Informer().HasSynced
+
+	return ctrl
+}
+
+// Run runs the autoupdate controller.
+func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer ctrl.queue.ShutDown()
+
+	glog.Info("Starting AutoUpdateController")
+	defer glog.Info("Shutting down AutoUpdateController")
+
+	if !cache.WaitForCacheSync(stopCh,
+		ctrl.cvoConfigListerSynced,
+		ctrl.operatorStatusSynced,
+	) {
+		return
+	}
+
+	for i := 0; i < workers; i++ {
+		go wait.Until(ctrl.worker, time.Second, stopCh)
+	}
+
+	<-stopCh
+}
+
+func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {
+	key := fmt.Sprintf("%s/%s", ctrl.namespace, ctrl.name)
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc:    func(obj interface{}) { ctrl.queue.Add(key) },
+		UpdateFunc: func(old, new interface{}) { ctrl.queue.Add(key) },
+		DeleteFunc: func(obj interface{}) { ctrl.queue.Add(key) },
+	}
+}
+
+func (ctrl *Controller) worker() {
+	for ctrl.processNextWorkItem() {
+	}
+}
+
+func (ctrl *Controller) processNextWorkItem() bool {
+	key, quit := ctrl.queue.Get()
+	if quit {
+		return false
+	}
+	defer ctrl.queue.Done(key)
+
+	err := ctrl.syncHandler(key.(string))
+	ctrl.handleErr(err, key)
+
+	return true
+}
+
+func (ctrl *Controller) handleErr(err error, key interface{}) {
+	if err == nil {
+		ctrl.queue.Forget(key)
+		return
+	}
+
+	if ctrl.queue.NumRequeues(key) < maxRetries {
+		glog.V(2).Infof("Error syncing controller %v: %v", key, err)
+		ctrl.queue.AddRateLimited(key)
+		return
+	}
+
+	utilruntime.HandleError(err)
+	glog.V(2).Infof("Dropping controller %q out of the queue: %v", key, err)
+	ctrl.queue.Forget(key)
+}
+
+func (ctrl *Controller) sync(key string) error {
+	startTime := time.Now()
+	glog.V(4).Infof("Started syncing controller %q (%v)", key, startTime)
+	defer func() {
+		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Since(startTime))
+	}()
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+
+	operatorstatus, err := ctrl.operatorStatusLister.OperatorStatuses(namespace).Get(name)
+	if errors.IsNotFound(err) {
+		glog.V(2).Infof("OperatorStatus %v has been deleted", key)
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	cvoconfig, err := ctrl.cvoConfigLister.CVOConfigs(namespace).Get(name)
+	if errors.IsNotFound(err) {
+		glog.V(2).Infof("CVOConfig %v has been deleted", key)
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	// Deep-copy otherwise we are mutating our cache.
+	// TODO: Deep-copy only when needed.
+	ops := operatorstatus.DeepCopy()
+	config := new(v1.CVOConfig)
+	cvoconfig.DeepCopyInto(config)
+
+	obji, _, err := scheme.Codecs.UniversalDecoder().Decode(ops.Extension.Raw, nil, &v1.CVOStatus{})
+	if err != nil {
+		return fmt.Errorf("unable to decode CVOStatus from extension.Raw: %v", err)
+	}
+	cvostatus, ok := obji.(*v1.CVOStatus)
+	if !ok {
+		return fmt.Errorf("expected *v1.CVOStatus found %T", obji)
+	}
+
+	if !updateAvail(cvostatus.AvailableUpdates) {
+		return nil
+	}
+	up := nextUpdate(cvostatus.AvailableUpdates)
+	config.DesiredUpdate = up
+
+	_, updated, err := resourceapply.ApplyCVOConfigFromCache(ctrl.cvoConfigLister, ctrl.client.ClusterversionV1(), config)
+	if updated {
+		glog.Infof("Auto Update set to %s", up)
+	}
+	return err
+}
+
+func updateAvail(ups []v1.Update) bool {
+	return len(ups) > 0
+}
+
+func nextUpdate(ups []v1.Update) v1.Update {
+	sorted := ups
+	sort.Slice(sorted, func(i, j int) bool {
+		vi := semver.MustParse(sorted[i].Version)
+		vj := semver.MustParse(sorted[j].Version)
+		return vi.GTE(vj)
+	})
+	return sorted[0]
+}
diff --git a/pkg/autoupdate/autoupdate_test.go b/pkg/autoupdate/autoupdate_test.go
new file mode 100644
index 0000000000..06df2178b7
--- /dev/null
+++ b/pkg/autoupdate/autoupdate_test.go
@@ -0,0 +1,43 @@
+package autoupdate
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/openshift/cluster-version-operator/pkg/apis/clusterversion.openshift.io/v1"
+)
+
+func TestNextUpdate(t *testing.T) {
+	tests := []struct {
+		avail []string
+		want  string
+	}{{
+		avail: []string{"0.0.0", "0.0.1", "0.0.2"},
+		want:  "0.0.2",
+	}, {
+		avail: []string{"0.0.2", "0.0.0", "0.0.1"},
+		want:  "0.0.2",
+	}, {
+		avail: []string{"0.0.1", "0.0.0", "0.0.2"},
+		want:  "0.0.2",
+	}, {
+		avail: []string{"0.0.0", "0.0.0+new.2", "0.0.0+new.3"},
+		want:  "0.0.0+new.3",
+	}, {
+		avail: []string{"0.0.0", "0.0.0-new.2", "0.0.0-new.3"},
+		want:  "0.0.0",
+	}}
+	for idx, test := range tests {
+		t.Run(fmt.Sprintf("test: #%d", idx), func(t *testing.T) {
+			ups := []v1.Update{}
+			for _, v := range test.avail {
+				ups = append(ups, v1.Update{Version: v})
+			}
+
+			got := nextUpdate(ups)
+			if got.Version != test.want {
+				t.Fatalf("mismatch: got %s want %s", got.Version, test.want)
+			}
+		})
+	}
+}
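Note on the new test (not part of the patch): the final two TestNextUpdate cases exercise two blang/semver behaviors. Build metadata ("+new.N") is ignored when comparing precedence, while pre-release identifiers ("-new.N") sort below the plain release, which is why "0.0.0" is expected to win the last case. A minimal standalone sketch of those rules, using the same github.com/blang/semver package the controller already imports:

package main

import (
	"fmt"

	"github.com/blang/semver"
)

func main() {
	release := semver.MustParse("0.0.0")
	prerelease := semver.MustParse("0.0.0-new.3")
	build := semver.MustParse("0.0.0+new.3")

	// Pre-release versions have lower precedence than the associated release.
	fmt.Println(release.GT(prerelease)) // true

	// Build metadata is ignored for precedence, so these compare as equal.
	fmt.Println(release.Compare(build)) // 0
}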