Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added
- [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44
- [#1854](https://github.com/thanos-io/thanos/pull/1854) Update the Rule UI to display alert counts and support filtering.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts.

## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03

Expand Down
192 changes: 84 additions & 108 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -13,7 +12,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -83,8 +81,10 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {

alertmgrs := cmd.Flag("alertmanagers.url", "Alertmanager replica URLs to push firing alerts. Ruler claims success if push to at least one alertmanager from discovered succeeds. The scheme should not be empty e.g `http` might be used. The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective DNS lookups. The port defaults to 9093 or the SRV record's value. The URL path is used as a prefix for the regular Alertmanager API path.").
Strings()

alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to alertmanager").Default("10s").Duration()
alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to Alertmanager").Default("10s").Duration()
alertmgrsConfig := extflag.RegisterPathOrContent(cmd, "alertmanagers.config", "YAML file that contains alerting configuration. See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--alertmanagers.url' and '--alertmanagers.send-timeout' flags.", false)
alertmgrsDNSSDInterval := modelDuration(cmd.Flag("alertmanagers.sd-dns-interval", "Interval between DNS resolutions of Alertmanager hosts.").
Default("30s"))

alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field").String()

Expand Down Expand Up @@ -157,6 +157,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
lset,
*alertmgrs,
*alertmgrsTimeout,
alertmgrsConfig,
time.Duration(*alertmgrsDNSSDInterval),
*grpcBindAddr,
time.Duration(*grpcGracePeriod),
*grpcCert,
Expand Down Expand Up @@ -194,6 +196,8 @@ func runRule(
lset labels.Labels,
alertmgrURLs []string,
alertmgrsTimeout time.Duration,
alertmgrsConfig *extflag.PathOrContent,
alertmgrsDNSSDInterval time.Duration,
grpcBindAddr string,
grpcGracePeriod time.Duration,
grpcCert string,
Expand Down Expand Up @@ -286,11 +290,56 @@ func runRule(
dns.ResolverType(dnsSDResolver),
)

// Build the Alertmanager clients.
alertmgrsConfigYAML, err := alertmgrsConfig.Content()
if err != nil {
return err
}
var (
alertingCfg alert.AlertingConfig
alertmgrs []*alert.Alertmanager
)
if len(alertmgrsConfigYAML) > 0 {
if len(alertmgrURLs) != 0 {
return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time")
}
alertingCfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML)
if err != nil {
return err
}
} else {
// Build the Alertmanager configuration from the legacy flags.
for _, addr := range alertmgrURLs {
cfg, err := alert.BuildAlertmanagerConfig(logger, addr, alertmgrsTimeout)
if err != nil {
return err
}
alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg)
}
}

if len(alertingCfg.Alertmanagers) == 0 {
level.Warn(logger).Log("msg", "no alertmanager configured")
}

amProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_ruler_alertmanagers_", reg),
dns.ResolverType(dnsSDResolver),
)
for _, cfg := range alertingCfg.Alertmanagers {
// Each Alertmanager client has a different list of targets thus each needs its own DNS provider.
am, err := alert.NewAlertmanager(logger, cfg, amProvider.Clone())
if err != nil {
return err
}
alertmgrs = append(alertmgrs, am)
}

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
Expand Down Expand Up @@ -351,9 +400,35 @@ func runRule(
})
}
}
// Discover and resolve Alertmanager addresses.
{
for i := range alertmgrs {
am := alertmgrs[i]
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
am.Discover(ctx)
return nil
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error {
am.Resolve(ctx)
return nil
})
}, func(error) {
cancel()
})
}
}
// Run the alert sender.
{
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout)
clients := make([]alert.AlertmanagerClient, len(alertmgrs))
for i := range alertmgrs {
clients[i] = alertmgrs[i]
}
sdr := alert.NewSender(logger, reg, clients)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
Expand All @@ -370,21 +445,6 @@ func runRule(
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if err := alertmgrs.update(ctx); err != nil {
level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err)
alertMngrAddrResolutionErrors.Inc()
}
return nil
})
}, func(error) {
cancel()
})
}
// Run File Service Discovery and update the query addresses when the files are modified.
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
Expand Down Expand Up @@ -615,90 +675,6 @@ func runRule(
return nil
}

