Skip to content

Commit be5ca46

Browse files
committed
add helper functions for running prometheus query
1 parent e708126 commit be5ca46

File tree

1 file changed

+254
-0
lines changed

1 file changed

+254
-0
lines changed

pkg/metrics/query.go

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
package metrics
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
"encoding/json"
7+
"fmt"
8+
"io/ioutil"
9+
"net/http"
10+
"os"
11+
"strings"
12+
"time"
13+
14+
corev1 "k8s.io/api/core/v1"
15+
kerrs "k8s.io/apimachinery/pkg/api/errors"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
"k8s.io/apimachinery/pkg/util/wait"
18+
"k8s.io/client-go/kubernetes"
19+
20+
routeclient "github.com/openshift/client-go/route/clientset/versioned"
21+
"github.com/prometheus/common/model"
22+
)
23+
24+
const (
25+
serviceCAinjectionDataKey = "service-ca.crt"
26+
serviceCAinjectCABundleAnnotationName = "service.beta.openshift.io/inject-cabundle"
27+
)
28+
29+
func LocatePrometheus(client *kubernetes.Clientset, rc *routeclient.Clientset) (string, string, error) {
30+
_, err := client.CoreV1().Services("openshift-monitoring").Get("prometheus-k8s", metav1.GetOptions{})
31+
if err != nil {
32+
return "", "", err
33+
}
34+
35+
route, err := rc.RouteV1().Routes("openshift-monitoring").Get("prometheus-k8s", metav1.GetOptions{})
36+
if err != nil {
37+
return "", "", err
38+
}
39+
host := route.Spec.Host
40+
var bearerToken string
41+
secrets, err := client.CoreV1().Secrets("openshift-monitoring").List(metav1.ListOptions{})
42+
if err != nil {
43+
return "", "", fmt.Errorf("could not list secrets in openshift-monitoring namespace")
44+
}
45+
for _, secret := range secrets.Items {
46+
if secret.Type != corev1.SecretTypeServiceAccountToken {
47+
continue
48+
}
49+
if !strings.HasPrefix(secret.Name, "prometheus-") {
50+
continue
51+
}
52+
bearerToken = string(secret.Data[corev1.ServiceAccountTokenKey])
53+
break
54+
}
55+
if len(bearerToken) == 0 {
56+
return "", "", fmt.Errorf("error getting bearer token from prometheus service account")
57+
}
58+
return bearerToken, host, nil
59+
}
60+
61+
type prometheusResponse struct {
62+
Status string `json:"status"`
63+
Data prometheusResponseData `json:"data"`
64+
}
65+
66+
type prometheusResponseData struct {
67+
ResultType string `json:"resultType"`
68+
Result model.Vector `json:"result"`
69+
}
70+
71+
// RunPrometheusQuery() checks that query returns non-empty result, errors if query returns empty, and returns metrics
72+
// example: query := `ALERTS{alertstate="pending",alertname="PodDisruptionBudgetAtLimit",severity="warning"} == 1`
73+
func RunQuery(caBundle []byte, tlsCert tls.Certificate, host, query, bearerToken string) (model.Vector, error) {
74+
var metrics model.Vector
75+
err := wait.PollImmediate(time.Second*3, time.Second*300, func() (bool, error) {
76+
certPool := x509.NewCertPool()
77+
if ok := certPool.AppendCertsFromPEM(caBundle); !ok {
78+
return false, fmt.Errorf("error appending caBundle to pool")
79+
}
80+
81+
tlsConfig := &tls.Config{
82+
InsecureSkipVerify: true,
83+
RootCAs: certPool,
84+
Certificates: []tls.Certificate{tlsCert},
85+
}
86+
tlsConfig.BuildNameToCertificate()
87+
tr := &http.Transport{TLSClientConfig: tlsConfig}
88+
prometheusClient := &http.Client{Transport: tr}
89+
req, err := http.NewRequest("GET", "https://"+host+"/api/v1/query", nil)
90+
if err != nil {
91+
return false, err
92+
}
93+
94+
q := req.URL.Query()
95+
q.Add("query", query)
96+
req.URL.RawQuery = q.Encode()
97+
98+
req.Header.Add("Authorization", "Bearer "+bearerToken)
99+
100+
resp, err := prometheusClient.Do(req)
101+
if err != nil {
102+
return false, fmt.Errorf("prometheus query failed: %v\n%v", err, resp)
103+
}
104+
defer resp.Body.Close()
105+
if resp.StatusCode != http.StatusOK {
106+
return false, fmt.Errorf("Non-200 response from prometheus: %v", resp)
107+
}
108+
109+
body, err := ioutil.ReadAll(resp.Body)
110+
result := prometheusResponse{}
111+
json.Unmarshal(body, &result)
112+
metrics = result.Data.Result
113+
if len(metrics) == 0 {
114+
return false, fmt.Errorf("prometheus returned unexpected results")
115+
}
116+
return true, nil
117+
})
118+
if err != nil {
119+
return nil, fmt.Errorf("prometheus query error: %v", err)
120+
}
121+
return metrics, nil
122+
}
123+
124+
func RetrieveTLSCert(client *kubernetes.Clientset, secret, svc, ns string) (tls.Certificate, error) {
125+
err := createServingCertAnnotatedService(client, secret, svc, ns)
126+
if err != nil {
127+
return tls.Certificate{}, err
128+
}
129+
err = pollForServiceServingSecret(client, secret, ns)
130+
if err != nil {
131+
return tls.Certificate{}, err
132+
}
133+
cert, err := getServiceServingCertSecretData(client, secret, ns)
134+
if err != nil {
135+
return tls.Certificate{}, err
136+
}
137+
return cert, nil
138+
}
139+
140+
func RetrieveServiceCABundle(client *kubernetes.Clientset, configMapName, nsName string) ([]byte, error) {
141+
// Prompt the injection of the ca bundle into a configmap
142+
err := createAnnotatedCABundleInjectionConfigMap(client, configMapName, nsName)
143+
if err != nil {
144+
return nil, fmt.Errorf("error creating annotated configmap: %v", err)
145+
}
146+
147+
// Retrieve the ca bundle
148+
expectedDataSize := 1
149+
var ca []byte
150+
err = wait.PollImmediate(time.Second*1, time.Second*30, func() (bool, error) {
151+
configMap, err := client.CoreV1().ConfigMaps(nsName).Get(configMapName, metav1.GetOptions{})
152+
if err != nil {
153+
return false, err
154+
}
155+
if len(configMap.Data) != expectedDataSize {
156+
return false, fmt.Errorf("expected data size %d, got %d", expectedDataSize, len(configMap.Data))
157+
}
158+
_, ok := configMap.Data[serviceCAinjectionDataKey]
159+
if !ok {
160+
return false, fmt.Errorf("key %q is missing", serviceCAinjectionDataKey)
161+
}
162+
ca = []byte(configMap.Data[serviceCAinjectionDataKey])
163+
return true, nil
164+
})
165+
if err != nil {
166+
return nil, err
167+
}
168+
return ca, nil
169+
}
170+
171+
func createAnnotatedCABundleInjectionConfigMap(client *kubernetes.Clientset, configMapName, nsName string) error {
172+
_, err := client.CoreV1().ConfigMaps(nsName).Create(&corev1.ConfigMap{
173+
TypeMeta: metav1.TypeMeta{},
174+
ObjectMeta: metav1.ObjectMeta{
175+
Name: configMapName,
176+
Annotations: map[string]string{
177+
serviceCAinjectCABundleAnnotationName: "true",
178+
},
179+
},
180+
})
181+
return err
182+
}
183+
184+
func createServingCertAnnotatedService(client *kubernetes.Clientset, secretName, serviceName, namespace string) error {
185+
_, err := client.CoreV1().Services(namespace).Create(&corev1.Service{
186+
TypeMeta: metav1.TypeMeta{},
187+
ObjectMeta: metav1.ObjectMeta{
188+
Name: serviceName,
189+
Annotations: map[string]string{
190+
"service.beta.openshift.io/serving-cert-secret-name": secretName,
191+
},
192+
},
193+
Spec: corev1.ServiceSpec{
194+
Ports: []corev1.ServicePort{
195+
{
196+
Name: "tests",
197+
Port: 8443,
198+
},
199+
},
200+
},
201+
})
202+
return err
203+
}
204+
205+
func pollForServiceServingSecret(client *kubernetes.Clientset, secretName, namespace string) error {
206+
return wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
207+
_, err := client.CoreV1().Secrets(namespace).Get(secretName, metav1.GetOptions{})
208+
if err != nil && kerrs.IsNotFound(err) {
209+
return false, nil
210+
}
211+
if err != nil {
212+
return false, err
213+
}
214+
return true, nil
215+
})
216+
}
217+
218+
func getServiceServingCertSecretData(client *kubernetes.Clientset, secretName, namespace string) (tls.Certificate, error) {
219+
sss, err := client.CoreV1().Secrets(namespace).Get(secretName, metav1.GetOptions{})
220+
if err != nil {
221+
return tls.Certificate{}, err
222+
}
223+
keyFile, err := ioutil.TempFile(os.TempDir(), "test-keyfile")
224+
if err != nil {
225+
return tls.Certificate{}, err
226+
}
227+
certFile, err := ioutil.TempFile(os.TempDir(), "test-certfile")
228+
if err != nil {
229+
return tls.Certificate{}, err
230+
}
231+
232+
defer os.Remove(keyFile.Name())
233+
defer os.Remove(certFile.Name())
234+
235+
if _, err = keyFile.Write(sss.Data[corev1.TLSPrivateKeyKey]); err != nil {
236+
return tls.Certificate{}, err
237+
}
238+
if _, err = certFile.Write(sss.Data[corev1.TLSCertKey]); err != nil {
239+
return tls.Certificate{}, err
240+
}
241+
242+
if err := keyFile.Close(); err != nil {
243+
return tls.Certificate{}, err
244+
}
245+
if err := certFile.Close(); err != nil {
246+
return tls.Certificate{}, err
247+
}
248+
249+
cert, err := tls.LoadX509KeyPair(certFile.Name(), keyFile.Name())
250+
if err != nil {
251+
return tls.Certificate{}, err
252+
}
253+
return cert, nil
254+
}

0 commit comments

Comments
 (0)