Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
add metrics registration test
  • Loading branch information
sanchezl committed Nov 20, 2019
commit 1b14899a7e9d0a532e1669d2994b1a87b66cb5fb
35 changes: 35 additions & 0 deletions test/e2e/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package e2e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this go to library-go as an e2e helper and just a oneliner here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intend to move to library-go once finalized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened openshift/library-go#596 to start.


import (
"testing"

"github.com/stretchr/testify/require"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/openshift/cluster-kube-apiserver-operator/pkg/operator/operatorclient"
test "github.com/openshift/cluster-kube-apiserver-operator/test/library"
)

func TestMetricsRegistration(t *testing.T) {
kubeConfig, err := test.NewClientConfigForTest()
require.NoError(t, err)
kubeClient, err := kubernetes.NewForConfig(kubeConfig)
require.NoError(t, err)

// get list of operator pods
pods, err := kubeClient.CoreV1().Pods(operatorclient.OperatorNamespace).List(metav1.ListOptions{})
require.NoError(t, err)

// we just care about the one we expect
require.GreaterOrEqual(t, len(pods.Items), 1)
pod := pods.Items[0]

metrics, err := test.GetMetricsForPod(t, kubeConfig, &pod, 8443)
require.NoError(t, err)
t.Logf("Retrieved %d metrics.", len(metrics))
if len(metrics) == 0 {
t.Fatal("No metrics retrieved.")
}
}
13 changes: 13 additions & 0 deletions test/library/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package library

import "net"

// FreeLocalTCPPort returns a local TCP port which is very likely to be unused.
func FreeLocalTCPPort() (int, error) {
l, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer func() { _ = l.Close() }()
return l.Addr().(*net.TCPAddr).Port, nil
}
73 changes: 73 additions & 0 deletions test/library/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package library

import (
"bufio"
"bytes"
"fmt"
"io"
"strings"
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
)

// GetMetricsForPod returns the metrics found at /metrics for a given pod.
func GetMetricsForPod(t *testing.T, kubeConfig *rest.Config, pod *v1.Pod, metricPort int) (map[string]string, error) {
data := bytes.NewBuffer(nil)

// function to get metrics at localhost:localPort/metrics
getMetricsFunc := func(localPort int) error {
// create config that uses the local port
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{
ClusterInfo: api.Cluster{
InsecureSkipTLSVerify: true,
Server: fmt.Sprintf("https://localhost:%d", localPort),
},
},
).ClientConfig()
if err != nil {
return err
}
config = SetRESTConfigDefaults(config)
restClient, err := rest.RESTClientFor(config)
if err != nil {
return err
}
t.Log("Retrieving metrics...")
stream, err := restClient.Get().RequestURI("/metrics").Stream()
if err != nil {
return err
}
defer stream.Close()
_, err = io.Copy(data, stream)
return err
}

// we need to port-forward to the pod metric endpoint so that we can get metrics from from outside the cluster
err := ForwardPodPortAndExecute(t, kubeConfig, pod, metricPort, getMetricsFunc)
if err != nil {
return nil, err
}

// parse raw metrics output into a map
metrics := map[string]string{}
scanner := bufio.NewScanner(data)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "#") {
continue
}
entry := strings.SplitN(line, " ", 2)
metrics[entry[0]] = entry[1]
}
err = scanner.Err()
if err != nil {
return nil, err
}
return metrics, nil
}
87 changes: 87 additions & 0 deletions test/library/port_forwarder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package library

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"testing"

"github.com/stretchr/testify/require"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)

// ForwardPodPortAndExecute forwards a local port to a pod port and executes the supplied func
func ForwardPodPortAndExecute(t *testing.T, kubeConfig *rest.Config, pod *corev1.Pod, podPort int, consumer func(localPort int) error) error {
// pod must be running
if pod.Status.Phase != corev1.PodRunning {
return fmt.Errorf("pod must be running")
}

//setup port forwarding
transport, upgrader, err := spdy.RoundTripperFor(kubeConfig)
if err != nil {
return err
}
restClient, err := rest.RESTClientFor(SetRESTConfigDefaults(kubeConfig))
require.NoError(t, err)
request := restClient.Post().Resource("pods").Namespace(pod.Namespace).Name(pod.Name).SubResource("portforward")
localPort, err := FreeLocalTCPPort()
if err != nil {
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, request.URL())
ports := []string{fmt.Sprintf("%d:%d", localPort, podPort)}
stop := make(chan struct{}, 1)
ready := make(chan struct{}, 1)
portForwarder, err := portforward.New(dialer, ports, stop, ready, bytes.NewBuffer(nil), ioutil.Discard)
if err != nil {
return err
}

// error from consumer function
var ferr error

// run consumer function
go func() {
// stop port forwarding when done
if stop != nil {
defer close(stop)
}
t.Log("Waiting for port forwarder to be ready...")
<-ready
t.Log("Port forwarder is ready.")
t.Log("Executing function...")
ferr = consumer(localPort)
}()

t.Log("Starting port forwarder...")
err = portForwarder.ForwardPorts()
if err != nil {
return err
}

<-stop
t.Log("Port forwarder has stopped.")
return ferr
}

func SetRESTConfigDefaults(config *rest.Config) *rest.Config {
if config.GroupVersion == nil {
config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
}
if config.NegotiatedSerializer == nil {
config.NegotiatedSerializer = scheme.Codecs
}
if len(config.UserAgent) == 0 {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
config.APIPath = "/api"
return config
}