Skip to content

Commit 82cca56

Browse files
authored
Fixed --store.grpc.series-sample-limit (#2858)
* Fixed --store.grpc.series-sample-limit

  Signed-off-by: Marco Pracucci <marco@pracucci.com>

* Fixes after rebase

  Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent 4671966 commit 82cca56

File tree

10 files changed

+157
-56
lines changed

10 files changed

+157
-56
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
1818
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
1919
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and querier Store API Series, which were leaking on errors.
2020
- [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks.
21+
- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block.
2122

2223
### Added
2324

cmd/thanos/store.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
6363
Default("2GB").Bytes()
6464

6565
maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
66-
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
66+
"Maximum amount of samples returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains 120 samples (it's the max number of samples each chunk can contain), so the actual number of samples might be lower, even though the maximum could be hit.").
6767
Default("0").Uint()
6868

6969
maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
@@ -296,7 +296,7 @@ func runStore(
296296
indexCache,
297297
queriesGate,
298298
chunkPoolSizeBytes,
299-
maxSampleCount,
299+
store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
300300
verbose,
301301
blockSyncConcurrency,
302302
filterConf,

docs/components/store.md

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -93,11 +93,14 @@ Flags:
9393
memory.
9494
--store.grpc.series-sample-limit=0
9595
Maximum amount of samples returned via a single
96-
Series call. 0 means no limit. NOTE: For
97-
efficiency we take 120 as the number of samples
98-
in chunk (it cannot be bigger than that), so
99-
the actual number of samples might be lower,
100-
even though the maximum could be hit.
96+
Series call. The Series call fails if this
97+
limit is exceeded. 0 means no limit. NOTE: For
98+
efficiency the limit is internally implemented
99+
as 'chunks limit' considering each chunk
100+
contains 120 samples (it's the max number of
101+
samples each chunk can contain), so the actual
102+
number of samples might be lower, even though
103+
the maximum could be hit.
101104
--store.grpc.series-max-concurrency=20
102105
Maximum number of concurrent Series calls.
103106
--objstore.config-file=<file-path>

pkg/store/bucket.go

Lines changed: 17 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -54,13 +54,13 @@ import (
5454
)
5555

5656
const (
57-
// maxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
57+
// MaxSamplesPerChunk is approximately the max number of samples that we may have in any given chunk. This is needed
5858
// for precalculating the number of samples that we may have to retrieve and decode for any given query
5959
// without downloading them. Please take a look at https://github.com/prometheus/tsdb/pull/397 to know
6060
// where this number comes from. Long story short: TSDB is made in such a way, and it is made in such a way
6161
// because you barely get any improvements in compression when the number of samples is beyond this.
6262
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
63-
maxSamplesPerChunk = 120
63+
MaxSamplesPerChunk = 120
6464
maxChunkSize = 16000
6565
maxSeriesSize = 64 * 1024
6666

@@ -240,9 +240,9 @@ type BucketStore struct {
240240
// Query gate which limits the maximum amount of concurrent queries.
241241
queryGate gate.Gate
242242

243-
// samplesLimiter limits the number of samples per each Series() call.
244-
samplesLimiter SampleLimiter
245-
partitioner partitioner
243+
// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call.
244+
chunksLimiterFactory ChunksLimiterFactory
245+
partitioner partitioner
246246

247247
filterConfig *FilterConfig
248248
advLabelSets []storepb.LabelSet
@@ -269,7 +269,7 @@ func NewBucketStore(
269269
indexCache storecache.IndexCache,
270270
queryGate gate.Gate,
271271
maxChunkPoolBytes uint64,
272-
maxSampleCount uint64,
272+
chunksLimiterFactory ChunksLimiterFactory,
273273
debugLogging bool,
274274
blockSyncConcurrency int,
275275
filterConfig *FilterConfig,
@@ -287,7 +287,6 @@ func NewBucketStore(
287287
return nil, errors.Wrap(err, "create chunk pool")
288288
}
289289

290-
metrics := newBucketStoreMetrics(reg)
291290
s := &BucketStore{
292291
logger: logger,
293292
bkt: bkt,
@@ -301,14 +300,14 @@ func NewBucketStore(
301300
blockSyncConcurrency: blockSyncConcurrency,
302301
filterConfig: filterConfig,
303302
queryGate: queryGate,
304-
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
303+
chunksLimiterFactory: chunksLimiterFactory,
305304
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
306305
enableCompatibilityLabel: enableCompatibilityLabel,
307306
enablePostingsCompression: enablePostingsCompression,
308307
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
309308
enableSeriesResponseHints: enableSeriesResponseHints,
309+
metrics: newBucketStoreMetrics(reg),
310310
}
311-
s.metrics = metrics
312311

313312
if err := os.MkdirAll(dir, 0777); err != nil {
314313
return nil, errors.Wrap(err, "create dir")
@@ -649,7 +648,7 @@ func blockSeries(
649648
chunkr *bucketChunkReader,
650649
matchers []*labels.Matcher,
651650
req *storepb.SeriesRequest,
652-
samplesLimiter SampleLimiter,
651+
chunksLimiter ChunksLimiter,
653652
) (storepb.SeriesSet, *queryStats, error) {
654653
ps, err := indexr.ExpandedPostings(matchers)
655654
if err != nil {
@@ -722,12 +721,16 @@ func blockSeries(
722721
s.refs = append(s.refs, meta.Ref)
723722
}
724723
if len(s.chks) > 0 {
724+
if err := chunksLimiter.Reserve(uint64(len(s.chks))); err != nil {
725+
return nil, nil, errors.Wrap(err, "exceeded chunks limit")
726+
}
727+
725728
res = append(res, s)
726729
}
727730
}
728731

729732
// Preload all chunks that were marked in the previous stage.
730-
if err := chunkr.preload(samplesLimiter); err != nil {
733+
if err := chunkr.preload(); err != nil {
731734
return nil, nil, errors.Wrap(err, "preload chunks")
732735
}
733736

@@ -858,6 +861,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
858861
g, gctx = errgroup.WithContext(ctx)
859862
resHints = &hintspb.SeriesResponseHints{}
860863
reqBlockMatchers []*labels.Matcher
864+
chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped)
861865
)
862866

863867
if req.Hints != nil {
@@ -909,7 +913,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
909913
chunkr,
910914
blockMatchers,
911915
req,
912-
s.samplesLimiter,
916+
chunksLimiter,
913917
)
914918
if err != nil {
915919
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1983,19 +1987,9 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
19831987
}
19841988

19851989
// preload all added chunk IDs. Must be called before the first call to Chunk is made.
1986-
func (r *bucketChunkReader) preload(samplesLimiter SampleLimiter) error {
1990+
func (r *bucketChunkReader) preload() error {
19871991
g, ctx := errgroup.WithContext(r.ctx)
19881992

1989-
numChunks := uint64(0)
1990-
for _, offsets := range r.preloads {
1991-
for range offsets {
1992-
numChunks++
1993-
}
1994-
}
1995-
if err := samplesLimiter.Check(numChunks * maxSamplesPerChunk); err != nil {
1996-
return errors.Wrap(err, "exceeded samples limit")
1997-
}
1998-
19991993
for seq, offsets := range r.preloads {
20001994
sort.Slice(offsets, func(i, j int) bool {
20011995
return offsets[i] < offsets[j]

pkg/store/bucket_e2e_test.go

Lines changed: 59 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ import (
99
"io/ioutil"
1010
"os"
1111
"path/filepath"
12+
"strings"
1213
"testing"
1314
"time"
1415

@@ -123,7 +124,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
123124
return
124125
}
125126

126-
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
127+
func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxChunksLimit uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
127128
series := []labels.Labels{
128129
labels.FromStrings("a", "1", "b", "1"),
129130
labels.FromStrings("a", "1", "b", "2"),
@@ -161,7 +162,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
161162
s.cache,
162163
nil,
163164
0,
164-
maxSampleCount,
165+
NewChunksLimiterFactory(maxChunksLimit),
165166
false,
166167
20,
167168
filterConf,
@@ -504,7 +505,10 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
504505
hourAfter := time.Now().Add(1 * time.Hour)
505506
filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}
506507

507-
s := prepareStoreWithTestBlocks(t, dir, bkt, false, 241, emptyRelabelConfig, &FilterConfig{
508+
// The query will fetch 2 series from 2 blocks, so we do expect to hit a total of 4 chunks.
509+
expectedChunks := uint64(2 * 2)
510+
511+
s := prepareStoreWithTestBlocks(t, dir, bkt, false, expectedChunks, emptyRelabelConfig, &FilterConfig{
508512
MinTime: minTimeDuration,
509513
MaxTime: filterMaxTime,
510514
})
@@ -543,3 +547,55 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
543547
testutil.Equals(t, 1, len(s.Chunks))
544548
}
545549
}
550+
551+
func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
552+
// The query will fetch 2 series from 6 blocks, so we do expect to hit a total of 12 chunks.
553+
expectedChunks := uint64(2 * 6)
554+
555+
cases := map[string]struct {
556+
maxChunksLimit uint64
557+
expectedErr string
558+
}{
559+
"should succeed if the max chunks limit is not exceeded": {
560+
maxChunksLimit: expectedChunks,
561+
},
562+
"should fail if the max chunks limit is exceeded": {
563+
maxChunksLimit: expectedChunks - 1,
564+
expectedErr: "exceeded chunks limit",
565+
},
566+
}
567+
568+
for testName, testData := range cases {
569+
t.Run(testName, func(t *testing.T) {
570+
ctx, cancel := context.WithCancel(context.Background())
571+
defer cancel()
572+
bkt := objstore.NewInMemBucket()
573+
574+
dir, err := ioutil.TempDir("", "test_bucket_chunks_limiter_e2e")
575+
testutil.Ok(t, err)
576+
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
577+
578+
s := prepareStoreWithTestBlocks(t, dir, bkt, false, testData.maxChunksLimit, emptyRelabelConfig, allowAllFilterConf)
579+
testutil.Ok(t, s.store.SyncBlocks(ctx))
580+
581+
req := &storepb.SeriesRequest{
582+
Matchers: []storepb.LabelMatcher{
583+
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
584+
},
585+
MinTime: minTimeDuration.PrometheusTimestamp(),
586+
MaxTime: maxTimeDuration.PrometheusTimestamp(),
587+
}
588+
589+
s.cache.SwapWith(noopCache{})
590+
srv := newStoreSeriesServer(ctx)
591+
err = s.store.Series(req, srv)
592+
593+
if testData.expectedErr == "" {
594+
testutil.Ok(t, err)
595+
} else {
596+
testutil.NotOk(t, err)
597+
testutil.Assert(t, strings.Contains(err.Error(), testData.expectedErr))
598+
}
599+
})
600+
}
601+
}

pkg/store/bucket_test.go

Lines changed: 8 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -574,7 +574,7 @@ func TestBucketStore_Info(t *testing.T) {
574574
noopCache{},
575575
nil,
576576
2e5,
577-
0,
577+
NewChunksLimiterFactory(0),
578578
false,
579579
20,
580580
allowAllFilterConf,
@@ -823,7 +823,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
823823
noopCache{},
824824
nil,
825825
0,
826-
0,
826+
NewChunksLimiterFactory(0),
827827
false,
828828
20,
829829
allowAllFilterConf,
@@ -1248,8 +1248,8 @@ func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, request
12481248
blockSets: map[uint64]*bucketBlockSet{
12491249
labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{blocks}},
12501250
},
1251-
queryGate: noopGate{},
1252-
samplesLimiter: noopLimiter{},
1251+
queryGate: noopGate{},
1252+
chunksLimiterFactory: NewChunksLimiterFactory(0),
12531253
}
12541254

12551255
for _, block := range blocks {
@@ -1330,10 +1330,6 @@ type noopGate struct{}
13301330
func (noopGate) Start(context.Context) error { return nil }
13311331
func (noopGate) Done() {}
13321332

1333-
type noopLimiter struct{}
1334-
1335-
func (noopLimiter) Check(uint64) error { return nil }
1336-
13371333
// Regression test against: https://github.com/thanos-io/thanos/issues/2147.
13381334
func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
13391335
tmpDir, err := ioutil.TempDir("", "segfault-series")
@@ -1456,8 +1452,8 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
14561452
b1.meta.ULID: b1,
14571453
b2.meta.ULID: b2,
14581454
},
1459-
queryGate: noopGate{},
1460-
samplesLimiter: noopLimiter{},
1455+
queryGate: noopGate{},
1456+
chunksLimiterFactory: NewChunksLimiterFactory(0),
14611457
}
14621458

14631459
t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
@@ -1571,7 +1567,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
15711567
indexCache,
15721568
nil,
15731569
1000000,
1574-
10000,
1570+
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
15751571
false,
15761572
10,
15771573
nil,
@@ -1680,7 +1676,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
16801676
indexCache,
16811677
nil,
16821678
1000000,
1683-
10000,
1679+
NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
16841680
false,
16851681
10,
16861682
nil,

0 commit comments

Comments (0)