// alertmanagerSet keeps the configured Alertmanager addresses together with the
// list of URLs that the last successful update produced for them.
type alertmanagerSet struct {
// resolver performs the DNS lookup for addresses whose scheme carries a query type.
resolver dns.Resolver
// addrs are the raw Alertmanager addresses; update parses each one as a URL.
addrs []string
// mtx guards current.
mtx sync.Mutex
// current is the URL list stored by update and handed out by get.
current []*url.URL
}

// newAlertmanagerSet builds an alertmanagerSet for the given addresses. The DNS
// resolver is created from dnsSDResolver and logs through logger. The returned
// set has no URLs until update is called.
func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
	set := new(alertmanagerSet)
	set.addrs = addrs
	set.resolver = dns.NewResolver(dnsSDResolver.ToResolver(logger))
	return set
}

// get returns the URL list stored by the most recent successful update.
// The slice is read under the set's mutex.
func (s *alertmanagerSet) get() []*url.URL {
	s.mtx.Lock()
	urls := s.current
	s.mtx.Unlock()
	return urls
}

// defaultAlertmanagerPort is the port appended to an Alertmanager host that
// has none before it is resolved with a dns.A query.
const defaultAlertmanagerPort = 9093

// parseAlertmanagerAddress parses addr as a URL and separates an optional DNS
// query type from its scheme.
//
// The scheme may be of the form "<qtype>+<scheme>". Everything before the last
// '+' is returned as the query type and the remainder is kept as the URL
// scheme. When the scheme contains no '+', the query type is empty and the URL
// is returned as parsed. On a parse failure the URL is nil and the error is
// returned unchanged.
func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) {
	parsedUrl, err = url.Parse(addr)
	if err != nil {
		return "", nil, err
	}
	// Cut at the last '+' so that a multi-part prefix such as "foo+bar" stays intact.
	if sep := strings.LastIndex(parsedUrl.Scheme, "+"); sep >= 0 {
		qType = dns.QType(parsedUrl.Scheme[:sep])
		parsedUrl.Scheme = parsedUrl.Scheme[sep+1:]
	}
	return qType, parsedUrl, nil
}

// update rebuilds the list of Alertmanager URLs from the configured addresses.
//
// Each address is parsed with parseAlertmanagerAddress. Addresses without a DNS
// query type contribute their host as is. Addresses with one are resolved
// through the set's resolver, and every resolved host yields one URL that
// keeps the scheme, path and user info of the original address.
//
// The first parse or resolve error aborts the whole update and leaves the
// previously stored URLs untouched.
func (s *alertmanagerSet) update(ctx context.Context) error {
	var urls []*url.URL
	for _, addr := range s.addrs {
		qtype, u, err := parseAlertmanagerAddress(addr)
		if err != nil {
			return errors.Wrapf(err, "parse URL %q", addr)
		}

		// Without a query type the host is used verbatim.
		hosts := []string{u.Host}
		if qtype != "" {
			target := u.Host
			if qtype == dns.A {
				// The host could be missing a port. Append the defaultAlertmanagerPort.
				if _, _, splitErr := net.SplitHostPort(target); splitErr != nil {
					target = target + ":" + strconv.Itoa(defaultAlertmanagerPort)
				}
			}
			hosts, err = s.resolver.Resolve(ctx, target, qtype)
			if err != nil {
				return errors.Wrap(err, "alertmanager resolve")
			}
		}

		for _, h := range hosts {
			urls = append(urls, &url.URL{
				Scheme: u.Scheme,
				Host:   h,
				Path:   u.Path,
				User:   u.User,
			})
		}
	}

	// Publish the new list only after every address was processed successfully.
	s.mtx.Lock()
	s.current = urls
	s.mtx.Unlock()

	return nil
}

