diff --git a/pkg/cvo/availableupdates.go b/pkg/cvo/availableupdates.go index 76c1fc74b5..fd32cd0ad9 100644 --- a/pkg/cvo/availableupdates.go +++ b/pkg/cvo/availableupdates.go @@ -2,7 +2,6 @@ package cvo import ( "crypto/tls" - "crypto/x509" "fmt" "net/url" "runtime" @@ -11,7 +10,6 @@ import ( "github.com/blang/semver" "github.com/google/uuid" "k8s.io/apimachinery/pkg/api/equality" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" @@ -224,54 +222,3 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon LastTransitionTime: metav1.Now(), } } - -// getHTTPSProxyURL returns a url.URL object for the configured -// https proxy only. It can be nil if does not exist or there is an error. -func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) { - proxy, err := optr.proxyLister.Get("cluster") - - if errors.IsNotFound(err) { - return nil, "", nil - } - if err != nil { - return nil, "", err - } - - if &proxy.Spec != nil { - if proxy.Spec.HTTPSProxy != "" { - proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy) - if err != nil { - return nil, "", err - } - return proxyURL, proxy.Spec.TrustedCA.Name, nil - } - } - return nil, "", nil -} - -func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) { - cm, err := optr.cmConfigLister.Get(cmNameRef) - - if err != nil { - return nil, err - } - - certPool, _ := x509.SystemCertPool() - if certPool == nil { - certPool = x509.NewCertPool() - } - - if cm.Data["ca-bundle.crt"] != "" { - if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok { - return nil, fmt.Errorf("unable to add ca-bundle.crt certificates") - } - } else { - return nil, nil - } - - config := &tls.Config{ - RootCAs: certPool, - } - - return config, nil -} diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index c0f3a315e5..cef6351a11 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -169,7 +169,6 @@ func New( proxyInformer 
configinformersv1.ProxyInformer, client clientset.Interface, kubeClient kubernetes.Interface, - enableMetrics bool, exclude string, ) *Operator { eventBroadcaster := record.NewBroadcaster() @@ -214,11 +213,6 @@ func New( // make sure this is initialized after all the listers are initialized optr.upgradeableChecks = optr.defaultUpgradeableChecks() - if enableMetrics { - if err := optr.registerMetrics(coInformer.Informer()); err != nil { - panic(err) - } - } return optr } diff --git a/pkg/cvo/egress.go b/pkg/cvo/egress.go new file mode 100644 index 0000000000..75cfa607c1 --- /dev/null +++ b/pkg/cvo/egress.go @@ -0,0 +1,61 @@ +package cvo + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "net/url" + + "k8s.io/apimachinery/pkg/api/errors" +) + +// getHTTPSProxyURL returns the parsed HTTPS proxy URL and the trusted CA name from the +// "cluster" proxy. The URL is nil when the proxy is missing or unset, or on error. +func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) { + proxy, err := optr.proxyLister.Get("cluster") + + if errors.IsNotFound(err) { + return nil, "", nil + } + if err != nil { + return nil, "", err + } + + if proxy.Spec.HTTPSProxy != "" { + proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy) + if err != nil { + return nil, "", err + } + return proxyURL, proxy.Spec.TrustedCA.Name, nil + } + return nil, "", nil +} + +// getTLSConfig returns a TLS config whose roots are the system pool plus ca-bundle.crt from +// the cmNameRef entry of cmConfigLister. It returns nil if that bundle is empty. +func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) { + cm, err := optr.cmConfigLister.Get(cmNameRef) + + if err != nil { + return nil, err + } + + certPool, _ := x509.SystemCertPool() + if certPool == nil { + certPool = x509.NewCertPool() + } + + if cm.Data["ca-bundle.crt"] != "" { + if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok { + return nil, fmt.Errorf("unable to add ca-bundle.crt certificates") + } + } else { + return nil, nil + } + + config := &tls.Config{ + RootCAs: certPool, + } + + return config, nil +} diff --git a/pkg/cvo/metrics.go b/pkg/cvo/metrics.go index
db03328694..0e8ee7603f 100644 --- a/pkg/cvo/metrics.go +++ b/pkg/cvo/metrics.go @@ -1,21 +1,30 @@ package cvo import ( + "context" + "crypto/tls" + "net" + "net/http" "time" + "github.com/cockroachdb/cmux" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" + "k8s.io/klog" configv1 "github.com/openshift/api/config/v1" "github.com/openshift/cluster-version-operator/lib/resourcemerge" "github.com/openshift/cluster-version-operator/pkg/internal" ) -func (optr *Operator) registerMetrics(coInformer cache.SharedInformer) error { +// RegisterMetrics initializes metrics and registers them with the +// Prometheus implementation. +func (optr *Operator) RegisterMetrics(coInformer cache.SharedInformer) error { m := newOperatorMetrics(optr) coInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: m.clusterOperatorChanged, @@ -86,6 +95,88 @@ version for 'cluster', or empty for 'initial'. } } +// RunMetrics launches a server bound to listenAddress serving +// Prometheus metrics at /metrics over HTTP, and, if tlsConfig is +// non-nil, also over HTTPS. Continues serving until runContext.Done() +// and then attempts a clean shutdown limited by shutdownContext.Done(). +// Assumes runContext.Done() occurs before or simultaneously with +// shutdownContext.Done().
+func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string, tlsConfig *tls.Config) error { + handler := http.NewServeMux() + handler.Handle("/metrics", promhttp.Handler()) + server := &http.Server{ + Handler: handler, + } + + tcpListener, err := net.Listen("tcp", listenAddress) + if err != nil { + return err + } + + // if a TLS connection was requested, set up a connection mux that will send TLS requests to + // the TLS server but send HTTP requests to the HTTP server. Preserves the ability for legacy + // HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection. + mux := cmux.New(tcpListener) + + errorChannel := make(chan error, 1) + errorChannelCount := 1 + + // match HTTP first; matchers are registered here, in order, before mux.Serve() starts + httpListener := mux.Match(cmux.HTTP1()) + go func() { + klog.Infof("Metrics port listening for HTTP on %v", listenAddress) + errorChannel <- server.Serve(httpListener) + }() + + if tlsConfig != nil { + errorChannelCount++ + tlsListener := tls.NewListener(mux.Match(cmux.Any()), tlsConfig) + go func() { + klog.Infof("Metrics port listening for HTTPS on %v", listenAddress) + errorChannel <- server.Serve(tlsListener) + }() + } + + errorChannelCount++ + go func() { + errorChannel <- mux.Serve() + }() + + shutdown := false + var loopError error + for errorChannelCount > 0 { + if shutdown { + err := <-errorChannel + errorChannelCount-- + if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed { + if loopError == nil { + loopError = err + } else { // log the error we are discarding + klog.Errorf("Failed to gracefully shut down metrics server: %s", err) + } + } + } else { + select { + case <-runContext.Done(): // clean shutdown + case err := <-errorChannel: // crashed before a shutdown was requested + errorChannelCount-- + if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed { + loopError = err + } + } + shutdown = true + shutdownError :=
server.Shutdown(shutdownContext) + if loopError == nil { + loopError = shutdownError + } else if shutdownError != nil { // log the error we are discarding + klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError) + } + } + } + + return loopError +} + type conditionKey struct { Name string Type string diff --git a/pkg/start/start.go b/pkg/start/start.go index 176d3b82d7..bfb711d3d9 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -8,18 +8,13 @@ import ( "fmt" "io/ioutil" "math/rand" - "net" - "net/http" "os" "os/signal" "sync" "syscall" "time" - "github.com/cockroachdb/cmux" - "github.com/google/uuid" - "github.com/prometheus/client_golang/prometheus/promhttp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -75,7 +70,6 @@ type Options struct { Name string Namespace string PayloadOverride string - EnableMetrics bool ResyncInterval time.Duration } @@ -99,7 +93,6 @@ func NewOptions() *Options { Name: defaultEnv("CVO_NAME", defaultComponentName), PayloadOverride: os.Getenv("PAYLOAD_OVERRIDE"), ResyncInterval: minResyncPeriod, - EnableMetrics: true, Exclude: os.Getenv("EXCLUDE_MANIFESTS"), } } @@ -190,56 +183,25 @@ func (o *Options) makeTLSConfig() (*tls.Config, error) { } func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourcelock.ConfigMapLock) { - // listen on metrics - if len(o.ListenAddr) > 0 { - handler := http.NewServeMux() - handler.Handle("/metrics", promhttp.Handler()) - tcpl, err := net.Listen("tcp", o.ListenAddr) - if err != nil { - klog.Fatalf("Listen error: %v", err) - } - - // if a TLS connection was requested, set up a connection mux that will send TLS requests to - // the TLS server but send HTTP requests to the HTTP server. Preserves the ability for legacy - // HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection.
- m := cmux.New(tcpl) - - // match HTTP first - httpl := m.Match(cmux.HTTP1()) - go func() { - s := &http.Server{ - Handler: handler, - } - if err := s.Serve(httpl); err != cmux.ErrListenerClosed { - klog.Fatalf("HTTP serve error: %v", err) - } - }() - + runContext, runCancel := context.WithCancel(ctx) + defer runCancel() + shutdownContext, shutdownCancel := context.WithCancel(ctx) + defer shutdownCancel() + errorChannel := make(chan error, 1) + errorChannelCount := 0 + if o.ListenAddr != "" { + var tlsConfig *tls.Config if o.ServingCertFile != "" || o.ServingKeyFile != "" { - tlsConfig, err := o.makeTLSConfig() + var err error + tlsConfig, err = o.makeTLSConfig() if err != nil { klog.Fatalf("Failed to create TLS config: %v", err) } - - tlsListener := tls.NewListener(m.Match(cmux.Any()), tlsConfig) - klog.Infof("Metrics port listening for HTTP and HTTPS on %v", o.ListenAddr) - go func() { - s := &http.Server{ - Handler: handler, - } - if err := s.Serve(tlsListener); err != cmux.ErrListenerClosed { - klog.Fatalf("HTTPS serve error: %v", err) - } - }() - - go func() { - if err := m.Serve(); err != nil { - klog.Errorf("CMUX serve error: %v", err) - } - }() - } else { - klog.Infof("Metrics port listening for HTTP on %v", o.ListenAddr) } + errorChannelCount++ + go func() { + errorChannel <- cvo.RunMetrics(runContext, shutdownContext, o.ListenAddr, tlsConfig) + }() } exit := make(chan struct{}) @@ -255,9 +217,9 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc RetryPeriod: retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(localCtx context.Context) { - controllerCtx.Start(ctx) + controllerCtx.Start(runContext) select { - case <-ctx.Done(): + case <-runContext.Done(): // WARNING: this is not completely safe until we have Kube 1.14 and ReleaseOnCancel // and client-go ContextCancelable, which allows us to block new API requests before // we step down.
However, the CVO isn't that sensitive to races and can tolerate @@ -286,6 +248,34 @@ func (o *Options) run(ctx context.Context, controllerCtx *Context, lock *resourc }, }) + var shutdownTimer *time.Timer // declared outside the loop so the shutting-down state persists across iterations + for errorChannelCount > 0 { + if shutdownTimer == nil { // running + select { + case <-runContext.Done(): + shutdownTimer = time.NewTimer(2 * time.Minute) + case err := <-errorChannel: + errorChannelCount-- + if err != nil { + klog.Error(err) + runCancel() // this will cause shutdownTimer initialization in the next loop + } + } + } else { // shutting down + select { + case <-shutdownTimer.C: // never triggers after the channel is stopped, although it would not matter much if it did because subsequent cancel calls do nothing. + shutdownCancel() + shutdownTimer.Stop() + case err := <-errorChannel: + errorChannelCount-- + if err != nil { + klog.Error(err) + runCancel() + } + } + } + } + <-exit } @@ -404,6 +394,7 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context { sharedInformers := externalversions.NewSharedInformerFactory(client, resyncPeriod(o.ResyncInterval)()) + coInformer := sharedInformers.Config().V1().ClusterOperators() ctx := &Context{ CVInformerFactory: cvInformer, OpenshiftConfigInformerFactory: openshiftConfigInformer, @@ -417,12 +408,11 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context { o.PayloadOverride, resyncPeriod(o.ResyncInterval)(), cvInformer.Config().V1().ClusterVersions(), - sharedInformers.Config().V1().ClusterOperators(), + coInformer, openshiftConfigInformer.Core().V1().ConfigMaps(), sharedInformers.Config().V1().Proxies(), cb.ClientOrDie(o.Namespace), cb.KubeClientOrDie(o.Namespace, useProtobuf), - o.EnableMetrics, o.Exclude, ), } @@ -435,6 +425,11 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context { cb.KubeClientOrDie(o.Namespace), ) } + if o.ListenAddr != "" { + if err := ctx.CVO.RegisterMetrics(coInformer.Informer()); err != nil { + panic(err) + } + } return ctx } diff --git
a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index 446fba9d4d..e7cc2cad85 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -238,7 +238,6 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.NodeName = "test-node" options.ReleaseImage = payloadImage1 options.PayloadOverride = filepath.Join(dir, "ignored") - options.EnableMetrics = false controllers := options.NewControllerContext(cb) worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "") @@ -390,7 +389,6 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) { options.NodeName = "test-node" options.ReleaseImage = payloadImage1 options.PayloadOverride = filepath.Join(dir, "ignored") - options.EnableMetrics = false options.ResyncInterval = 3 * time.Second controllers := options.NewControllerContext(cb) @@ -497,7 +495,6 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.Name = ns options.ListenAddr = "" options.NodeName = "test-node" - options.EnableMetrics = false controllers := options.NewControllerContext(cb) worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "") @@ -667,7 +664,6 @@ metadata: options.NodeName = "test-node" options.ReleaseImage = payloadImage1 options.PayloadOverride = payloadDir - options.EnableMetrics = false controllers := options.NewControllerContext(cb) if err := controllers.CVO.InitializeFromPayload(cb.RestConfig(defaultQPS), cb.RestConfig(highQPS)); err != nil { t.Fatal(err)