@@ -13,6 +13,7 @@ import (
1313 "github.com/go-kit/kit/log/level"
1414 "github.com/improbable-eng/thanos/pkg/block"
1515 "github.com/improbable-eng/thanos/pkg/block/metadata"
16+ "github.com/improbable-eng/thanos/pkg/compact"
1617 "github.com/improbable-eng/thanos/pkg/compact/downsample"
1718 "github.com/improbable-eng/thanos/pkg/component"
1819 "github.com/improbable-eng/thanos/pkg/objstore"
@@ -31,20 +32,46 @@ import (
3132func registerDownsample (m map [string ]setupFunc , app * kingpin.Application , name string ) {
3233 cmd := app .Command (name , "continuously downsamples blocks in an object store bucket" )
3334
35+ httpAddr := regHTTPAddrFlag (cmd )
36+
3437 dataDir := cmd .Flag ("data-dir" , "Data directory in which to cache blocks and process downsamplings." ).
3538 Default ("./data" ).String ()
3639
3740 objStoreConfig := regCommonObjStoreFlags (cmd , "" , true )
3841
3942 m [name ] = func (g * run.Group , logger log.Logger , reg * prometheus.Registry , tracer opentracing.Tracer , _ bool ) error {
40- return runDownsample (g , logger , reg , * dataDir , objStoreConfig )
43+ return runDownsample (g , logger , reg , * httpAddr , * dataDir , objStoreConfig )
4144 }
4245}
4346
47+ type DownsampleMetrics struct {
48+ downsamples * prometheus.CounterVec
49+ downsampleFailures * prometheus.CounterVec
50+ }
51+
52+ func newDownsampleMetrics (reg * prometheus.Registry ) * DownsampleMetrics {
53+ m := new (DownsampleMetrics )
54+
55+ m .downsamples = prometheus .NewCounterVec (prometheus.CounterOpts {
56+ Name : "thanos_compact_downsample_total" ,
57+ Help : "Total number of downsampling attempts." ,
58+ }, []string {"group" })
59+ m .downsampleFailures = prometheus .NewCounterVec (prometheus.CounterOpts {
60+ Name : "thanos_compact_downsample_failures_total" ,
61+ Help : "Total number of failed downsampling attempts." ,
62+ }, []string {"group" })
63+
64+ reg .MustRegister (m .downsamples )
65+ reg .MustRegister (m .downsampleFailures )
66+
67+ return m
68+ }
69+
4470func runDownsample (
4571 g * run.Group ,
4672 logger log.Logger ,
4773 reg * prometheus.Registry ,
74+ httpBindAddr string ,
4875 dataDir string ,
4976 objStoreConfig * pathOrContent ,
5077) error {
@@ -65,6 +92,8 @@ func runDownsample(
6592 }
6693 }()
6794
95+ metrics := newDownsampleMetrics (reg )
96+
6897 // Start cycle of syncing blocks from the bucket and garbage collecting the bucket.
6998 {
7099 ctx , cancel := context .WithCancel (context .Background ())
@@ -74,13 +103,13 @@ func runDownsample(
74103
75104 level .Info (logger ).Log ("msg" , "start first pass of downsampling" )
76105
77- if err := downsampleBucket (ctx , logger , bkt , dataDir ); err != nil {
106+ if err := downsampleBucket (ctx , logger , metrics , bkt , dataDir ); err != nil {
78107 return errors .Wrap (err , "downsampling failed" )
79108 }
80109
81110 level .Info (logger ).Log ("msg" , "start second pass of downsampling" )
82111
83- if err := downsampleBucket (ctx , logger , bkt , dataDir ); err != nil {
112+ if err := downsampleBucket (ctx , logger , metrics , bkt , dataDir ); err != nil {
84113 return errors .Wrap (err , "downsampling failed" )
85114 }
86115
@@ -90,13 +119,18 @@ func runDownsample(
90119 })
91120 }
92121
122+ if err := metricHTTPListenGroup (g , logger , reg , httpBindAddr ); err != nil {
123+ return err
124+ }
125+
93126 level .Info (logger ).Log ("msg" , "starting downsample node" )
94127 return nil
95128}
96129
97130func downsampleBucket (
98131 ctx context.Context ,
99132 logger log.Logger ,
133+ metrics * DownsampleMetrics ,
100134 bkt objstore.Bucket ,
101135 dir string ,
102136) error {
@@ -181,8 +215,10 @@ func downsampleBucket(
181215 continue
182216 }
183217 if err := processDownsampling (ctx , logger , bkt , m , dir , 5 * 60 * 1000 ); err != nil {
218+ metrics .downsampleFailures .WithLabelValues (compact .GroupKey (* m ))
184219 return errors .Wrap (err , "downsampling to 5 min" )
185220 }
221+ metrics .downsamples .WithLabelValues (compact .GroupKey (* m ))
186222
187223 case 5 * 60 * 1000 :
188224 missing := false
@@ -202,8 +238,10 @@ func downsampleBucket(
202238 continue
203239 }
204240 if err := processDownsampling (ctx , logger , bkt , m , dir , 60 * 60 * 1000 ); err != nil {
241+ metrics .downsampleFailures .WithLabelValues (compact .GroupKey (* m ))
205242 return errors .Wrap (err , "downsampling to 60 min" )
206243 }
244+ metrics .downsamples .WithLabelValues (compact .GroupKey (* m ))
207245 }
208246 }
209247 return nil
0 commit comments