4 changes: 4 additions & 0 deletions go.mod
@@ -10,11 +10,15 @@ require (
	github.com/imdario/mergo v0.3.9 // indirect
	github.com/kubernetes-csi/csi-lib-utils v0.7.0
	github.com/kubernetes-csi/csi-test v2.0.0+incompatible
	github.com/prometheus/client_golang v1.0.0
	github.com/prometheus/client_model v0.2.0
	github.com/prometheus/common v0.4.1
	google.golang.org/grpc v1.28.0
	k8s.io/api v0.18.0
	k8s.io/apimachinery v0.18.0
	k8s.io/client-go v0.18.0
	k8s.io/code-generator v0.18.0
	k8s.io/component-base v0.18.0
	k8s.io/klog v1.0.0
	k8s.io/kubernetes v1.18.0
)
177 changes: 177 additions & 0 deletions pkg/metrics/metrics.go
@@ -0,0 +1,177 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"k8s.io/apimachinery/pkg/types"
	k8smetrics "k8s.io/component-base/metrics"
	"k8s.io/klog"
)

const (
	opStatusUnknown      = "Unknown"
	labelDriverName      = "driver_name"
	labelOperationName   = "operation_name"
	labelOperationStatus = "operation_status"
	subSystem            = "snapshot_controller"
	metricName           = "operation_total_seconds"
	metricHelpMsg        = "Total number of seconds spent by the controller on an operation from end to end"
)

// OperationStatus is the interface type for representing an operation's execution
// status, with the nil value representing an "Unknown" status of the operation.
type OperationStatus interface {
	String() string
}
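
For illustration, any type with a String() method satisfies this interface; the snapshotStatus type below is a hypothetical sketch, not part of this change:

```go
// snapshotStatus is a hypothetical string-backed implementation of OperationStatus.
type snapshotStatus string

const (
	statusSuccess snapshotStatus = "Success"
	statusFailure snapshotStatus = "Failure"
)

// String returns the value recorded in the operation_status metric label.
func (s snapshotStatus) String() string { return string(s) }
```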

var metricBuckets = []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10, 15, 30, 60, 120, 300, 600}
Collaborator: Do we think 10 mins is enough? Are we capturing upload time too?

Contributor (author): Good question. Using a GCE PD snapshot as an example, a 4G PV takes roughly 5-10s to snapshot (including uploading). Assuming latency scales linearly with size, the 600s bucket covers volumes of roughly 240G (at 10s per 4G) to 480G (at 5s per 4G). Do you think that is sufficient? GCE PD disks can go up to 64TB, and I am a little concerned that such a big bucket would be overkill for most cases; the exact latency is captured by the sum metric anyway. Snapshot latency varies a lot with the storage vendor's implementation: vendors that snapshot in place should finish within about a second, whereas those that upload take much longer, depending on the volume's size. WDYT?

Contributor: @msau42 @yuxiangqian I believe we can't control this timeout from our end, considering that every CSI driver can have its own timeout configured via an arg to the sidecar, can't it? Either we use that timeout value or stick to a general default like 10 mins. Am I missing something here?

Collaborator: This isn't about a timeout value; it is about the maximum bucket range that we want to have for our histograms. This is probably fine for now; we can add more buckets if needed.

// MetricsManager is the interface for managing and recording snapshot
// controller operation metrics.
type MetricsManager interface {
	// StartMetricsEndpoint starts the metrics endpoint at the specified addr/pattern for
	// metrics managed by this MetricsManager. It spawns a goroutine to listen to
	// and serve HTTP requests received on addr/pattern.
	// If addr is empty (i.e., ""), no endpoint is started and an error is returned.
	StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error)

	// OperationStart takes in an operation and caches its start time.
	// If the operation already exists, it is a no-op.
	OperationStart(op Operation)

	// DropOperation removes an operation from the cache.
	// If the operation does not exist, it is a no-op.
	DropOperation(op Operation)

	// RecordMetrics records a metric point. Note that it is a no-op if the
	// operation has NOT been marked "Started" previously via OperationStart.
	// Invoking RecordMetrics also removes the cached entry.
	// op - the operation which the metric is associated with.
	// status - the operation status; if not specified, i.e., status == nil, an
	// "Unknown" status of the passed-in operation is assumed.
	RecordMetrics(op Operation, status OperationStatus)
}
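
To make the intended call pattern concrete, here is a minimal usage sketch; it assumes the surrounding metrics package, and recordCreateSnapshot plus the driver name are hypothetical, not part of this PR:

```go
// recordCreateSnapshot is a hypothetical caller illustrating the
// OperationStart -> RecordMetrics lifecycle.
func recordCreateSnapshot(mgr MetricsManager, uid types.UID) {
	op := Operation{
		Name:       "CreateSnapshot",
		Driver:     "hostpath.csi.k8s.io", // illustrative driver name
		ResourceID: uid,
	}
	// Caches time.Now() for op; calling it again while op is in flight is a
	// no-op, so a retry still measures latency from the first attempt.
	mgr.OperationStart(op)

	// ... perform the actual snapshot work here ...

	// A nil status records the observation under the "Unknown" status label;
	// a real caller would pass a type implementing OperationStatus.
	mgr.RecordMetrics(op, nil)
}
```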

// Operation is a structure which holds information to identify a snapshot
// related operation.
type Operation struct {
	// Name is the name of the operation, for example: "CreateSnapshot", "DeleteSnapshot".
	Name string
	// Driver is the name of the driver which executes the operation.
	Driver string
	// ResourceID is the UID of the resource against which the operation is executed.
	ResourceID types.UID
}

type operationMetricsManager struct {
	// cache is a concurrency-safe map which stores the start timestamps of all
	// ongoing operations.
	// Key: an Operation.
	// Value: the timestamp at which the operation started.
	cache sync.Map

	// registry is a wrapper around the Prometheus registry.
	registry k8smetrics.KubeRegistry

	// opLatencyMetrics is a histogram vector tracking operation latency.
	opLatencyMetrics *k8smetrics.HistogramVec
}

// NewMetricsManager creates a new MetricsManager instance.
func NewMetricsManager() MetricsManager {
	mgr := &operationMetricsManager{
		cache: sync.Map{},
	}
	mgr.init()
	return mgr
}

func (opMgr *operationMetricsManager) OperationStart(op Operation) {
	opMgr.cache.LoadOrStore(op, time.Now())
}

func (opMgr *operationMetricsManager) DropOperation(op Operation) {
	opMgr.cache.Delete(op)
}

func (opMgr *operationMetricsManager) RecordMetrics(op Operation, status OperationStatus) {
	obj, exists := opMgr.cache.Load(op)
	if !exists {
		// the operation has not been cached, return directly
		return
	}
	ts, ok := obj.(time.Time)
	if !ok {
		// the cached item is not a time.Time, which should NEVER happen; clean up and return
		klog.Errorf("Invalid cache entry for key %v", op)
		opMgr.cache.Delete(op)
		return
	}
	strStatus := opStatusUnknown
	if status != nil {
		strStatus = status.String()
	}
	duration := time.Since(ts).Seconds()
	opMgr.opLatencyMetrics.WithLabelValues(op.Driver, op.Name, strStatus).Observe(duration)
	opMgr.cache.Delete(op)
}

func (opMgr *operationMetricsManager) init() {
	opMgr.registry = k8smetrics.NewKubeRegistry()
	opMgr.opLatencyMetrics = k8smetrics.NewHistogramVec(
		&k8smetrics.HistogramOpts{
			Subsystem: subSystem,
			Name:      metricName,
			Help:      metricHelpMsg,
			Buckets:   metricBuckets,
		},
		[]string{labelDriverName, labelOperationName, labelOperationStatus},
	)
	opMgr.registry.MustRegister(opMgr.opLatencyMetrics)
}
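
With these HistogramOpts, the histogram should be exposed as snapshot_controller_operation_total_seconds (Prometheus joins the subsystem and name with an underscore), partitioned by the driver_name, operation_name, and operation_status labels.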

func (opMgr *operationMetricsManager) StartMetricsEndpoint(pattern, addr string, logger promhttp.Logger, wg *sync.WaitGroup) (*http.Server, error) {
	if addr == "" {
		return nil, fmt.Errorf("metrics endpoint will not be started as endpoint address is not specified")
	}
	// start listening
	l, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("failed to listen on address[%s], error[%v]", addr, err)
	}
	mux := http.NewServeMux()
	mux.Handle(pattern, k8smetrics.HandlerFor(
		opMgr.registry,
		k8smetrics.HandlerOpts{
			ErrorLog:      logger,
			ErrorHandling: k8smetrics.ContinueOnError,
		}))
	srv := &http.Server{Addr: l.Addr().String(), Handler: mux}
	// start serving the endpoint in a goroutine; the caller is responsible for
	// the matching wg.Add(1) before invoking this method.
	go func() {
		defer wg.Done()
		if err := srv.Serve(l); err != http.ErrServerClosed {
			klog.Fatalf("failed to start endpoint at:%s/%s, error: %v", addr, pattern, err)
		}
	}()
	return srv, nil
}
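
Note that StartMetricsEndpoint calls wg.Done() when the serving goroutine exits but never wg.Add(1), so the caller owns that bookkeeping. A minimal wiring sketch (the function, pattern, and addr values are illustrative, not part of this PR):

```go
// startMetrics is a hypothetical main-style wiring of the metrics endpoint.
func startMetrics(mgr MetricsManager) {
	wg := &sync.WaitGroup{}
	wg.Add(1) // matched by the wg.Done() inside the serving goroutine
	srv, err := mgr.StartMetricsEndpoint("/metrics", ":8080", nil, wg)
	if err != nil {
		klog.Fatalf("failed to start metrics endpoint: %v", err)
	}

	// ... run the controller ...

	srv.Close() // makes Serve return http.ErrServerClosed, which is not treated as fatal
	wg.Wait()   // block until the serving goroutine has exited
}
```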