Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,10 @@ var (
Short: "Run Cluster Version Controller",
Long: "",
}

rootOpts struct {
releaseImage string
}
)

func init() {
rootCmd.PersistentFlags().AddGoFlagSet(flag.CommandLine)
rootCmd.PersistentFlags().StringVar(&rootOpts.releaseImage, "release-image", "", "The Openshift release image url.")
}

func main() {
Expand Down
8 changes: 5 additions & 3 deletions cmd/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ var (
}

renderOpts struct {
outputDir string
releaseImage string
outputDir string
}
)

func init() {
rootCmd.AddCommand(renderCmd)
renderCmd.PersistentFlags().StringVar(&renderOpts.outputDir, "output-dir", "", "The output directory where the manifests will be rendered.")
renderCmd.PersistentFlags().StringVar(&renderOpts.releaseImage, "release-image", "", "The Openshift release image url.")
}

func runRenderCmd(cmd *cobra.Command, args []string) {
Expand All @@ -34,10 +36,10 @@ func runRenderCmd(cmd *cobra.Command, args []string) {
if renderOpts.outputDir == "" {
glog.Fatalf("missing --output-dir flag, it is required")
}
if rootOpts.releaseImage == "" {
if renderOpts.releaseImage == "" {
glog.Fatalf("missing --release-image flag, it is required")
}
if err := cvo.Render(renderOpts.outputDir, rootOpts.releaseImage); err != nil {
if err := cvo.Render(renderOpts.outputDir, renderOpts.releaseImage); err != nil {
glog.Fatalf("Render command failed: %v", err)
}
}
282 changes: 17 additions & 265 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,284 +2,36 @@ package main

import (
"flag"
"fmt"
"math/rand"
"net/http"
"os"
"time"

"github.com/golang/glog"
"github.com/google/uuid"
clientset "github.com/openshift/client-go/config/clientset/versioned"
informers "github.com/openshift/client-go/config/informers/externalversions"
"github.com/openshift/cluster-version-operator/pkg/autoupdate"
"github.com/openshift/cluster-version-operator/pkg/cvo"
"github.com/openshift/cluster-version-operator/pkg/start"
"github.com/openshift/cluster-version-operator/pkg/version"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
apiext "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)

const (
minResyncPeriod = 2 * time.Minute

leaseDuration = 90 * time.Second
renewDeadline = 45 * time.Second
retryPeriod = 30 * time.Second
)

var (
startCmd = &cobra.Command{
func init() {
opts := start.NewOptions()
cmd := &cobra.Command{
Use: "start",
Short: "Starts Cluster Version Operator",
Long: "",
Run: runStartCmd,
}

startOpts struct {
// name is provided for testing only to allow multiple CVO's to be running at once
name string
// namespace is provided for testing only
namespace string

kubeconfig string
nodeName string
listenAddr string

enableAutoUpdate bool
}
)

func init() {
rootCmd.AddCommand(startCmd)
startCmd.PersistentFlags().StringVar(&startOpts.listenAddr, "listen", "0.0.0.0:9099", "Address to listen on for metrics")
startCmd.PersistentFlags().StringVar(&startOpts.kubeconfig, "kubeconfig", "", "Kubeconfig file to access a remote cluster (testing only)")
startCmd.PersistentFlags().StringVar(&startOpts.nodeName, "node-name", "", "kubernetes node name CVO is scheduled on.")
startCmd.PersistentFlags().BoolVar(&startOpts.enableAutoUpdate, "enable-auto-update", true, "Enables the autoupdate controller.")
}

func runStartCmd(cmd *cobra.Command, args []string) {
flag.Set("logtostderr", "true")
flag.Parse()

// To help debugging, immediately log version
glog.Infof("%s", version.String)

if startOpts.nodeName == "" {
name, ok := os.LookupEnv("NODE_NAME")
if !ok || name == "" {
glog.Fatalf("node-name is required")
}
startOpts.nodeName = name
}

// exposed for end-to-end testing only
startOpts.name = os.Getenv("CVO_NAME")
if len(startOpts.name) == 0 {
startOpts.name = componentName
}
startOpts.namespace = os.Getenv("CVO_NAMESPACE")
if len(startOpts.name) == 0 {
startOpts.namespace = componentNamespace
}
Run: func(cmd *cobra.Command, args []string) {
flag.Set("logtostderr", "true")
flag.Parse()

if rootOpts.releaseImage == "" {
glog.Fatalf("missing --release-image flag, it is required")
}
// To help debugging, immediately log version
glog.Infof("%s", version.String)

if len(startOpts.listenAddr) > 0 {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
go func() {
if err := http.ListenAndServe(startOpts.listenAddr, mux); err != nil {
glog.Fatalf("Unable to start metrics server: %v", err)
if err := opts.Run(); err != nil {
glog.Fatalf("error: %v", err)
}
}()
}

cb, err := newClientBuilder(startOpts.kubeconfig)
if err != nil {
glog.Fatalf("error creating clients: %v", err)
}
stopCh := make(chan struct{})
run := func(stop <-chan struct{}) {

ctx := createControllerContext(cb, startOpts.name, stopCh)
if err := startControllers(ctx); err != nil {
glog.Fatalf("error starting controllers: %v", err)
}

ctx.CVInformerFactory.Start(ctx.Stop)
ctx.InformerFactory.Start(ctx.Stop)
close(ctx.InformersStarted)

select {}
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: createResourceLock(cb),
LeaseDuration: leaseDuration,
RenewDeadline: renewDeadline,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}

func createResourceLock(cb *clientBuilder) resourcelock.Interface {
recorder := record.
NewBroadcaster().
NewRecorder(runtime.NewScheme(), v1.EventSource{Component: componentName})

id, err := os.Hostname()
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}

uuid, err := uuid.NewRandom()
if err != nil {
glog.Fatalf("Failed to generate UUID: %v", err)
}

// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + uuid.String()

return &resourcelock.ConfigMapLock{
ConfigMapMeta: metav1.ObjectMeta{
Namespace: componentNamespace,
Name: componentName,
},
Client: cb.KubeClientOrDie("leader-election").CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
}
}

func resyncPeriod() func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)
}
}

type clientBuilder struct {
config *rest.Config
}

func (cb *clientBuilder) RestConfig() *rest.Config {
c := rest.CopyConfig(cb.config)
return c
}

func (cb *clientBuilder) ClientOrDie(name string) clientset.Interface {
return clientset.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
}

func (cb *clientBuilder) KubeClientOrDie(name string) kubernetes.Interface {
return kubernetes.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
}

func (cb *clientBuilder) APIExtClientOrDie(name string) apiext.Interface {
return apiext.NewForConfigOrDie(rest.AddUserAgent(cb.config, name))
}

func newClientBuilder(kubeconfig string) (*clientBuilder, error) {
var config *rest.Config
var err error

if kubeconfig != "" {
glog.V(4).Infof("Loading kube client config from path %q", kubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
} else {
glog.V(4).Infof("Using in-cluster kube client config")
config, err = rest.InClusterConfig()
}
if err != nil {
return nil, err
}

return &clientBuilder{
config: config,
}, nil
}

type controllerContext struct {
ClientBuilder *clientBuilder

CVInformerFactory informers.SharedInformerFactory
InformerFactory informers.SharedInformerFactory

Stop <-chan struct{}

InformersStarted chan struct{}

ResyncPeriod func() time.Duration
}

func createControllerContext(cb *clientBuilder, name string, stop <-chan struct{}) *controllerContext {
client := cb.ClientOrDie("shared-informer")

cvInformer := informers.NewFilteredSharedInformerFactory(client, resyncPeriod()(), "", func(opts *metav1.ListOptions) {
opts.FieldSelector = fmt.Sprintf("metadata.name=%s", name)
})
sharedInformers := informers.NewSharedInformerFactory(client, resyncPeriod()())

return &controllerContext{
ClientBuilder: cb,
CVInformerFactory: cvInformer,
InformerFactory: sharedInformers,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: resyncPeriod(),
}
}

func startControllers(ctx *controllerContext) error {
overrideDirectory := os.Getenv("PAYLOAD_OVERRIDE")
if len(overrideDirectory) > 0 {
glog.Warningf("Using an override payload directory for testing only: %s", overrideDirectory)
}

go cvo.New(
startOpts.nodeName,
startOpts.namespace, startOpts.name,
rootOpts.releaseImage,
overrideDirectory,
ctx.ResyncPeriod(),
ctx.CVInformerFactory.Config().V1().ClusterVersions(),
ctx.InformerFactory.Config().V1().ClusterOperators(),
ctx.ClientBuilder.RestConfig(),
ctx.ClientBuilder.ClientOrDie(componentName),
ctx.ClientBuilder.KubeClientOrDie(componentName),
ctx.ClientBuilder.APIExtClientOrDie(componentName),
true,
).Run(2, ctx.Stop)

if startOpts.enableAutoUpdate {
go autoupdate.New(
componentNamespace, componentName,
ctx.CVInformerFactory.Config().V1().ClusterVersions(),
ctx.InformerFactory.Config().V1().ClusterOperators(),
ctx.ClientBuilder.ClientOrDie(componentName),
ctx.ClientBuilder.KubeClientOrDie(componentName),
).Run(2, ctx.Stop)
}

return nil
cmd.PersistentFlags().StringVar(&opts.ListenAddr, "listen", opts.ListenAddr, "Address to listen on for metrics")
cmd.PersistentFlags().StringVar(&opts.Kubeconfig, "kubeconfig", opts.Kubeconfig, "Kubeconfig file to access a remote cluster (testing only)")
cmd.PersistentFlags().StringVar(&opts.NodeName, "node-name", opts.NodeName, "kubernetes node name CVO is scheduled on.")
cmd.PersistentFlags().BoolVar(&opts.EnableAutoUpdate, "enable-auto-update", opts.EnableAutoUpdate, "Enables the autoupdate controller.")
cmd.PersistentFlags().StringVar(&opts.ReleaseImage, "release-image", opts.ReleaseImage, "The Openshift release image url.")
rootCmd.AddCommand(cmd)
}
4 changes: 2 additions & 2 deletions pkg/autoupdate/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/blang/semver"

"github.com/golang/glog"
"github.com/openshift/api/config/v1"
v1 "github.com/openshift/api/config/v1"
clientset "github.com/openshift/client-go/config/clientset/versioned"
"github.com/openshift/client-go/config/clientset/versioned/scheme"
configinformersv1 "github.com/openshift/client-go/config/informers/externalversions/config/v1"
Expand Down Expand Up @@ -65,7 +65,7 @@ func New(
) *Controller {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})

ctrl := &Controller{
namespace: namespace,
Expand Down
9 changes: 7 additions & 2 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func New(
) *Operator {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(namespace)})

optr := &Operator{
nodename: nodename,
Expand All @@ -162,7 +162,7 @@ func New(

optr.configSync = NewSyncWorker(
optr.defaultPayloadRetriever(),
optr.defaultResourceBuilder(),
NewResourceBuilder(optr.restConfig),
minimumInterval,
wait.Backoff{
Duration: time.Second * 10,
Expand Down Expand Up @@ -452,3 +452,8 @@ func (optr *Operator) currentVersion() configv1.Update {
Payload: optr.releaseImage,
}
}

// SetSyncWorkerForTesting updates the sync worker for whitebox testing.
func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I contemplated having start lazily configure the worker, but this is a reasonable taint.

optr.configSync = worker
}
Loading