@@ -16,7 +16,9 @@ import (
1616
1717 "github.com/go-kit/kit/log"
1818 "github.com/go-kit/kit/log/level"
19+ "github.com/go-openapi/strfmt"
1920 "github.com/pkg/errors"
21+ "github.com/prometheus/alertmanager/api/v2/models"
2022 "github.com/prometheus/client_golang/prometheus"
2123 "github.com/prometheus/prometheus/pkg/labels"
2224
@@ -26,7 +28,6 @@ import (
2628
2729const (
2830 defaultAlertmanagerPort = 9093
29- alertPushEndpoint = "/api/v1/alerts"
3031 contentTypeJSON = "application/json"
3132)
3233
@@ -255,6 +256,7 @@ func (q *Queue) Push(alerts []*Alert) {
255256type Sender struct {
256257 logger log.Logger
257258 alertmanagers []* Alertmanager
259+ versions []APIVersion
258260
259261 sent * prometheus.CounterVec
260262 errs * prometheus.CounterVec
@@ -272,9 +274,20 @@ func NewSender(
272274 if logger == nil {
273275 logger = log .NewNopLogger ()
274276 }
277+ var (
278+ versions []APIVersion
279+ versionPresent map [APIVersion ]struct {}
280+ )
281+ for _ , am := range alertmanagers {
282+ if _ , found := versionPresent [am .version ]; found {
283+ continue
284+ }
285+ versions = append (versions , am .version )
286+ }
275287 s := & Sender {
276288 logger : logger ,
277289 alertmanagers : alertmanagers ,
290+ versions : versions ,
278291
279292 sent : prometheus .NewCounterVec (prometheus.CounterOpts {
280293 Name : "thanos_alert_sender_alerts_sent_total" ,
@@ -302,6 +315,15 @@ func NewSender(
302315 return s
303316}
304317
318+ func toAPILabels (labels labels.Labels ) models.LabelSet {
319+ apiLabels := make (models.LabelSet , len (labels ))
320+ for _ , label := range labels {
321+ apiLabels [label .Name ] = label .Value
322+ }
323+
324+ return apiLabels
325+ }
326+
305327// Send an alert batch to all given Alertmanager clients.
306328// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
307329func (s * Sender ) Send (ctx context.Context , alerts []* Alert ) {
@@ -310,10 +332,38 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
310332 if len (alerts ) == 0 {
311333 return
312334 }
313- b , err := json .Marshal (alerts )
314- if err != nil {
315- level .Warn (s .logger ).Log ("msg" , "sending alerts failed" , "err" , err )
316- return
335+
336+ payload := make (map [APIVersion ][]byte )
337+ for _ , version := range s .versions {
338+ var (
339+ b []byte
340+ err error
341+ )
342+ switch version {
343+ case APIv1 :
344+ if b , err = json .Marshal (alerts ); err != nil {
345+ level .Warn (s .logger ).Log ("msg" , "encoding alerts for v1 API failed" , "err" , err )
346+ return
347+ }
348+ case APIv2 :
349+ apiAlerts := make (models.PostableAlerts , 0 , len (alerts ))
350+ for _ , a := range alerts {
351+ apiAlerts = append (apiAlerts , & models.PostableAlert {
352+ Annotations : toAPILabels (a .Annotations ),
353+ EndsAt : strfmt .DateTime (a .EndsAt ),
354+ StartsAt : strfmt .DateTime (a .StartsAt ),
355+ Alert : models.Alert {
356+ GeneratorURL : strfmt .URI (a .GeneratorURL ),
357+ Labels : toAPILabels (a .Labels ),
358+ },
359+ })
360+ }
361+ if b , err = json .Marshal (apiAlerts ); err != nil {
362+ level .Warn (s .logger ).Log ("msg" , "encoding alerts for v2 API failed" , "err" , err )
363+ return
364+ }
365+ }
366+ payload [version ] = b
317367 }
318368
319369 var (
@@ -323,18 +373,19 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
323373 for _ , am := range s .alertmanagers {
324374 for _ , u := range am .dispatcher .Endpoints () {
325375 wg .Add (1 )
326- go func (am * Alertmanager , u * url.URL ) {
376+ go func (am * Alertmanager , u url.URL ) {
327377 defer wg .Done ()
328378
329379 level .Debug (s .logger ).Log ("msg" , "sending alerts" , "alertmanager" , u .Host , "numAlerts" , len (alerts ))
330380 start := time .Now ()
381+ u .Path = path .Join (u .Path , fmt .Sprintf ("/api/%s/alerts" , string (am .version )))
331382 span , ctx := tracing .StartSpan (ctx , "post_alerts HTTP[client]" )
332383 defer span .Finish ()
333- if err := am .postAlerts (ctx , * u , bytes .NewReader (b )); err != nil {
384+ if err := am .postAlerts (ctx , u , bytes .NewReader (payload [ am . version ] )); err != nil {
334385 level .Warn (s .logger ).Log (
335386 "msg" , "sending alerts failed" ,
336387 "alertmanager" , u .Host ,
337- "numAlerts " , len ( alerts ),
388+ "alerts " , string ( payload [ am . version ] ),
338389 "err" , err ,
339390 )
340391 s .errs .WithLabelValues (u .Host ).Inc ()
@@ -344,7 +395,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
344395 s .sent .WithLabelValues (u .Host ).Add (float64 (len (alerts )))
345396
346397 atomic .AddUint64 (& numSuccess , 1 )
347- }(am , u )
398+ }(am , * u )
348399 }
349400 }
350401 wg .Wait ()
@@ -354,7 +405,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
354405 }
355406
356407 s .dropped .Add (float64 (len (alerts )))
357- level .Warn (s .logger ).Log ("msg" , "failed to send alerts to all alertmanagers" , "alerts" , string ( b ) )
408+ level .Warn (s .logger ).Log ("msg" , "failed to send alerts to all alertmanagers" )
358409}
359410
360411type Dispatcher interface {
@@ -369,10 +420,11 @@ type Alertmanager struct {
369420 logger log.Logger
370421 dispatcher Dispatcher
371422 timeout time.Duration
423+ version APIVersion
372424}
373425
374426// NewAlertmanager returns a new Alertmanager client.
375- func NewAlertmanager (logger log.Logger , dispatcher Dispatcher , timeout time.Duration ) * Alertmanager {
427+ func NewAlertmanager (logger log.Logger , dispatcher Dispatcher , timeout time.Duration , version APIVersion ) * Alertmanager {
376428 if logger == nil {
377429 logger = log .NewNopLogger ()
378430 }
@@ -381,11 +433,11 @@ func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Dura
381433 logger : logger ,
382434 dispatcher : dispatcher ,
383435 timeout : timeout ,
436+ version : version ,
384437 }
385438}
386439
387440func (a * Alertmanager ) postAlerts (ctx context.Context , u url.URL , r io.Reader ) error {
388- u .Path = path .Join (u .Path , alertPushEndpoint )
389441 req , err := http .NewRequest ("POST" , u .String (), r )
390442 if err != nil {
391443 return err
0 commit comments