func parseFlagLabels(s []string) (labels.Labels, error) {
var lset labels.Labels
for _, l := range s {
Expand Down
98 changes: 0 additions & 98 deletions cmd/thanos/rule_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package main

import (
"context"
"net/url"
"testing"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -49,97 +45,3 @@ func Test_parseFlagLabels(t *testing.T) {
testutil.Equals(t, err != nil, td.expectErr)
}
}

// TestRule_AlertmanagerResolveWithoutPort checks that a "dns+" address given
// without a port is looked up with the default Alertmanager port appended, and
// that the URL exposed afterwards carries the resolved host.
func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) {
	resolver := mockResolver{
		resultIPs: map[string][]string{
			"alertmanager.com:9093": {"1.1.1.1:9300"},
		},
	}
	set := alertmanagerSet{
		resolver: resolver,
		addrs:    []string{"dns+http://alertmanager.com"},
	}

	testutil.Ok(t, set.update(context.TODO()))

	want := []*url.URL{
		{Scheme: "http", Host: "1.1.1.1:9300"},
	}
	testutil.Equals(t, want, set.get())
}

// TestRule_AlertmanagerResolveWithPort checks that a "dns+" address with an
// explicit port is looked up with that port unchanged, and that the URL exposed
// afterwards carries the resolved host.
func TestRule_AlertmanagerResolveWithPort(t *testing.T) {
	resolver := mockResolver{
		resultIPs: map[string][]string{
			"alertmanager.com:19093": {"1.1.1.1:9300"},
		},
	}
	set := alertmanagerSet{
		resolver: resolver,
		addrs:    []string{"dns+http://alertmanager.com:19093"},
	}

	testutil.Ok(t, set.update(context.TODO()))

	want := []*url.URL{
		{Scheme: "http", Host: "1.1.1.1:9300"},
	}
	testutil.Equals(t, want, set.get())
}

// mockResolver is a test resolver that answers lookups from a fixed table.
type mockResolver struct {
// resultIPs maps a queried name to the addresses Resolve returns for it.
resultIPs map[string][]string
// err, when non-nil, is returned by every Resolve call.
err error
}

// Resolve returns the canned addresses registered for name. The configured
// error, if any, takes precedence. A name missing from the table yields an
// error. ctx and qtype are ignored.
func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) {
	if m.err != nil {
		return nil, m.err
	}
	addrs, found := m.resultIPs[name]
	if !found {
		return nil, errors.Errorf("mockResolver not found response for name: %s", name)
	}
	return addrs, nil
}

// Test_ParseAlertmanagerAddress verifies how parseAlertmanagerAddress splits the
// DNS query type from the URL scheme: no prefix, a single prefix, and a prefix
// that itself contains '+'. A '+' inside the password must not be mistaken for
// a scheme separator.
func Test_ParseAlertmanagerAddress(t *testing.T) {
	for _, tc := range []struct {
		address         string
		expectQueryType dns.QType
		expectUrl       *url.URL
		expectError     error
	}{
		{
			address:         "http://user:pass+word@foo.bar:3289",
			expectQueryType: dns.QType(""),
			expectUrl:       &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")},
			expectError:     nil,
		},
		{
			address:         "dnssrvnoa+http://user:pass+word@foo.bar:3289",
			expectQueryType: dns.QType("dnssrvnoa"),
			expectUrl:       &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")},
			expectError:     nil,
		},
		{
			address:         "foo+bar+http://foo.bar:3289",
			expectQueryType: dns.QType("foo+bar"),
			expectUrl:       &url.URL{Host: "foo.bar:3289", Scheme: "http"},
			expectError:     nil,
		},
	} {
		gotType, gotURL, gotErr := parseAlertmanagerAddress(tc.address)
		testutil.Equals(t, tc.expectError, gotErr)
		testutil.Equals(t, tc.expectUrl, gotURL)
		testutil.Equals(t, tc.expectQueryType, gotType)
	}
}
Loading