Skip to content

Commit 2c849e5

Browse files
Merge pull request #446 from wking/gracefully-release-leader-lease-4.5
Bug 1872906: pkg/start: Release leader lease on graceful shutdown
2 parents 8bfbc20 + 65bcffd commit 2c849e5

File tree

12 files changed

+298
-181
lines changed

12 files changed

+298
-181
lines changed

bootstrap/bootstrap-pod.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ spec:
3737
fieldRef:
3838
fieldPath: spec.nodeName
3939
hostNetwork: true
40+
terminationGracePeriodSeconds: 130
4041
volumes:
4142
- name: kubeconfig
4243
hostPath:

cmd/start.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package main
22

33
import (
4+
"context"
5+
46
"github.com/spf13/cobra"
57
"k8s.io/klog"
68

@@ -16,11 +18,12 @@ func init() {
1618
Long: "",
1719
Run: func(cmd *cobra.Command, args []string) {
1820
// To help debugging, immediately log version
19-
klog.Infof("%s", version.String)
21+
klog.Info(version.String)
2022

21-
if err := opts.Run(); err != nil {
23+
if err := opts.Run(context.Background()); err != nil {
2224
klog.Fatalf("error: %v", err)
2325
}
26+
klog.Infof("Graceful shutdown complete for %s.", version.String)
2427
},
2528
}
2629

install/0000_00_cluster-version-operator_03_deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ spec:
5252
nodeSelector:
5353
node-role.kubernetes.io/master: ""
5454
priorityClassName: "system-cluster-critical"
55+
terminationGracePeriodSeconds: 130
5556
tolerations:
5657
- key: "node-role.kubernetes.io/master"
5758
operator: Exists

lib/resourceread/apiext_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func TestReadCustomResourceDefinitionOrDie(t *testing.T) {
1313
args args
1414
}{
1515
{
16-
name:"v1",
16+
name: "v1",
1717
args: args{
1818
objBytes: []byte(`
1919
apiVersion: apiextensions.k8s.io/v1
@@ -42,7 +42,7 @@ spec:
4242
},
4343
},
4444
{
45-
name:"v1beta1",
45+
name: "v1beta1",
4646
args: args{
4747
objBytes: []byte(`
4848
apiVersion: apiextensions.k8s.io/v1beta1
@@ -82,4 +82,4 @@ spec:
8282
_ = ReadCustomResourceDefinitionOrDie(tt.args.objBytes)
8383
})
8484
}
85-
}
85+
}

pkg/autoupdate/autoupdate.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/blang/semver"
99

10-
"k8s.io/klog"
1110
v1 "github.com/openshift/api/config/v1"
1211
clientset "github.com/openshift/client-go/config/clientset/versioned"
1312
"github.com/openshift/client-go/config/clientset/versioned/scheme"
@@ -23,6 +22,7 @@ import (
2322
"k8s.io/client-go/tools/cache"
2423
"k8s.io/client-go/tools/record"
2524
"k8s.io/client-go/util/workqueue"
25+
"k8s.io/klog"
2626
)
2727

2828
const (
@@ -87,23 +87,24 @@ func New(
8787
}
8888

8989
// Run runs the autoupdate controller.
90-
func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) {
90+
func (ctrl *Controller) Run(workers int, stopCh <-chan struct{}) error {
9191
defer utilruntime.HandleCrash()
9292
defer ctrl.queue.ShutDown()
9393

9494
klog.Info("Starting AutoUpdateController")
9595
defer klog.Info("Shutting down AutoUpdateController")
9696

9797
if !cache.WaitForCacheSync(stopCh, ctrl.cacheSynced...) {
98-
klog.Info("Caches never synchronized")
99-
return
98+
return fmt.Errorf("caches never synchronized")
10099
}
101100

102101
for i := 0; i < workers; i++ {
102+
// FIXME: actually wait until these complete if the Context is canceled. And possibly add utilruntime.HandleCrash.
103103
go wait.Until(ctrl.worker, time.Second, stopCh)
104104
}
105105

106106
<-stopCh
107+
return nil
107108
}
108109

109110
func (ctrl *Controller) eventHandler() cache.ResourceEventHandler {

pkg/cvo/availableupdates.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cvo
22

33
import (
44
"crypto/tls"
5-
"crypto/x509"
65
"fmt"
76
"net/url"
87
"runtime"
@@ -11,7 +10,6 @@ import (
1110
"github.com/blang/semver"
1211
"github.com/google/uuid"
1312
"k8s.io/apimachinery/pkg/api/equality"
14-
"k8s.io/apimachinery/pkg/api/errors"
1513
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1614
"k8s.io/klog"
1715

@@ -197,54 +195,3 @@ func calculateAvailableUpdatesStatus(clusterID string, proxyURL *url.URL, tlsCon
197195
LastTransitionTime: metav1.Now(),
198196
}
199197
}
200-
201-
// getHTTPSProxyURL returns a url.URL object for the configured
202-
// https proxy only. It can be nil if does not exist or there is an error.
203-
func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) {
204-
proxy, err := optr.proxyLister.Get("cluster")
205-
206-
if errors.IsNotFound(err) {
207-
return nil, "", nil
208-
}
209-
if err != nil {
210-
return nil, "", err
211-
}
212-
213-
if &proxy.Spec != nil {
214-
if proxy.Spec.HTTPSProxy != "" {
215-
proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy)
216-
if err != nil {
217-
return nil, "", err
218-
}
219-
return proxyURL, proxy.Spec.TrustedCA.Name, nil
220-
}
221-
}
222-
return nil, "", nil
223-
}
224-
225-
func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) {
226-
cm, err := optr.cmConfigLister.Get(cmNameRef)
227-
228-
if err != nil {
229-
return nil, err
230-
}
231-
232-
certPool, _ := x509.SystemCertPool()
233-
if certPool == nil {
234-
certPool = x509.NewCertPool()
235-
}
236-
237-
if cm.Data["ca-bundle.crt"] != "" {
238-
if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok {
239-
return nil, fmt.Errorf("unable to add ca-bundle.crt certificates")
240-
}
241-
} else {
242-
return nil, nil
243-
}
244-
245-
config := &tls.Config{
246-
RootCAs: certPool,
247-
}
248-
249-
return config, nil
250-
}

pkg/cvo/cvo.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ func New(
169169
proxyInformer configinformersv1.ProxyInformer,
170170
client clientset.Interface,
171171
kubeClient kubernetes.Interface,
172-
enableMetrics bool,
173172
exclude string,
174173
) *Operator {
175174
eventBroadcaster := record.NewBroadcaster()
@@ -214,11 +213,6 @@ func New(
214213
// make sure this is initialized after all the listers are initialized
215214
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
216215

217-
if enableMetrics {
218-
if err := optr.registerMetrics(coInformer.Informer()); err != nil {
219-
panic(err)
220-
}
221-
}
222216
return optr
223217
}
224218

@@ -321,8 +315,7 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder v
321315
}
322316

323317
// Run runs the cluster version operator until ctx is done. The workers argument is ignored for now.
324-
func (optr *Operator) Run(ctx context.Context, workers int) {
325-
defer utilruntime.HandleCrash()
318+
func (optr *Operator) Run(ctx context.Context, workers int) error {
326319
defer optr.queue.ShutDown()
327320
stopCh := ctx.Done()
328321
workerStopCh := make(chan struct{})
@@ -331,8 +324,7 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
331324
defer klog.Info("Shutting down ClusterVersionOperator")
332325

333326
if !cache.WaitForCacheSync(stopCh, optr.cacheSynced...) {
334-
klog.Info("Caches never synchronized")
335-
return
327+
return fmt.Errorf("caches never synchronized: %w", ctx.Err())
336328
}
337329

338330
// trigger the first cluster version reconcile always
@@ -361,6 +353,8 @@ func (optr *Operator) Run(ctx context.Context, workers int) {
361353
// stop the queue, then wait for the worker to exit
362354
optr.queue.ShutDown()
363355
<-workerStopCh
356+
357+
return nil
364358
}
365359

366360
func (optr *Operator) queueKey() string {

pkg/cvo/cvo_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import (
2727
"k8s.io/apimachinery/pkg/util/diff"
2828
"k8s.io/apimachinery/pkg/watch"
2929
"k8s.io/client-go/discovery"
30-
"k8s.io/client-go/rest"
3130
kfake "k8s.io/client-go/kubernetes/fake"
31+
"k8s.io/client-go/rest"
3232
ktesting "k8s.io/client-go/testing"
3333
"k8s.io/client-go/util/workqueue"
3434
"k8s.io/klog"

pkg/cvo/egress.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package cvo
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
"fmt"
7+
"net/url"
8+
9+
"k8s.io/apimachinery/pkg/api/errors"
10+
)
11+
12+
// getHTTPSProxyURL returns a url.URL object for the configured
13+
// https proxy only. It can be nil if it does not exist or there is an error.
14+
func (optr *Operator) getHTTPSProxyURL() (*url.URL, string, error) {
15+
proxy, err := optr.proxyLister.Get("cluster")
16+
17+
if errors.IsNotFound(err) {
18+
return nil, "", nil
19+
}
20+
if err != nil {
21+
return nil, "", err
22+
}
23+
24+
if &proxy.Spec != nil {
25+
if proxy.Spec.HTTPSProxy != "" {
26+
proxyURL, err := url.Parse(proxy.Spec.HTTPSProxy)
27+
if err != nil {
28+
return nil, "", err
29+
}
30+
return proxyURL, proxy.Spec.TrustedCA.Name, nil
31+
}
32+
}
33+
return nil, "", nil
34+
}
35+
36+
func (optr *Operator) getTLSConfig(cmNameRef string) (*tls.Config, error) {
37+
cm, err := optr.cmConfigLister.Get(cmNameRef)
38+
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
certPool, _ := x509.SystemCertPool()
44+
if certPool == nil {
45+
certPool = x509.NewCertPool()
46+
}
47+
48+
if cm.Data["ca-bundle.crt"] != "" {
49+
if ok := certPool.AppendCertsFromPEM([]byte(cm.Data["ca-bundle.crt"])); !ok {
50+
return nil, fmt.Errorf("unable to add ca-bundle.crt certificates")
51+
}
52+
} else {
53+
return nil, nil
54+
}
55+
56+
config := &tls.Config{
57+
RootCAs: certPool,
58+
}
59+
60+
return config, nil
61+
}

pkg/cvo/metrics.go

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
package cvo
22

33
import (
4+
"context"
5+
"net"
6+
"net/http"
47
"time"
58

69
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promhttp"
711
corev1 "k8s.io/api/core/v1"
812
apierrors "k8s.io/apimachinery/pkg/api/errors"
913
"k8s.io/apimachinery/pkg/labels"
1014
"k8s.io/apimachinery/pkg/util/sets"
1115
"k8s.io/client-go/tools/cache"
16+
"k8s.io/klog"
1217

1318
configv1 "github.com/openshift/api/config/v1"
1419
"github.com/openshift/cluster-version-operator/lib/resourcemerge"
1520
"github.com/openshift/cluster-version-operator/pkg/internal"
1621
)
1722

18-
func (optr *Operator) registerMetrics(coInformer cache.SharedInformer) error {
23+
// RegisterMetrics initializes metrics and registers them with the
24+
// Prometheus implementation.
25+
func (optr *Operator) RegisterMetrics(coInformer cache.SharedInformer) error {
1926
m := newOperatorMetrics(optr)
2027
coInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
2128
UpdateFunc: m.clusterOperatorChanged,
@@ -86,6 +93,67 @@ version for 'cluster', or empty for 'initial'.
8693
}
8794
}
8895

96+
// RunMetrics launches a server bound to listenAddress serving
97+
// Prometheus metrics at /metrics over HTTP. Continues serving until
98+
// runContext.Done() and then attempts a clean shutdown limited by
99+
// shutdownContext.Done(). Assumes runContext.Done() occurs before or
100+
// simultaneously with shutdownContext.Done().
101+
func RunMetrics(runContext context.Context, shutdownContext context.Context, listenAddress string) error {
102+
handler := http.NewServeMux()
103+
handler.Handle("/metrics", promhttp.Handler())
104+
server := &http.Server{
105+
Handler: handler,
106+
}
107+
108+
errorChannel := make(chan error, 1)
109+
errorChannelCount := 1
110+
go func() {
111+
tcpListener, err := net.Listen("tcp", listenAddress)
112+
if err != nil {
113+
errorChannel <- err
114+
return
115+
}
116+
117+
klog.Infof("Metrics port listening for HTTP on %v", listenAddress)
118+
119+
errorChannel <- server.Serve(tcpListener)
120+
}()
121+
122+
shutdown := false
123+
var loopError error
124+
for errorChannelCount > 0 {
125+
if shutdown {
126+
err := <-errorChannel
127+
errorChannelCount--
128+
if err != nil && err != http.ErrServerClosed {
129+
if loopError == nil {
130+
loopError = err
131+
} else if err != nil { // log the error we are discarding
132+
klog.Errorf("Failed to gracefully shut down metrics server: %s", err)
133+
}
134+
}
135+
} else {
136+
select {
137+
case <-runContext.Done(): // clean shutdown
138+
case err := <-errorChannel: // crashed before a shutdown was requested
139+
errorChannelCount--
140+
if err != nil && err != http.ErrServerClosed {
141+
loopError = err
142+
}
143+
}
144+
shutdown = true
145+
shutdownError := server.Shutdown(shutdownContext)
146+
if loopError == nil {
147+
loopError = shutdownError
148+
} else if shutdownError != nil { // log the error we are discarding
149+
klog.Errorf("Failed to gracefully shut down metrics server: %s", shutdownError)
150+
}
151+
}
152+
}
153+
154+
return loopError
155+
}
156+
89157
type conditionKey struct {
90158
Name string
91159
Type string

0 commit comments

Comments
 (0)