Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Store latest config hash and timestamp as metrics
  • Loading branch information
kakkoyun committed Aug 6, 2019
commit ac8b5d63c449fa2f5a3b0084bfbdbd1eb578ee5c
65 changes: 54 additions & 11 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package receive

import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"io/ioutil"
"os"
Expand Down Expand Up @@ -33,6 +35,9 @@ type ConfigWatcher struct {
logger log.Logger
watcher *fsnotify.Watcher

hashGauge prometheus.Gauge
successGauge prometheus.Gauge
lastSuccessTimeGauge prometheus.Gauge
changesCounter prometheus.Counter
errorCounter prometheus.Counter
refreshCounter prometheus.Counter
Expand Down Expand Up @@ -62,6 +67,21 @@ func NewConfigWatcher(logger log.Logger, r prometheus.Registerer, path string, i
interval: time.Duration(interval),
logger: logger,
watcher: watcher,
hashGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_hash",
Help: "Hash of the currently loaded hashring configuration file.",
}),
successGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_successful",
Help: "Whether the last hashring configuration file reload attempt was successful.",
}),
lastSuccessTimeGauge: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful hashring configuration file reload.",
}),
changesCounter: prometheus.NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_hashrings_file_changes_total",
Expand Down Expand Up @@ -93,6 +113,9 @@ func NewConfigWatcher(logger log.Logger, r prometheus.Registerer, path string, i

if r != nil {
r.MustRegister(
c.hashGauge,
c.successGauge,
c.lastSuccessTimeGauge,
c.changesCounter,
c.errorCounter,
c.refreshCounter,
Expand Down Expand Up @@ -148,8 +171,13 @@ func (cw *ConfigWatcher) Run(ctx context.Context) {
}
}

// readFile reads the configured file and returns a configuration.
func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
}

// readFile reads the configured file and returns content of configuration file.
func (cw *ConfigWatcher) readFile() ([]byte, error) {
fd, err := os.Open(cw.path)
if err != nil {
return nil, err
Expand All @@ -160,33 +188,43 @@ func (cw *ConfigWatcher) readFile() ([]HashringConfig, error) {
}
}()

content, err := ioutil.ReadAll(fd)
if err != nil {
return nil, err
}
return ioutil.ReadAll(fd)
}

// loadConfig loads raw configuration content and returns a configuration.
func (cw *ConfigWatcher) loadConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err = json.Unmarshal(content, &config)
err := json.Unmarshal(content, &config)
return config, err
}

// refresh reads the configured file and sends the hashring configuration on the channel.
func (cw *ConfigWatcher) refresh(ctx context.Context) {
cw.refreshCounter.Inc()
config, err := cw.readFile()
cfgContent, err := cw.readFile()
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to read configuration file", "err", err, "path", cw.path)
return
}

config, err := cw.loadConfig(cfgContent)
if err != nil {
cw.errorCounter.Inc()
level.Error(cw.logger).Log("msg", "failed to load configuration file", "err", err, "path", cw.path)
return
}

// If there was no change to the configuration, return early.
if reflect.DeepEqual(cw.last, config) {
return
}
cw.changesCounter.Inc()
// Save the last known configuration.
cw.last = config
cw.successGauge.Set(1)
cw.lastSuccessTimeGauge.Set(float64(time.Now().Unix()))
cw.hashGauge.Set(hashAsMetricValue(cfgContent))

for _, c := range config {
cw.hashringNodesGauge.WithLabelValues(c.Hashring).Set(float64(len(c.Endpoints)))
Expand Down Expand Up @@ -228,7 +266,12 @@ func (cw *ConfigWatcher) stop() {
level.Debug(cw.logger).Log("msg", "hashring configuration watcher stopped")
}

// C returns a chan that gets hashring configuration updates.
func (cw *ConfigWatcher) C() <-chan []HashringConfig {
return cw.ch
// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
var bytes = make([]byte, 8)
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}