Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions pkg/cvo/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ version for 'cluster', or empty for 'initial'.
}
}

type asyncResult struct {
name string
error error
}

// RunMetrics launches an server bound to listenAddress serving
// Prometheus metrics at /metrics over HTTP, and, if tlsConfig is
// non-nil, also over HTTPS. Continues serving until runContext.Done()
Expand All @@ -123,50 +128,60 @@ func RunMetrics(runContext context.Context, shutdownContext context.Context, lis
// HTTP, needed during upgrade, while still allowing TLS certs and end to end metrics protection.
mux := cmux.New(tcpListener)

errorChannel := make(chan error, 1)
errorChannelCount := 1
resultChannel := make(chan asyncResult, 1)
resultChannelCount := 1

go func() {
// match HTTP first
httpListener := mux.Match(cmux.HTTP1())
klog.Infof("Metrics port listening for HTTP on %v", listenAddress)
errorChannel <- server.Serve(httpListener)
err := server.Serve(httpListener)
resultChannel <- asyncResult{name: "HTTP server", error: err}
}()

if tlsConfig != nil {
errorChannelCount++
resultChannelCount++
go func() {
tlsListener := tls.NewListener(mux.Match(cmux.Any()), tlsConfig)
klog.Infof("Metrics port listening for HTTPS on %v", listenAddress)
errorChannel <- server.Serve(tlsListener)
err := server.Serve(tlsListener)
resultChannel <- asyncResult{name: "HTTPS server", error: err}
}()
}

errorChannelCount++
resultChannelCount++
go func() {
errorChannel <- mux.Serve()
err := mux.Serve()
resultChannel <- asyncResult{name: "TCP muxer", error: err}
}()

shutdown := false
var loopError error
for errorChannelCount > 0 {
for resultChannelCount > 0 {
if shutdown {
err := <-errorChannel
errorChannelCount--
if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed {
if loopError == nil {
loopError = err
} else if err != nil { // log the error we are discarding
klog.Errorf("Failed to gracefully shut down metrics server: %s", err)
select {
case result := <-resultChannel:
resultChannelCount--
if result.error == nil {
klog.Infof("Collected metrics %s goroutine.", result.name)
} else {
klog.Errorf("Collected metrics %s goroutine: %v", result.name, result.error)
loopError = result.error
}
case <-shutdownContext.Done(): // out of time
klog.Errorf("Abandoning %d uncollected metrics goroutines", resultChannelCount)
return shutdownContext.Err()
}
} else {
select {
case <-runContext.Done(): // clean shutdown
case err := <-errorChannel: // crashed before a shutdown was requested
errorChannelCount--
if err != nil && err != http.ErrServerClosed && err != cmux.ErrListenerClosed {
loopError = err
case result := <-resultChannel: // crashed before a shutdown was requested
resultChannelCount--
if result.error == nil {
klog.Infof("Collected metrics %s goroutine.", result.name)
} else {
klog.Errorf("Collected metrics %s goroutine: %v", result.name, result.error)
loopError = result.error
}
}
shutdown = true
Expand Down