Skip to content

Commit 949fd2c

Browse files
committed
Use block.MetaFetcher in Store Gateway.
Fixes: #1874 * Corrupted disk cache for meta.json is handled gracefully. * Synchronize was not taking into account deletion by removing meta.json. * Prepare for future implementation of https://thanos.io/proposals/201901-read-write-operations-bucket.md/ * Better observability for synchronize process. * More logs for store startup process. TODO in separate PR: * More observability for index-cache loading / adding time. Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
1 parent c12fda9 commit 949fd2c

File tree

5 files changed

+137
-234
lines changed

5 files changed

+137
-234
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1313

1414
### Fixed
1515

16+
- []() Store: Improved synchronization of meta JSON files. Store now properly handles corrupted disk cache. Added meta.json sync metrics.
1617
- [#1856](https://github.com/thanos-io/thanos/pull/1856) Receive: close DBReadOnly after flushing to fix a memory leak.
1718
- [#1882](https://github.com/thanos-io/thanos/pull/1882) Receive: upload to object storage as 'receive' rather than 'sidecar'.
1819
- [#1907](https://github.com/thanos-io/thanos/pull/1907) Store: Fixed the duration unit for the metric `thanos_bucket_store_series_gate_duration_seconds`.

cmd/thanos/store.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
"github.com/pkg/errors"
1212
"github.com/prometheus/client_golang/prometheus"
1313
"github.com/prometheus/prometheus/pkg/relabel"
14+
"github.com/thanos-io/thanos/pkg/block"
1415
"github.com/thanos-io/thanos/pkg/component"
1516
"github.com/thanos-io/thanos/pkg/extflag"
17+
"github.com/thanos-io/thanos/pkg/extprom"
1618
"github.com/thanos-io/thanos/pkg/model"
1719
"github.com/thanos-io/thanos/pkg/objstore/client"
1820
"github.com/thanos-io/thanos/pkg/prober"
@@ -26,6 +28,8 @@ import (
2628
yaml "gopkg.in/yaml.v2"
2729
)
2830

31+
const fetcherConcurrency = 32
32+
2933
// registerStore registers a store command.
3034
func registerStore(m map[string]setupFunc, app *kingpin.Application) {
3135
cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
@@ -47,7 +51,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
4751
Default("2GB").Bytes()
4852

4953
maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
50-
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
54+
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
5155
Default("0").Uint()
5256

5357
maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
@@ -57,7 +61,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
5761
syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
5862
Default("3m").Duration()
5963

60-
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
64+
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
6165
Default("20").Int()
6266

6367
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
@@ -128,7 +132,7 @@ func runStore(
128132
indexCacheSizeBytes uint64,
129133
chunkPoolSizeBytes uint64,
130134
maxSampleCount uint64,
131-
maxConcurrent int,
135+
maxConcurrency int,
132136
component component.Component,
133137
verbose bool,
134138
syncInterval time.Duration,
@@ -202,19 +206,27 @@ func runStore(
202206
return errors.Wrap(err, "create index cache")
203207
}
204208

209+
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
210+
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
211+
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
212+
)
213+
if err != nil {
214+
return errors.Wrap(err, "meta fetcher")
215+
}
216+
205217
bs, err := store.NewBucketStore(
206218
logger,
207219
reg,
208220
bkt,
221+
metaFetcher,
209222
dataDir,
210223
indexCache,
211224
chunkPoolSizeBytes,
212225
maxSampleCount,
213-
maxConcurrent,
226+
maxConcurrency,
214227
verbose,
215228
blockSyncConcurrency,
216229
filterConf,
217-
relabelConfig,
218230
advertiseCompatibilityLabel,
219231
)
220232
if err != nil {

pkg/store/bucket.go

Lines changed: 31 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/pkg/errors"
2323
"github.com/prometheus/client_golang/prometheus"
2424
"github.com/prometheus/prometheus/pkg/labels"
25-
"github.com/prometheus/prometheus/pkg/relabel"
2625
"github.com/prometheus/prometheus/tsdb/chunkenc"
2726
"github.com/prometheus/prometheus/tsdb/chunks"
2827
"github.com/prometheus/prometheus/tsdb/fileutil"
@@ -190,7 +189,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
190189
return &m
191190
}
192191

193-
// FilterConfig is a configuration, which Store uses for filtering metrics.
192+
// FilterConfig is a configuration, which Store uses for filtering metrics based on time.
194193
type FilterConfig struct {
195194
MinTime, MaxTime model.TimeOrDurationValue
196195
}
@@ -201,6 +200,7 @@ type BucketStore struct {
201200
logger log.Logger
202201
metrics *bucketStoreMetrics
203202
bucket objstore.BucketReader
203+
fetcher block.MetadataFetcher
204204
dir string
205205
indexCache storecache.IndexCache
206206
chunkPool *pool.BytesPool
@@ -222,9 +222,7 @@ type BucketStore struct {
222222
samplesLimiter *Limiter
223223
partitioner partitioner
224224

225-
filterConfig *FilterConfig
226-
relabelConfig []*relabel.Config
227-
225+
filterConfig *FilterConfig
228226
advLabelSets []storepb.LabelSet
229227
enableCompatibilityLabel bool
230228
}
@@ -235,15 +233,15 @@ func NewBucketStore(
235233
logger log.Logger,
236234
reg prometheus.Registerer,
237235
bucket objstore.BucketReader,
236+
fetcher block.MetadataFetcher,
238237
dir string,
239238
indexCache storecache.IndexCache,
240239
maxChunkPoolBytes uint64,
241240
maxSampleCount uint64,
242241
maxConcurrent int,
243242
debugLogging bool,
244243
blockSyncConcurrency int,
245-
filterConf *FilterConfig,
246-
relabelConfig []*relabel.Config,
244+
filterConfig *FilterConfig,
247245
enableCompatibilityLabel bool,
248246
) (*BucketStore, error) {
249247
if logger == nil {
@@ -265,21 +263,21 @@ func NewBucketStore(
265263
s := &BucketStore{
266264
logger: logger,
267265
bucket: bucket,
266+
fetcher: fetcher,
268267
dir: dir,
269268
indexCache: indexCache,
270269
chunkPool: chunkPool,
271270
blocks: map[ulid.ULID]*bucketBlock{},
272271
blockSets: map[uint64]*bucketBlockSet{},
273272
debugLogging: debugLogging,
274273
blockSyncConcurrency: blockSyncConcurrency,
274+
filterConfig: filterConfig,
275275
queryGate: gate.NewGate(
276276
maxConcurrent,
277277
extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
278278
),
279279
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
280280
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
281-
filterConfig: filterConf,
282-
relabelConfig: relabelConfig,
283281
enableCompatibilityLabel: enableCompatibilityLabel,
284282
}
285283
s.metrics = metrics
@@ -310,6 +308,12 @@ func (s *BucketStore) Close() (err error) {
310308
// SyncBlocks synchronizes the stores state with the Bucket bucket.
311309
// It will reuse disk space as persistent cache based on s.dir param.
312310
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
311+
metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
312+
// For partial view allow adding new blocks at least.
313+
if metaFetchErr != nil && metas == nil {
314+
return metaFetchErr
315+
}
316+
313317
var wg sync.WaitGroup
314318
blockc := make(chan *metadata.Meta)
315319

@@ -318,73 +322,40 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
318322
go func() {
319323
for meta := range blockc {
320324
if err := s.addBlock(ctx, meta); err != nil {
321-
level.Warn(s.logger).Log("msg", "loading block failed", "id", meta.ULID, "err", err)
322325
continue
323326
}
324327
}
325328
wg.Done()
326329
}()
327330
}
328331

329-
allIDs := map[ulid.ULID]struct{}{}
330-
331-
err := s.bucket.Iter(ctx, "", func(name string) error {
332-
// Strip trailing slash indicating a directory.
333-
id, err := ulid.Parse(name[:len(name)-1])
334-
if err != nil {
335-
return nil
336-
}
337-
338-
bdir := path.Join(s.dir, id.String())
339-
meta, err := loadMeta(ctx, s.logger, s.bucket, bdir, id)
340-
if err != nil {
341-
return errors.Wrap(err, "load meta")
342-
}
343-
344-
inRange, err := s.isBlockInMinMaxRange(ctx, meta)
345-
if err != nil {
346-
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
347-
return os.RemoveAll(bdir)
348-
}
349-
350-
if !inRange {
351-
return os.RemoveAll(bdir)
352-
}
353-
354-
// Check for block labels by relabeling.
355-
// If output is empty, the block will be dropped.
356-
if processedLabels := relabel.Process(labels.FromMap(meta.Thanos.Labels), s.relabelConfig...); processedLabels == nil {
357-
level.Debug(s.logger).Log("msg", "ignoring block (drop in relabeling)", "block", id)
358-
return os.RemoveAll(bdir)
359-
}
360-
361-
allIDs[id] = struct{}{}
362-
332+
for id, meta := range metas {
363333
if b := s.getBlock(id); b != nil {
364-
return nil
334+
continue
365335
}
366336
select {
367337
case <-ctx.Done():
368338
case blockc <- meta:
369339
}
370-
return nil
371-
})
340+
}
372341

373342
close(blockc)
374343
wg.Wait()
375344

376-
if err != nil {
377-
return errors.Wrap(err, "iter")
345+
if metaFetchErr != nil {
346+
return metaFetchErr
378347
}
348+
379349
// Drop all blocks that are no longer present in the bucket.
380350
for id := range s.blocks {
381-
if _, ok := allIDs[id]; ok {
351+
if _, ok := metas[id]; ok {
382352
continue
383353
}
384354
if err := s.removeBlock(id); err != nil {
385-
level.Warn(s.logger).Log("msg", "drop outdated block", "block", id, "err", err)
355+
level.Warn(s.logger).Log("msg", "drop outdated block failed", "block", id, "err", err)
386356
s.metrics.blockDropFailures.Inc()
387357
}
358+
level.Debug(s.logger).Log("msg", "dropped outdated block", "block", id)
388359
s.metrics.blockDrops.Inc()
389360
}
390361

@@ -442,19 +413,6 @@ func (s *BucketStore) numBlocks() int {
442413
return len(s.blocks)
443414
}
444415

445-
func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, meta *metadata.Meta) (bool, error) {
446-
// We check for blocks in configured minTime, maxTime range.
447-
switch {
448-
case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
449-
return false, nil
450-
451-
case meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
452-
return false, nil
453-
}
454-
455-
return true, nil
456-
}
457-
458416
func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
459417
s.mtx.RLock()
460418
defer s.mtx.RUnlock()
@@ -463,13 +421,22 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
463421

464422
func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) {
465423
dir := filepath.Join(s.dir, meta.ULID.String())
424+
start := time.Now()
425+
426+
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
427+
return errors.Wrap(err, "create dir")
428+
}
466429

430+
level.Debug(s.logger).Log("msg", "loading new block", "id", meta.ULID)
467431
defer func() {
468432
if err != nil {
469433
s.metrics.blockLoadFailures.Inc()
470434
if err2 := os.RemoveAll(dir); err2 != nil {
471435
level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2)
472436
}
437+
level.Warn(s.logger).Log("msg", "loading block failed", "elapsed", time.Since(start), "id", meta.ULID, "err", err)
438+
} else {
439+
level.Debug(s.logger).Log("msg", "loaded block", "elapsed", time.Since(start), "id", meta.ULID)
473440
}
474441
}()
475442
s.metrics.blockLoads.Inc()
@@ -1226,31 +1193,6 @@ func (b *bucketBlock) indexCacheFilename() string {
12261193
return path.Join(b.meta.ULID.String(), block.IndexCacheFilename)
12271194
}
12281195

1229-
func loadMeta(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*metadata.Meta, error) {
1230-
// If we haven't seen the block before or it is missing the meta.json, download it.
1231-
if _, err := os.Stat(path.Join(dir, block.MetaFilename)); os.IsNotExist(err) {
1232-
if err := os.MkdirAll(dir, 0777); err != nil {
1233-
return nil, errors.Wrap(err, "create dir")
1234-
}
1235-
src := path.Join(id.String(), block.MetaFilename)
1236-
1237-
if err := objstore.DownloadFile(ctx, logger, bkt, src, dir); err != nil {
1238-
if bkt.IsObjNotFoundErr(errors.Cause(err)) {
1239-
level.Debug(logger).Log("msg", "meta file wasn't found. Block not ready or being deleted.", "block", id.String())
1240-
}
1241-
return nil, errors.Wrap(err, "download meta.json")
1242-
}
1243-
} else if err != nil {
1244-
return nil, err
1245-
}
1246-
meta, err := metadata.Read(dir)
1247-
if err != nil {
1248-
return nil, errors.Wrap(err, "read meta.json")
1249-
}
1250-
1251-
return meta, err
1252-
}
1253-
12541196
func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) {
12551197
cachefn := filepath.Join(b.dir, block.IndexCacheFilename)
12561198
if err = b.loadIndexCacheFileFromFile(ctx, cachefn); err == nil {

0 commit comments

Comments
 (0)