@@ -8,20 +8,22 @@ import (
88 "github.com/go-kit/kit/log"
99 "github.com/go-kit/kit/log/level"
1010 "github.com/oklog/run"
11- opentracing "github.com/opentracing/opentracing-go"
11+ "github.com/opentracing/opentracing-go"
1212 "github.com/pkg/errors"
1313 "github.com/prometheus/client_golang/prometheus"
14+ "github.com/thanos-io/thanos/pkg/component"
1415 "github.com/thanos-io/thanos/pkg/model"
1516 "github.com/thanos-io/thanos/pkg/objstore/client"
17+ "github.com/thanos-io/thanos/pkg/prober"
1618 "github.com/thanos-io/thanos/pkg/runutil"
1719 "github.com/thanos-io/thanos/pkg/store"
1820 storecache "github.com/thanos-io/thanos/pkg/store/cache"
19- kingpin "gopkg.in/alecthomas/kingpin.v2"
21+ "gopkg.in/alecthomas/kingpin.v2"
2022)
2123
2224// registerStore registers a store command.
23- func registerStore (m map [string ]setupFunc , app * kingpin.Application , name string ) {
24- cmd := app .Command (name , "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS." )
25+ func registerStore (m map [string ]setupFunc , app * kingpin.Application ) {
26+ cmd := app .Command (component . Store . String () , "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS." )
2527
2628 grpcBindAddr , httpBindAddr , cert , key , clientCA := regCommonServerFlags (cmd )
2729
@@ -54,7 +56,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
5456 maxTime := model .TimeOrDuration (cmd .Flag ("max-time" , "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y." ).
5557 Default ("9999-12-31T23:59:59Z" ))
5658
57- m [name ] = func (g * run.Group , logger log.Logger , reg * prometheus.Registry , tracer opentracing.Tracer , debugLogging bool ) error {
59+ m [component . Store . String () ] = func (g * run.Group , logger log.Logger , reg * prometheus.Registry , tracer opentracing.Tracer , debugLogging bool ) error {
5860 if minTime .PrometheusTimestamp () > maxTime .PrometheusTimestamp () {
5961 return errors .Errorf ("invalid argument: --min-time '%s' can't be greater than --max-time '%s'" ,
6062 minTime , maxTime )
@@ -75,7 +77,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
7577 uint64 (* chunkPoolSize ),
7678 uint64 (* maxSampleCount ),
7779 int (* maxConcurrent ),
78- name ,
80+ component . Store ,
7981 debugLogging ,
8082 * syncInterval ,
8183 * blockSyncConcurrency ,
@@ -104,102 +106,104 @@ func runStore(
104106 chunkPoolSizeBytes uint64 ,
105107 maxSampleCount uint64 ,
106108 maxConcurrent int ,
107- component string ,
109+ component component. Component ,
108110 verbose bool ,
109111 syncInterval time.Duration ,
110112 blockSyncConcurrency int ,
111113 filterConf * store.FilterConfig ,
112114) error {
113- {
114- confContentYaml , err := objStoreConfig .Content ()
115- if err != nil {
116- return err
117- }
115+ statusProber := prober .NewProber (component , logger , prometheus .WrapRegistererWithPrefix ("thanos_" , reg ))
116+
117+ confContentYaml , err := objStoreConfig .Content ()
118+ if err != nil {
119+ return err
120+ }
121+
122+ bkt , err := client .NewBucket (logger , confContentYaml , reg , component .String ())
123+ if err != nil {
124+ return errors .Wrap (err , "create bucket client" )
125+ }
118126
119- bkt , err := client .NewBucket (logger , confContentYaml , reg , component )
127+ // Ensure we close up everything properly.
128+ defer func () {
120129 if err != nil {
121- return errors . Wrap ( err , "create bucket client" )
130+ runutil . CloseWithLogOnErr ( logger , bkt , " bucket client" )
122131 }
132+ }()
123133
124- // Ensure we close up everything properly.
125- defer func () {
126- if err != nil {
127- runutil .CloseWithLogOnErr (logger , bkt , "bucket client" )
128- }
129- }()
134+ // TODO(bwplotka): Add as a flag?
135+ maxItemSizeBytes := indexCacheSizeBytes / 2
130136
131- // TODO(bwplotka): Add as a flag?
132- maxItemSizeBytes := indexCacheSizeBytes / 2
137+ indexCache , err := storecache .NewIndexCache (logger , reg , storecache.Opts {
138+ MaxSizeBytes : indexCacheSizeBytes ,
139+ MaxItemSizeBytes : maxItemSizeBytes ,
140+ })
141+ if err != nil {
142+ return errors .Wrap (err , "create index cache" )
143+ }
133144
134- indexCache , err := storecache .NewIndexCache (logger , reg , storecache.Opts {
135- MaxSizeBytes : indexCacheSizeBytes ,
136- MaxItemSizeBytes : maxItemSizeBytes ,
137- })
138- if err != nil {
139- return errors .Wrap (err , "create index cache" )
140- }
145+ bs , err := store .NewBucketStore (
146+ logger ,
147+ reg ,
148+ bkt ,
149+ dataDir ,
150+ indexCache ,
151+ chunkPoolSizeBytes ,
152+ maxSampleCount ,
153+ maxConcurrent ,
154+ verbose ,
155+ blockSyncConcurrency ,
156+ filterConf ,
157+ )
158+ if err != nil {
159+ return errors .Wrap (err , "create object storage store" )
160+ }
141161
142- bs , err := store .NewBucketStore (
143- logger ,
144- reg ,
145- bkt ,
146- dataDir ,
147- indexCache ,
148- chunkPoolSizeBytes ,
149- maxSampleCount ,
150- maxConcurrent ,
151- verbose ,
152- blockSyncConcurrency ,
153- filterConf ,
154- )
155- if err != nil {
156- return errors .Wrap (err , "create object storage store" )
157- }
162+ begin := time .Now ()
163+ level .Debug (logger ).Log ("msg" , "initializing bucket store" )
164+ if err := bs .InitialSync (context .Background ()); err != nil {
165+ return errors .Wrap (err , "bucket store initial sync" )
166+ }
167+ level .Debug (logger ).Log ("msg" , "bucket store ready" , "init_duration" , time .Since (begin ).String ())
158168
159- begin := time .Now ()
160- level .Debug (logger ).Log ("msg" , "initializing bucket store" )
161- if err := bs .InitialSync (context .Background ()); err != nil {
162- return errors .Wrap (err , "bucket store initial sync" )
163- }
164- level .Debug (logger ).Log ("msg" , "bucket store ready" , "init_duration" , time .Since (begin ).String ())
165-
166- ctx , cancel := context .WithCancel (context .Background ())
167- g .Add (func () error {
168- defer runutil .CloseWithLogOnErr (logger , bkt , "bucket client" )
169-
170- err := runutil .Repeat (syncInterval , ctx .Done (), func () error {
171- if err := bs .SyncBlocks (ctx ); err != nil {
172- level .Warn (logger ).Log ("msg" , "syncing blocks failed" , "err" , err )
173- }
174- return nil
175- })
176-
177- runutil .CloseWithLogOnErr (logger , bs , "bucket store" )
178- return err
179- }, func (error ) {
180- cancel ()
169+ ctx , cancel := context .WithCancel (context .Background ())
170+ g .Add (func () error {
171+ defer runutil .CloseWithLogOnErr (logger , bkt , "bucket client" )
172+
173+ err := runutil .Repeat (syncInterval , ctx .Done (), func () error {
174+ if err := bs .SyncBlocks (ctx ); err != nil {
175+ level .Warn (logger ).Log ("msg" , "syncing blocks failed" , "err" , err )
176+ }
177+ return nil
181178 })
182179
183- l , err := net .Listen ("tcp" , grpcBindAddr )
184- if err != nil {
185- return errors .Wrap (err , "listen API address" )
186- }
180+ runutil .CloseWithLogOnErr (logger , bs , "bucket store" )
181+ return err
182+ }, func (error ) {
183+ cancel ()
184+ })
187185
188- opts , err := defaultGRPCServerOpts (logger , cert , key , clientCA )
189- if err != nil {
190- return errors .Wrap (err , "grpc server options" )
191- }
192- s := newStoreGRPCServer (logger , reg , tracer , bs , opts )
186+ l , err := net .Listen ("tcp" , grpcBindAddr )
187+ if err != nil {
188+ return errors .Wrap (err , "listen API address" )
189+ }
193190
194- g .Add (func () error {
195- level .Info (logger ).Log ("msg" , "Listening for StoreAPI gRPC" , "address" , grpcBindAddr )
196- return errors .Wrap (s .Serve (l ), "serve gRPC" )
197- }, func (error ) {
198- s .Stop ()
199- })
191+ opts , err := defaultGRPCServerOpts (logger , cert , key , clientCA )
192+ if err != nil {
193+ return errors .Wrap (err , "grpc server options" )
200194 }
201- if err := metricHTTPListenGroup (g , logger , reg , httpBindAddr ); err != nil {
202- return err
195+ s := newStoreGRPCServer (logger , reg , tracer , bs , opts )
196+
197+ g .Add (func () error {
198+ level .Info (logger ).Log ("msg" , "Listening for StoreAPI gRPC" , "address" , grpcBindAddr )
199+ statusProber .SetReady ()
200+ return errors .Wrap (s .Serve (l ), "serve gRPC" )
201+ }, func (error ) {
202+ s .Stop ()
203+ })
204+
205+ if err := scheduleHTTPServer (g , logger , reg , statusProber , httpBindAddr , nil , component ); err != nil {
206+ return errors .Wrap (err , "schedule HTTP server" )
203207 }
204208
205209 level .Info (logger ).Log ("msg" , "starting store node" )
0 commit comments