Skip to content

Commit 7d2ea2b

Browse files
hczhu-dbHC Zhu
authored andcommitted
Limit lazyRespSet memory buffer size using a ring buffer
Signed-off-by: HC Zhu <hczhu.mtv@gmail.com>
1 parent 4ad4594 commit 7d2ea2b

File tree

8 files changed

+129
-24
lines changed

8 files changed

+129
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
3535
- [#8211](https://github.com/thanos-io/thanos/pull/8211) Query: fix panic on nested partial response in distributed instant query
3636
- [#8216](https://github.com/thanos-io/thanos/pull/8216) Query/Receive: fix iter race between `next()` and `stop()` introduced in https://github.com/thanos-io/thanos/pull/7821.
3737
- [#8212](https://github.com/thanos-io/thanos/pull/8212) Receive: Ensure forward/replication metrics are incremented in err cases
38+
- [#8296](https://github.com/thanos-io/thanos/pull/8296) Query: limit LazyRetrieval memory buffer size
3839

3940
## [v0.38.0](https://github.com/thanos-io/thanos/tree/release-0.38) - 03.04.2025
4041

cmd/thanos/query.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ func registerQuery(app *extkingpin.App) {
202202

203203
strictEndpointGroups := extkingpin.Addrs(cmd.Flag("endpoint-group-strict", "(Deprecated, Experimental): DNS name of statically configured Thanos API server groups (repeatable) that are always used, even if the health check fails.").PlaceHolder("<endpoint-group-strict>"))
204204

205+
lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
206+
Default("20").Int()
207+
205208
var storeRateLimits store.SeriesSelectLimits
206209
storeRateLimits.RegisterFlags(cmd)
207210

@@ -345,6 +348,7 @@ func registerQuery(app *extkingpin.App) {
345348
*enforceTenancy,
346349
*tenantLabel,
347350
*queryDistributedWithOverlappingInterval,
351+
*lazyRetrievalMaxBufferedResponses,
348352
)
349353
})
350354
}
@@ -408,6 +412,7 @@ func runQuery(
408412
enforceTenancy bool,
409413
tenantLabel string,
410414
queryDistributedWithOverlappingInterval bool,
415+
lazyRetrievalMaxBufferedResponses int,
411416
) error {
412417
comp := component.Query
413418
if alertQueryURL == "" {
@@ -421,6 +426,7 @@ func runQuery(
421426
options := []store.ProxyStoreOption{
422427
store.WithTSDBSelector(tsdbSelector),
423428
store.WithProxyStoreDebugLogging(debugLogging),
429+
store.WithLazyRetrievalMaxBufferedResponsesForProxy(lazyRetrievalMaxBufferedResponses),
424430
}
425431

426432
// Parse and sanitize the provided replica labels flags.

cmd/thanos/receive.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,14 @@ func runReceive(
354354
return errors.Wrap(err, "setup gRPC server")
355355
}
356356

357+
if conf.lazyRetrievalMaxBufferedResponses <= 0 {
358+
return errors.New("--receive.lazy-retrieval-max-buffered-responses must be > 0")
359+
}
357360
options := []store.ProxyStoreOption{
358361
store.WithProxyStoreDebugLogging(debugLogging),
359362
store.WithMatcherCache(cache),
360363
store.WithoutDedup(),
364+
store.WithLazyRetrievalMaxBufferedResponsesForProxy(conf.lazyRetrievalMaxBufferedResponses),
361365
}
362366

363367
proxy := store.NewProxyStore(
@@ -895,6 +899,8 @@ type receiveConfig struct {
895899

896900
matcherCacheSize int
897901

902+
lazyRetrievalMaxBufferedResponses int
903+
898904
featureList *[]string
899905

900906
headExpandedPostingsCacheSize uint64
@@ -1068,6 +1074,9 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
10681074
cmd.Flag("receive.otlp-promote-resource-attributes", "(Repeatable) Resource attributes to include in OTLP metrics ingested by Receive.").Default("").StringsVar(&rc.otlpResourceAttributes)
10691075

10701076
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
1077+
1078+
cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
1079+
Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses)
10711080
}
10721081

10731082
// determineMode returns the ReceiverMode that this receiver is configured to run in.

pkg/store/bucket.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,8 @@ type BucketStore struct {
446446
postingGroupMaxKeySeriesRatio float64
447447

448448
sortingStrategy sortingStrategy
449+
// This flag limits memory usage when lazy retrieval strategy, newLazyRespSet(), is used.
450+
lazyRetrievalMaxBufferedResponses int
449451

450452
blockEstimatedMaxSeriesFunc BlockEstimator
451453
blockEstimatedMaxChunkFunc BlockEstimator
@@ -610,6 +612,14 @@ func WithDontResort(true bool) BucketStoreOption {
610612
}
611613
}
612614

615+
func WithLazyRetrievalMaxBufferedResponsesForBucket(n int) BucketStoreOption {
616+
return func(s *BucketStore) {
617+
if true {
618+
s.lazyRetrievalMaxBufferedResponses = n
619+
}
620+
}
621+
}
622+
613623
// WithIndexHeaderLazyDownloadStrategy specifies what block to lazy download its index header.
614624
// Only used when lazy mmap is enabled at the same time.
615625
func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexHeaderFunc) BucketStoreOption {
@@ -683,6 +693,8 @@ func NewBucketStore(
683693
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
684694
requestLoggerFunc: NoopRequestLoggerFunc,
685695
blockLifecycleCallback: &noopBlockLifecycleCallback{},
696+
697+
lazyRetrievalMaxBufferedResponses: 1,
686698
}
687699

688700
for _, option := range options {
@@ -1718,6 +1730,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
17181730
nil,
17191731
)
17201732
} else {
1733+
lazyRetrievalMaxBufferedResponses := s.lazyRetrievalMaxBufferedResponses
1734+
if lazyRetrievalMaxBufferedResponses < 1 {
1735+
// Some unit and e2e tests hit this path.
1736+
lazyRetrievalMaxBufferedResponses = 1
1737+
}
17211738
resp = newLazyRespSet(
17221739
span,
17231740
10*time.Minute,
@@ -1728,6 +1745,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
17281745
shardMatcher,
17291746
false,
17301747
s.metrics.emptyPostingCount.WithLabelValues(tenant),
1748+
lazyRetrievalMaxBufferedResponses,
17311749
)
17321750
}
17331751

pkg/store/bucket_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1789,12 +1789,13 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
17891789
b1.meta.ULID: b1,
17901790
b2.meta.ULID: b2,
17911791
},
1792-
queryGate: gate.NewNoop(),
1793-
chunksLimiterFactory: NewChunksLimiterFactory(0),
1794-
seriesLimiterFactory: NewSeriesLimiterFactory(0),
1795-
bytesLimiterFactory: NewBytesLimiterFactory(0),
1796-
seriesBatchSize: SeriesBatchSize,
1797-
requestLoggerFunc: NoopRequestLoggerFunc,
1792+
queryGate: gate.NewNoop(),
1793+
chunksLimiterFactory: NewChunksLimiterFactory(0),
1794+
seriesLimiterFactory: NewSeriesLimiterFactory(0),
1795+
bytesLimiterFactory: NewBytesLimiterFactory(0),
1796+
seriesBatchSize: SeriesBatchSize,
1797+
requestLoggerFunc: NoopRequestLoggerFunc,
1798+
lazyRetrievalMaxBufferedResponses: 1,
17981799
}
17991800

18001801
t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {

pkg/store/proxy.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ type ProxyStore struct {
9292
tsdbSelector *TSDBSelector
9393
matcherCache storecache.MatchersCache
9494
enableDedup bool
95+
96+
lazyRetrievalMaxBufferedResponses int
9597
}
9698

9799
type proxyStoreMetrics struct {
@@ -118,6 +120,12 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
118120
// ProxyStoreOption are functions that configure the ProxyStore.
119121
type ProxyStoreOption func(s *ProxyStore)
120122

123+
func WithLazyRetrievalMaxBufferedResponsesForProxy(buferSize int) ProxyStoreOption {
124+
return func(s *ProxyStore) {
125+
s.lazyRetrievalMaxBufferedResponses = buferSize
126+
}
127+
}
128+
121129
// WithProxyStoreDebugLogging toggles debug logging.
122130
func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
123131
return func(s *ProxyStore) {
@@ -309,7 +317,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
309317
for _, st := range stores {
310318
st := st
311319

312-
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
320+
respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, s.lazyRetrievalMaxBufferedResponses)
313321
if err != nil {
314322
level.Error(reqLogger).Log("err", err)
315323

pkg/store/proxy_merge.go

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"io"
10+
"math"
1011
"sort"
1112
"sync"
1213
"time"
@@ -227,8 +228,20 @@ type lazyRespSet struct {
227228
frameTimeout time.Duration
228229

229230
// Internal bookkeeping.
230-
dataOrFinishEvent *sync.Cond
231-
bufferedResponses []*storepb.SeriesResponse
231+
dataOrFinishEvent *sync.Cond
232+
// This event firing means the buffer has a slot for more data.
233+
bufferSlotEvent *sync.Cond
234+
fixedBufferSize int
235+
// This a ring buffer of size fixedBufferSize.
236+
// A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
237+
bufferedResponses []*storepb.SeriesResponse
238+
// ringHead points to the first element in the ring buffer.
239+
// ringTail points to the slot after the last element in the ring buffer.
240+
// if ringHead == ringTail then the buffer is empty.
241+
// if ringHead == (ringTail + 1) % fixedBufferSize then the buffer is full.
242+
ringHead int
243+
ringTail int
244+
closed bool
232245
bufferedResponsesMtx *sync.Mutex
233246
lastResp *storepb.SeriesResponse
234247

@@ -238,24 +251,32 @@ type lazyRespSet struct {
238251
shardMatcher *storepb.ShardMatcher
239252
}
240253

254+
func (l *lazyRespSet) isEmpty() bool {
255+
return l.ringHead == l.ringTail
256+
}
257+
258+
func (l *lazyRespSet) isFull() bool {
259+
return (l.ringTail+1)%l.fixedBufferSize == l.ringHead
260+
}
261+
241262
func (l *lazyRespSet) Empty() bool {
242263
l.bufferedResponsesMtx.Lock()
243264
defer l.bufferedResponsesMtx.Unlock()
244265

245266
// NOTE(GiedriusS): need to wait here for at least one
246267
// response so that we could build the heap properly.
247-
if l.noMoreData && len(l.bufferedResponses) == 0 {
268+
if l.noMoreData && l.isEmpty() {
248269
return true
249270
}
250271

251-
for len(l.bufferedResponses) == 0 {
272+
for l.isEmpty() {
252273
l.dataOrFinishEvent.Wait()
253-
if l.noMoreData && len(l.bufferedResponses) == 0 {
274+
if l.noMoreData && l.isEmpty() {
254275
break
255276
}
256277
}
257278

258-
return len(l.bufferedResponses) == 0 && l.noMoreData
279+
return l.isEmpty() && l.noMoreData
259280
}
260281

261282
// Next either blocks until more data is available or reads
@@ -267,23 +288,24 @@ func (l *lazyRespSet) Next() bool {
267288

268289
l.initialized = true
269290

270-
if l.noMoreData && len(l.bufferedResponses) == 0 {
291+
if l.noMoreData && l.isEmpty() {
271292
l.lastResp = nil
272293

273294
return false
274295
}
275296

276-
for len(l.bufferedResponses) == 0 {
297+
for l.isEmpty() {
277298
l.dataOrFinishEvent.Wait()
278-
if l.noMoreData && len(l.bufferedResponses) == 0 {
299+
if l.noMoreData && l.isEmpty() {
279300
break
280301
}
281302
}
282303

283-
if len(l.bufferedResponses) > 0 {
284-
l.lastResp = l.bufferedResponses[0]
304+
if !l.isEmpty() {
305+
l.lastResp = l.bufferedResponses[l.ringHead]
285306
if l.initialized {
286-
l.bufferedResponses = l.bufferedResponses[1:]
307+
l.ringHead = (l.ringHead + 1) % l.fixedBufferSize
308+
l.bufferSlotEvent.Signal()
287309
}
288310
return true
289311
}
@@ -310,8 +332,12 @@ func newLazyRespSet(
310332
shardMatcher *storepb.ShardMatcher,
311333
applySharding bool,
312334
emptyStreamResponses prometheus.Counter,
335+
fixedBufferSize int,
313336
) respSet {
314-
bufferedResponses := []*storepb.SeriesResponse{}
337+
// A ring buffer of size N can hold N - 1 elements at most in order to distinguish being empty from being full.
338+
// That's why the size is increased by 1 internally.
339+
fixedBufferSize++
340+
bufferedResponses := make([]*storepb.SeriesResponse, fixedBufferSize)
315341
bufferedResponsesMtx := &sync.Mutex{}
316342
dataAvailable := sync.NewCond(bufferedResponsesMtx)
317343

@@ -323,9 +349,14 @@ func newLazyRespSet(
323349
closeSeries: closeSeries,
324350
span: span,
325351
dataOrFinishEvent: dataAvailable,
352+
bufferSlotEvent: sync.NewCond(bufferedResponsesMtx),
326353
bufferedResponsesMtx: bufferedResponsesMtx,
327354
bufferedResponses: bufferedResponses,
328355
shardMatcher: shardMatcher,
356+
fixedBufferSize: fixedBufferSize,
357+
ringHead: 0,
358+
ringTail: 0,
359+
closed: false,
329360
}
330361
respSet.storeLabels = make(map[string]struct{})
331362
for _, ls := range storeLabelSets {
@@ -378,16 +409,26 @@ func newLazyRespSet(
378409
} else {
379410
rerr = errors.Wrapf(err, "receive series from %s", st)
380411
}
381-
382412
l.span.SetTag("err", rerr.Error())
383413

384414
l.bufferedResponsesMtx.Lock()
385-
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
415+
for l.isFull() && !l.closed {
416+
l.bufferSlotEvent.Wait()
417+
}
418+
if !l.closed {
419+
l.bufferedResponses[l.ringTail] = storepb.NewWarnSeriesResponse(rerr)
420+
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
421+
}
386422
l.noMoreData = true
387423
l.dataOrFinishEvent.Signal()
388424
l.bufferedResponsesMtx.Unlock()
389425
return false
390426
}
427+
if t != nil {
428+
// frameTimeout only applies to cl.Recv() gRPC call because the goroutine may be blocked on waiting for an empty buffer slot.
429+
// Set the timeout to the largest possible value to void triggering it.
430+
t.Reset(time.Duration(math.MaxInt64))
431+
}
391432

392433
numResponses++
393434
bytesProcessed += resp.Size()
@@ -401,8 +442,14 @@ func newLazyRespSet(
401442
}
402443

403444
l.bufferedResponsesMtx.Lock()
404-
l.bufferedResponses = append(l.bufferedResponses, resp)
405-
l.dataOrFinishEvent.Signal()
445+
for l.isFull() && !l.closed {
446+
l.bufferSlotEvent.Wait()
447+
}
448+
if !l.closed {
449+
l.bufferedResponses[l.ringTail] = resp
450+
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
451+
l.dataOrFinishEvent.Signal()
452+
}
406453
l.bufferedResponsesMtx.Unlock()
407454
return true
408455
}
@@ -446,6 +493,7 @@ func newAsyncRespSet(
446493
shardInfo *storepb.ShardInfo,
447494
logger log.Logger,
448495
emptyStreamResponses prometheus.Counter,
496+
lazyRetrievalMaxBufferedResponses int,
449497
) (respSet, error) {
450498

451499
var (
@@ -496,6 +544,12 @@ func newAsyncRespSet(
496544

497545
switch retrievalStrategy {
498546
case LazyRetrieval:
547+
span.SetTag("retrival_strategy", LazyRetrieval)
548+
if lazyRetrievalMaxBufferedResponses < 1 {
549+
// Some unit and e2e tests hit this path.
550+
lazyRetrievalMaxBufferedResponses = 1
551+
}
552+
499553
return newLazyRespSet(
500554
span,
501555
frameTimeout,
@@ -506,6 +560,7 @@ func newAsyncRespSet(
506560
shardMatcher,
507561
applySharding,
508562
emptyStreamResponses,
563+
lazyRetrievalMaxBufferedResponses,
509564
), nil
510565
case EagerRetrieval:
511566
return newEagerRespSet(
@@ -530,6 +585,8 @@ func (l *lazyRespSet) Close() {
530585
defer l.bufferedResponsesMtx.Unlock()
531586

532587
l.closeSeries()
588+
l.closed = true
589+
l.bufferSlotEvent.Signal()
533590
l.noMoreData = true
534591
l.dataOrFinishEvent.Signal()
535592

0 commit comments

Comments
 (0)